Finology 大数据金融

通过大数据以量化金融

当一个项目中出现多于一个 Kafka 集群时,这时需要让 Springboot 自动装配失效。

首页在 pom.xml 里面添加依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.1</version>
</dependency>

在项目启动类上加上如下注解:

1
2
3
@SpringBootApplication(exclude = {
KafkaAutoConfiguration.class
})

在 yaml 文件中配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
spring:
# kafka配置
kafka:
# kafka集群1
cluster1:
bootstrap-servers: server1:9092,server2:9092,server3:9092
producer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
retries: 3
acks: 1

# kafka集群2
cluster2:
bootstrap-servers: server4:9092,server5:9092,server6:9092
producer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
retries: 3
acks: 1
consumer:
group-id: consumer_group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest

创建 Kafka 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package gy.finolo.config.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class KafkaConfig {

@Bean
@ConfigurationProperties(prefix = "spring.kafka.cluster1")
public KafkaProperties cluster1KafkaProperties() {
return new KafkaProperties();
}

@Bean
@ConfigurationProperties(prefix = "spring.kafka.cluster2")
public KafkaProperties cluster2KafkaProperties() {
return new KafkaProperties();
}

@Bean("cluster1KafkaTemplate")
public KafkaTemplate<String, String> cluster1KafkaTemplate() {
return new KafkaTemplate<>(cluster1ProducerFactory());
}

@Bean("cluster2KafkaTemplate")
public KafkaTemplate<String, String> cluster2KafkaTemplate() {
return new KafkaTemplate<>(cluster2ProducerFactory());
}

@Bean("cluster1ProducerFactory")
public ProducerFactory<String, String> cluster1ProducerFactory() {
// 直接使用KafkaProperties,更简洁
Map<String, Object> configProps = new HashMap<>();

// 使用KafkaProperties的getter方法
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster1KafkaProperties().getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
cluster1KafkaProperties().getProducer().getKeySerializer());
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
cluster1KafkaProperties().getProducer().getValueSerializer());

// 添加其他必要的配置
configProps.put(ProducerConfig.ACKS_CONFIG,
cluster1KafkaProperties().getProducer().getAcks());
configProps.put(ProducerConfig.RETRIES_CONFIG,
cluster1KafkaProperties().getProducer().getRetries());

return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean("cluster2ProducerFactory")
public ProducerFactory<String, String> cluster2ProducerFactory() {
Map<String, Object> configProps = new HashMap<>();

// 使用KafkaProperties的getter方法
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster2KafkaProperties().getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
cluster2KafkaProperties().getProducer().getKeySerializer());
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
cluster2KafkaProperties().getProducer().getValueSerializer());

// 添加其他必要的配置
configProps.put(ProducerConfig.ACKS_CONFIG,
cluster2KafkaProperties().getProducer().getAcks());
configProps.put(ProducerConfig.RETRIES_CONFIG,
cluster2KafkaProperties().getProducer().getRetries());

return new DefaultKafkaProducerFactory<>(configProps);
}

}

在服务类里面,注入 KafkaTemplate,然后调用 send 方法就是了

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Service
public class KafkaMessageService {

@Autowired
@Qualifier("cluster1KafkaTemplate")
private KafkaTemplate<String, String> kafkaTemplate;

public CompletableFuture<SendResult<String, String>> sendMessage(String message) {
String topic = "kafka_topic";
return kafkaTemplate.send(topic, message);
}
}

之前文章下面一直使用的是Valine评论框,Valine已经不开源了,所以也遭到了开源社区的抛弃。

使用新版本的 Hexo NexT,哪怕是安装了主题作者提供的 Valine 插件,也不能正常使用。

不过有非常完美的替代方案,使用 Waline,数据结构都是相同的,迁移起来非常方便。

在 Hexo NexT 主题下配置 Waline 评论系统和邮件提醒的方法:

一、安装插件

1
npm install @waline/hexo-next

并在 _config.next.yml 配置文件里面配置参数

这下面有些参数可能是没有用的,这个自行研究一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
waline:
enable: true # When enable is set to be true, leancloud_visitors is recommended to be closed for the re-initialization problem within different leancloud adk version.
appid: # your leancloud application appid
appkey: # your leancloud application appkey
serverURL: # https://my-waline-beryl.vercel.app/ Vercel 地址
avatar: mm # gravatar style
meta: [nick, mail, link] # Custom comment header
pageSize: 10 # pagination size
visitor: true # leancloud-counter-security is not supported for now. When visitor is set to be true, appid and appkey are recommended to be the same as leancloud_visitors' for counter compatibility. Article reading statistic https://valine.js.org/visitor.html
comment_count: true # If false, comment count will only be displayed in post page, not in home page
recordIP: false # Whether to record the commenter IP
enableQQ: false # Whether to enable the Nickname box to automatically get QQ Nickname and QQ Avatar
requiredFields: [] # Set required fields: [nick] | [nick, mail]
libUrl: # Waline.min.js file URL in CDN (or local path)

二、注册leanCloud

推荐注册海外版,使用国内版的话,在域名上会有限制,需要备案之类的。

三、Vercel部署

根据官网的安装方法,一步步操作就行

https://waline.js.org/guide/get-started/

在部署的时候,如果需要配置邮件提醒,还需要添加如下环境变量。

在使用 git push 或 Github Desktop 等其他 Git 操作时,可能遇到以下错误:

1
2
3
4
ssh: connect to host github.com port 22: Connection timed out
fatal: Could not read from remote repository.

Please make sure you have the correct access rights and the repository exists.

这是由于默认的 SSH 22 端口被防火墙或网络策略限制,导致无法连接到 GitHub 的服务器。

为了避免这一问题,可以将连接改为 SSH 的 443 端口。以下是详细的解决方法,包括 Windows 和 Linux/Mac 的操作步骤。

Linux/Mac 下操作步骤

  1. 修改 SSH 配置文件
    SSH 的配置文件通常位于 ~/.ssh/config,如果文件不存在,可以手动创建一个:
1
touch ~/.ssh/config

打开文件并添加以下内容:

1
2
3
4
5
6
Host github.com
HostName ssh.github.com # **这是最重要的部分**
User git
Port 443
PreferredAuthentications publickey
IdentityFile ~/.ssh/id_rsa
  1. 验证 SSH 配置
    配置完成后,通过以下命令测试连接是否正常:
1
ssh -T git@github.com

如果配置成功,应该看到类似以下输出:

1
Hi <your-username>! You've successfully authenticated, but GitHub does not provide shell access.
  1. 配置 Git 使用新端口
    为确保 Git 使用新的 443 端口,可以运行以下命令:
1
git config --global url."ssh://git@ssh.github.com:443".insteadOf "ssh://git@github.com"

Windows 下操作步骤

  1. 找到 SSH 配置文件
    在 Windows 下,SSH 配置文件通常位于用户目录的 .ssh 文件夹中(例如:C:\Users<你的用户名>.ssh\config)。如果文件不存在,可以手动创建一个:

打开资源管理器并导航到 C:\Users<你的用户名>.ssh。
在 .ssh 文件夹下,新建一个文件,命名为 config(没有扩展名)。
2. 编辑 SSH 配置文件
用记事本或其他文本编辑器打开 config 文件,添加以下内容:

1
2
3
4
5
6
Host github.com
HostName ssh.github.com # **这是最重要的部分**
User git
Port 443
PreferredAuthentications publickey
IdentityFile C:\Users\<你的用户名>\.ssh\id_rsa

注意:
IdentityFile 的路径需要根据你实际存储 SSH 密钥的位置调整,通常是 id_rsa 或 id_ed25519。

  1. 验证 SSH 配置
    打开命令提示符或 PowerShell,运行以下命令测试连接:
1
ssh -T git@github.com

如果配置正确,你应该看到以下输出:

1
Hi <your-username>! You've successfully authenticated, but GitHub does not provide shell access.
  1. 配置 Git 使用新端口
    在命令提示符或 PowerShell 中运行以下命令:

最近这一步不配置,也可以正常使用 Github Desktop 了。

1
git config --global url."ssh://git@ssh.github.com:443".insteadOf "ssh://git@github.com"

总结
当 22 端口被占用或限制 时,通过将 SSH 连接切换到 443 端口,即可解决无法访问 GitHub 的问题。这种方法适用于任何操作系统,尤其是在防火墙限制较严的网络环境中。

0%