Finology 大数据金融

通过大数据以量化金融

MySQL和MongoDB概念对比

在mongodb中基本的概念是文档、集合、数据库。

SQL术语/概念 MongoDB术语/概念 解释/说明
atabase database 数据库
table collection 数据库表/集合
row document 数据记录行/文档
column field 数据字段/域
index index 索引
table joins 表连接, MongoDB不支持
primary key primary key 主键, MongoDB自动将_id字段设置为主键

数据库

进入MongoDB shell

1
2
3
4
bin/mongo
MongoDB shell version v4.0.2
connecting to: mongodb://127.0.0.1:27017
MongoDB server version: 4.0.2

查看所有的数据库

1
2
3
4
5
> show dbs
admin 0.000GB
config 0.000GB
local 0.000GB
mongo1-db 0.000GB

执行db命令可以显示当前数据库

1
2
> db
local

运行use命令,可以连接到一个指定的数据库。

1
2
> use mongo1-db
switched to db mongo1-db

ObjectId

ObjectId 类似唯一主键,可以很快的去生成和排序,包含 12 bytes,含义是:

前 4 个字节表示创建 unix 时间戳,格林尼治时间 UTC 时间,比北京时间晚了 8 个小时

接下来的 3 个字节是机器标识码

紧接的两个字节由进程 id 组成 PID

最后三个字节是随机数

MongoDB 中存储的文档必须有一个 _id 键。这个键的值可以是任何类型的,默认是个 ObjectId 对象

由于 ObjectId 中保存了创建的时间戳,所以你不需要为你的文档保存时间戳字段,你可以通过 getTimestamp 函数来获取文档的创建时间:

1
2
3
4
5
6
> var newObj = ObjectId()
> newObj.getTimestamp()
ISODate("2019-02-06T16:01:21Z")

> newObj.str
5c5b04d1dd377f0cd58338aa

什么叫NoSQL

NoSQL(NoSQL = Not Only SQL ),意即”不仅仅是SQL”。

在现代的计算系统上每天网络上都会产生庞大的数据量。

这些数据有很大一部分是由关系数据库管理系统(RDMBSs)来处理。 1970年 E.F.Codd’s提出的关系模型的论文 “A relational model of data for large shared data banks”,这使得数据建模和应用程序编程更加简单。

通过应用实践证明,关系模型是非常适合于客户服务器编程,远远超出预期的利益,今天它是结构化数据存储在网络和商务应用的主导技术。

NoSQL 是一项全新的数据库革命性运动,早期就有人提出,发展至2009年趋势越发高涨。NoSQL的拥护者们提倡运用非关系型的数据存储,相对于铺天盖地的关系型数据库运用,这一概念无疑是一种全新的思维的注入。

关系型数据库遵循ACID规则

事务在英文中是transaction,和现实世界中的交易很类似,它有如下四个特性:

  1. A (Atomicity) 原子性
    原子性很容易理解,也就是说事务里的所有操作要么全部做完,要么都不做,事务成功的条件是事务里的所有操作都成功,只要有一个操作失败,整个事务就失败,需要回滚。
    比如银行转账,从A账户转100元至B账户,分为两个步骤:1)从A账户取100元;2)存入100元至B账户。这两步要么一起完成,要么一起不完成,如果只完成第一步,第二步失败,钱会莫名其妙少了100元。

  2. C (Consistency) 一致性
    一致性也比较容易理解,也就是说数据库要一直处于一致的状态,事务的运行不会改变数据库原本的一致性约束。
    例如现有完整性约束a+b=10,如果一个事务改变了a,那么必须得改变b,使得事务结束后依然满足a+b=10,否则事务失败。

  3. I (Isolation) 独立性
    所谓的独立性是指并发的事务之间不会互相影响,如果一个事务要访问的数据正在被另外一个事务修改,只要另外一个事务未提交,它所访问的数据就不受未提交事务的影响。
    比如现有有个交易是从A账户转100元至B账户,在这个交易还未完成的情况下,如果此时B查询自己的账户,是看不到新增加的100元的。

  4. D (Durability) 持久性
    持久性是指一旦事务提交后,它所做的修改将会永久的保存在数据库上,即使出现宕机也不会丢失。

RDBMS vs NoSQL

RDBMS

  • 高度组织化结构化数据
  • 结构化查询语言(SQL) (SQL)
  • 数据和关系都存储在单独的表中。
  • 数据操纵语言,数据定义语言
  • 严格的一致性
  • 基础事务

NoSQL

  • 代表着不仅仅是SQL
  • 没有声明性查询语言
  • 没有预定义的模式
  • 键 - 值对存储,列存储,文档存储,图形数据库
  • 最终一致性,而非ACID属性
  • 非结构化和不可预知的数据
  • CAP定理
  • 高性能,高可用性和可伸缩性

CAP定理(CAP theorem)

在计算机科学中, CAP定理(CAP theorem), 又被称作 布鲁尔定理(Brewer’s theorem), 它指出对于一个分布式计算系统来说,不可能同时满足以下三点:

一致性(Consistency) (所有节点在同一时间具有相同的数据)

可用性(Availability) (保证每个请求不管成功或者失败都有响应)

分隔容忍(Partition tolerance) (系统中任意信息的丢失或失败不会影响系统的继续运作)

CAP理论的核心是:一个分布式系统不可能同时很好的满足一致性,可用性和分区容错性这三个需求,最多只能同时较好的满足两个。

因此,根据 CAP 原理将 NoSQL 数据库分成了满足 CA 原则、满足 CP 原则和满足 AP 原则三 大类:

CA - 单点集群,满足一致性,可用性的系统,通常在可扩展性上不太强大。

CP - 满足一致性,分区容忍性的系统,通常性能不是特别高。

AP - 满足可用性,分区容忍性的系统,通常可能对一致性要求低一些。

NoSQL 数据库分类

类型 部分代表 特点
列存储 Hbase, Cassandra, Hypertable 顾名思义,是按列存储数据的。最大的特点是方便存储结构化和半结构化数据,方便做数据压缩,对针对某一列或者某几列的查询有非常大的IO优势。
文档存储 MongoDB, CouchDB 文档存储一般用类似json的格式存储,存储的内容是文档型的。这样也就有有机会对某些字段建立索引,实现关系数据库的某些功能。
key-value存储 Tokyo Cabinet / Tyrant, Berkeley DB, MemcacheDB, Redis 可以通过key快速查询到其value。一般来说,存储不管value的格式,照单全收。(Redis包含了其他功能)
图存储 Neo4J, FlockDB 图形关系的最佳存储。使用传统关系数据库来解决的话性能低下,而且设计使用不方便。
对象存储 db4o, Versant 通过类似面向对象语言的语法操作数据库,通过对象的方式存取数据。
xml数据库 Berkeley DB XML, BaseX 高效的存储XML数据,并支持XML的内部查询语法,比如XQuery,Xpath

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

这篇文章将描述运行Spark官网上一个案例程序。

执行spark-submit命令,查看命令使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cd $SPARK_HOME

bin/spark-submit

Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]

Options:
--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster")
(Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
...

运行NetworkWordCount案例,我们需要指定master, class参数。

1
2
3
bin/spark-submit --master local[2] --class org.apache.spark.examples.streaming.NetworkWordCount examples/jars/spark-examples_2.11-2.2.3.jar

Usage: NetworkWordCount <hostname> <port>

终端又给我们使用方法的提示,需要提供网络服务器地址hostname和端口port

如果网络上还没有提供服务时,执行上述命令,则会遇到如下错误。

1
2
19/01/24 10:49:42 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting
to localhost:9999 - java.net.ConnectException: Connection refused (Connection refused)

所以,另开一个Shell终端,启动Netcat, 在端口9999上提供服务

1
nc -lk 9999

继续执行NetworkWordCount案例

1
2
3
4
5
6
7
8
9
10
11
bin/spark-submit --master local[2] --class org.apache.spark.examples.streaming.NetworkWordCount examples/jars/spark-examples_2.11-2.2.3.jar localhost 9999

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/01/24 11:04:46 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
19/01/24 11:04:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/01/24 11:04:47 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.112 instead (on interface en0)
19/01/24 11:04:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/01/24 11:04:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
-------------------------------------------
Time: 1548299089000 ms
-------------------------------------------

我们在Netcat终端上输入几个单词,看看统计的结果。

1
hello hello spark spark spark

很快的,我们在spark终端上就能看到统计的结果了

1
2
3
4
5
6

-------------------------------------------
Time: 1548298250000 ms
-------------------------------------------
(hello,2)
(spark,3)

我们再换一种方式,通过spark-shell的方式,输入代码,运行一下单词统计的程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
bin/spark-shell --master local[2]

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/01/24 12:45:00 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.1.112 instead (on interface en0)
19/01/24 12:45:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/01/24 12:45:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.112:4040
Spark context available as 'sc' (master = local[2], app id = local-1548305101841).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.3
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

我们按exmaples案例,输入以下代码。源代码可参看 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

1
2
3
4
5
6
7
8
9
10
11
scala>

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

netcat终端输入

1
hello hello spark spark spark

我们一样能看到统计的结果

1
2
3
4
5
6

-------------------------------------------
Time: 1548305187000 ms
-------------------------------------------
(hello,2)
(spark,3)
0%