Finology 大数据金融

通过大数据以量化金融

Spring Boot 2.1.x集成Kafka,其他版本也是类似方法。

确定Kafka版本

在Spring Boot中添加Kafka Client依赖。

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

进入此依赖,查看kafka-clients版本号。

1
2
3
4
5
6
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
<scope>compile</scope>
</dependency>

所以在安装Kafka的时候,选择2.0.1版本就可以了。Scala版本选择最高版本就行。

安装Kafka集群

zookeeper单机,Kafka在同一台机器上安装两个实例。

config/server.properties里面修改如下属性。

broker.id=0

Broker的id号,第一个实例为0,第二个实例为1。

listeners=PLAINTEXT://:9092

第一个实例用默认端口9092,第二个实例用9093。

advertised.listeners=PLAINTEXT://{host-ip}:9092

第一个实例用默认端口9092,第二个实例用9093。

如果不设置上述暴露给生产者和消费者的地址和端口,可能会出现如下错误:

1
java.io.IOException: Can't resolve address

网上有说到解决这个问题的办法是在hosts里面配置主机的ip地址,个人觉得这个办法不是最佳实践。

log.dirs=/tmp/kafka-logs

单机时,这个文件夹得用不同的。

启动zookeeper和kafka我就不在这里讲了。第二个实例启动后可以看到如下信息。

1
INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

Spring Boot集成

在application.yml里面需要配置如下:

1
2
3
4
5
spring:
kafka:
bootstrap-servers:
- {host-ip}:9092
- {host-ip}:9093

我们再看看默认配置的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
[16:38:24.889] INFO  org.apache.kafka.common.config.AbstractConfig 279 logAll - ProducerConfig values: 
acks = 1
batch.size = 16384
bootstrap.servers = [{host-ip}:9092, {host-ip}:9093:9093]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

Java代码

1
2
3
4
5
6
7
8
9
10
11
@RestController
public class KafkaProducer {

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@GetMapping("/kafka/{message}")
public void sendMessage(@PathVariable String message) {
kafkaTemplate.send("test-topic", message);
}
}

通过console-consumer就可以消费消息了。

parent-child关系,类似于nested模型,区别在于,nested objects文档中,所有对象都在同一个文档中,而parent-child关系文档中,父对象和子
对象都是完全独立的文档。

使用场景:子文档数量较多,并且子文档创建和修改的频率高时。

与nested objects相比,parent-child关系的优势有:

更新父文档时,不会重新索引子文档。
创建,修改或删除子文档时,不会影响父文档或其他子文档。
子文档可以作为搜索结果独立返回。

对父-子文档关系有个限制条件:父文档和其所有子文档,都必须要存储在同一个分片中。

Parent-Child关系文档映射

建立父-子文档映射关系时只需要指定某一个文档 type 是另一个文档 type 的父亲。 该关系可以在如下两个时间点设置:1)创建索引时;2)在子文档 type 创建之前更新父文档的 mapping。

举例说明,有一个公司在多个城市有分公司,并且每一个分公司下面都有很多员工。

在创建员工 employee 文档 type 时,指定分公司 branch 的文档 type 为其父亲。

1
2
3
4
5
6
7
8
9
10
11
PUT /company
{
"mappings": {
"branch": {},
"employee": {
"_parent": {
"type": "branch"
}
}
}
}

employee 文档 是 branch 文档的子文档。

构建Parent-Child文档索引

为父文档创建索引与为普通文档创建索引没有区别。父文档并不需要知道它有哪些子文档。

1
2
3
4
5
6
7
POST /company/branch/_bulk
{ "index": { "_id": "london" }}
{ "name": "London Westminster", "city": "London", "country": "UK" }
{ "index": { "_id": "liverpool" }}
{ "name": "Liverpool Central", "city": "Liverpool", "country": "UK" }
{ "index": { "_id": "paris" }}
{ "name": "Champs Élysées", "city": "Paris", "country": "France" }

创建子文档时,用户必须要通过 parent 参数来指定该子文档的父文档 ID:

1
2
3
4
5
6
PUT /company/employee/1?parent=london 
{
"name": "Alice Smith",
"dob": "1970-10-24",
"hobby": "hiking"
}

当前 employee 文档的父文档 ID 是 london 。如此保证了父文档和子文档都在同一个分片上。

分片路由的计算公式如下:

shard = hash(routing) % number_of_primary_shards

如果指定了父文档的 ID,那么就会使用父文档的 ID 进行路由,而不会使用当前文档 _id。也就是说,如果父文档和子文档都使用相同的值进行路由,那么父文档和子文档都会确定分布在同一个分片上。

在执行单文档的请求时需要指定父文档的 ID,单文档请求包括:通过 GET 请求获取一个子文档;创建、更新或删除一个子文档。而执行搜索请求时是不需要指定父文档的ID,这是因为搜索请求是向一个索引中的所有分片发起请求,而单文档的操作是只会向存储该文档的分片发送请求。因此,如果操作单个子文档时不指定父文档的 ID,那么很有可能会把请求发送到错误的分片上。

1
2
3
4
5
6
7
POST /company/employee/_bulk
{ "index": { "_id": 2, "parent": "london" }}
{ "name": "Mark Thomas", "dob": "1982-05-16", "hobby": "diving" }
{ "index": { "_id": 3, "parent": "liverpool" }}
{ "name": "Barry Smith", "dob": "1979-04-01", "hobby": "hiking" }
{ "index": { "_id": 4, "parent": "paris" }}
{ "name": "Adrien Grand", "dob": "1987-05-11", "hobby": "horses" }

如果你想要改变一个子文档的 parent 值,仅通过更新这个子文档是不够的,因为新的父文档有可能在另外一个分片上。因此,你必须要先把子文档删除,然后再重新索引这个子文档。

案例:一篇博客对应多个评论。

字段类型由object改为nested即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
PUT /my_index
{
"mappings": {
"blogpost": {
"properties": {
"comments": {
"type": "nested",
"properties": {
"name": { "type": "string" },
"comment": { "type": "string" },
"age": { "type": "short" },
"stars": { "type": "short" },
"date": { "type": "date" }
}
}
}
}
}
}

嵌套对象的查询

嵌套对象被索引在独立隐藏的文档中,所以必须使用nested查询去获取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
GET /my_index/blogpost/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"title": "eggs"
}
},
{
"nested": {
"path": "comments",
"query": {
"bool": {
"must": [
{
"match": {
"comments.name": "john"
}
},
{
"match": {
"comments.age": 28
}
}
]
}
}
}
}
]
}
}
}

nested子句作用于嵌套字段comments。comments.name和comments.age子句操作在同一个嵌套文档中。

nested查询还可以多层嵌套。

默认情况下,根文档的分数是这些嵌套文档分数的平均值。可以通过设置 score_mode 参数来控制这个得分策略,相关策略有 avg (平均值), max (最大值), sum (加和) 和 none (直接返回 1.0 常数值分数)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
GET /my_index/blogpost/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"title": "eggs"
}
},
{
"nested": {
"path": "comments",
"score_mode": "max",
"query": {
"bool": {
"must": [
{
"match": {
"comments.name": "john"
}
},
{
"match": {
"comments.age": 28
}
}
]
}
}
}
}
]
}
}
}

如果nested查询放在filter子句中,则score_mode参数不再生效,因为filter不是打分查询。

使用嵌套字段排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
PUT /my_index/blogpost/2
{
"title": "Investment secrets",
"body": "What they don't tell you ...",
"tags": [ "shares", "equities" ],
"comments": [
{
"name": "Mary Brown",
"comment": "Lies, lies, lies",
"age": 42,
"stars": 1,
"date": "2014-10-18"
},
{
"name": "John Smith",
"comment": "You're making it up!",
"age": 28,
"stars": 2,
"date": "2014-10-16"
}
]
}

查询某个时间范围内,有评论的文章,并按stars升序排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
GET /_search
{
"query": {
"nested": {
"path": "comments",
"filter": {
"range": {
"comments.date": {
"gte": "2014-10-01",
"lt": "2014-11-01"
}
}
}
}
},
"sort": {
"comments.stars": {
"order": "asc",
"mode": "min",
"nested_path": "comments",
"nested_filter": {
"range": {
"comments.date": {
"gte": "2014-10-01",
"lt": "2014-11-01"
}
}
}
}
}
}

sort排序子句中的nested_path和nested_filter的查询条件和上面的path、filter重复。原因在于,排序发生在查询执行之后。我们是按某一时间范围
的stars数排序,而不是按所有时间的stars排序。比如blog1的在10月stars:1, blog2在10月stars:2,但blog1的所有stars:20,blog2的所有stars:10。
他们最后的结果是不一样的。

嵌套聚合

https://www.elastic.co/guide/cn/elasticsearch/guide/current/nested-aggregation.html

0%