0%

查询语句没有经过测试,暂时做个记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"query": {
"function_score": {
"query": {
"match": {
"title": "搜索内容"
}
},
"functions": [
{
"script_score": {
"script": "return doc['flag'].value == TRUE ? 1 : 0"
}
},
{
"gauss": {
"start_time": {
"origin": "$now",
"offset": "1w",
"scale": "1m"
}
}
}
],
"score_mode": "sum",
"boost_mode": "multiply"
}
}
}

Spring Boot 2.1.x集成Kafka,其他版本也是类似方法。

确定Kafka版本

在Spring Boot中添加Kafka Client依赖。

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

进入此依赖,查看kafka-clients版本号。

1
2
3
4
5
6
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.1</version>
<scope>compile</scope>
</dependency>

所以在安装Kafka的时候,选择2.0.1版本就可以了。Scala版本选择最高版本就行。

安装Kafka集群

zookeeper单机,Kafka在同一台机器上安装两个实例。

config/server.properties里面修改如下属性。

broker.id=0

Broker的id号,第一个实例为0,第二个实例为1。

listeners=PLAINTEXT://:9092

第一个实例用默认端口9092,第二个实例用9093。

advertised.listeners=PLAINTEXT://{host-ip}:9092

第一个实例用默认端口9092,第二个实例用9093。

如果不设置上述暴露给生产者和消费者的地址和端口,可能会出现如下错误:

1
java.io.IOException: Can't resolve address

网上有说到解决这个问题的办法是在hosts里面配置主机的ip地址,个人觉得这个办法不是最佳实践。

log.dirs=/tmp/kafka-logs

单机时,这个文件夹得用不同的。

启动zookeeper和kafka我就不在这里讲了。第二个实例启动后可以看到如下信息。

1
INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

Spring Boot集成

在application.yml里面需要配置如下:

1
2
3
4
5
spring:
kafka:
bootstrap-servers:
- {host-ip}:9092
- {host-ip}:9093

我们再看看默认配置的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
[16:38:24.889] INFO  org.apache.kafka.common.config.AbstractConfig 279 logAll - ProducerConfig values: 
acks = 1
batch.size = 16384
bootstrap.servers = [{host-ip}:9092, {host-ip}:9093:9093]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

Java代码

1
2
3
4
5
6
7
8
9
10
11
@RestController
public class KafkaProducer {

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@GetMapping("/kafka/{message}")
public void sendMessage(@PathVariable String message) {
kafkaTemplate.send("test-topic", message);
}
}

通过console-consumer就可以消费消息了。

parent-child关系,类似于nested模型,区别在于,nested objects文档中,所有对象都在同一个文档中,而parent-child关系文档中,父对象和子
对象都是完全独立的文档。

使用场景:子文档数量较多,并且子文档创建和修改的频率高时。

与nested objects相比,parent-child关系的优势有:

更新父文档时,不会重新索引子文档。
创建,修改或删除子文档时,不会影响父文档或其他子文档。
子文档可以作为搜索结果独立返回。

对父-子文档关系有个限制条件:父文档和其所有子文档,都必须要存储在同一个分片中。

Parent-Child关系文档映射

建立父-子文档映射关系时只需要指定某一个文档 type 是另一个文档 type 的父亲。 该关系可以在如下两个时间点设置:1)创建索引时;2)在子文档 type 创建之前更新父文档的 mapping。

举例说明,有一个公司在多个城市有分公司,并且每一个分公司下面都有很多员工。

在创建员工 employee 文档 type 时,指定分公司 branch 的文档 type 为其父亲。

1
2
3
4
5
6
7
8
9
10
11
PUT /company
{
"mappings": {
"branch": {},
"employee": {
"_parent": {
"type": "branch"
}
}
}
}

employee 文档 是 branch 文档的子文档。

构建Parent-Child文档索引

为父文档创建索引与为普通文档创建索引没有区别。父文档并不需要知道它有哪些子文档。

1
2
3
4
5
6
7
POST /company/branch/_bulk
{ "index": { "_id": "london" }}
{ "name": "London Westminster", "city": "London", "country": "UK" }
{ "index": { "_id": "liverpool" }}
{ "name": "Liverpool Central", "city": "Liverpool", "country": "UK" }
{ "index": { "_id": "paris" }}
{ "name": "Champs Élysées", "city": "Paris", "country": "France" }

创建子文档时,用户必须要通过 parent 参数来指定该子文档的父文档 ID:

1
2
3
4
5
6
PUT /company/employee/1?parent=london 
{
"name": "Alice Smith",
"dob": "1970-10-24",
"hobby": "hiking"
}

当前 employee 文档的父文档 ID 是 london 。如此保证了父文档和子文档都在同一个分片上。

分片路由的计算公式如下:

shard = hash(routing) % number_of_primary_shards

如果指定了父文档的 ID,那么就会使用父文档的 ID 进行路由,而不会使用当前文档 _id。也就是说,如果父文档和子文档都使用相同的值进行路由,那么父文档和子文档都会确定分布在同一个分片上。

在执行单文档的请求时需要指定父文档的 ID,单文档请求包括:通过 GET 请求获取一个子文档;创建、更新或删除一个子文档。而执行搜索请求时是不需要指定父文档的ID,这是因为搜索请求是向一个索引中的所有分片发起请求,而单文档的操作是只会向存储该文档的分片发送请求。因此,如果操作单个子文档时不指定父文档的 ID,那么很有可能会把请求发送到错误的分片上。

1
2
3
4
5
6
7
POST /company/employee/_bulk
{ "index": { "_id": 2, "parent": "london" }}
{ "name": "Mark Thomas", "dob": "1982-05-16", "hobby": "diving" }
{ "index": { "_id": 3, "parent": "liverpool" }}
{ "name": "Barry Smith", "dob": "1979-04-01", "hobby": "hiking" }
{ "index": { "_id": 4, "parent": "paris" }}
{ "name": "Adrien Grand", "dob": "1987-05-11", "hobby": "horses" }

如果你想要改变一个子文档的 parent 值,仅通过更新这个子文档是不够的,因为新的父文档有可能在另外一个分片上。因此,你必须要先把子文档删除,然后再重新索引这个子文档。