Spring Boot整合kafka
此处简单记录一下 SpringBoot 和 Kafka 的整合。
先初始化一个SpringBoot工程
搭建kafka环境
这里使用 docker 方式搭建 kafka。kafka 依赖 zookeeper 来保存集群元数据并进行协调,所以要先安装 zookeeper。
zookeeper的docker-compose.yaml文件
# Standalone ZooKeeper node, reachable from the host on port 2181.
version: '3.1'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    volumes:
      # Host directory mounted over the container's /conf directory.
      - ./config:/conf
    ports:
      - "2181:2181"
    environment:
      # Server id of this node.
      ZOO_MY_ID: 1
接下来是kafka的docker-compose.yaml文件
# Single Kafka broker plus the ZooKeeper instance it connects to.
version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./data:/data
    ports:
      # Published on host port 2182; containers on the compose network still use 2181.
      - "2182:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 0
      # Address handed back to clients. It must be routable from the client side;
      # 0.0.0.0 is a bind-only meta-address and cannot be advertised.
      # 127.0.0.1 suits clients running on the docker host; use the host's IP or
      # hostname when clients run on other machines.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
      # Format is name:partitions:replicas. Creates topic "kafeidou" with 2 partitions
      # after startup; the replication factor must be at least 1.
      KAFKA_CREATE_TOPICS: "kafeidou:2:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Bind on every interface inside the container.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - ./kafka-logs:/kafka
    depends_on:
      - zookeeper
引入依赖
<!-- Spring for Apache Kafka. No <version> is given here, so it is presumably supplied by the
     project's dependency management (e.g. the Spring Boot parent) - confirm in the pom. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
在 application.properties 中添加配置
# Broker address shared by producer and consumer; 9092 is the port published by the
# kafka docker-compose service.
spring.kafka.bootstrap-servers=127.0.0.1:9092
# Producer settings
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
# Keys are serialized as String
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Values are serialized as byte arrays
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# Consumer settings
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.group-id=story-has-you
# Start from the earliest offset when the group has no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
# Offsets are committed manually instead of automatically
spring.kafka.consumer.enable-auto-commit=false
# Manual acknowledgment in the listener
spring.kafka.listener.ack-mode=manual
# Do not fail application startup when a listened topic does not exist yet
spring.kafka.listener.missing-topics-fatal=false
新增配置类
KafkaConfig.java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
/**
 * Kafka configuration: declares the demo topic and a listener container factory with
 * bounded retries, dead-letter publishing and manual acknowledgment.
 *
 * @author fangxi created by 2021/6/15
 */
@Configuration
public class KafkaConfig {

    /** Name of the topic declared by this configuration. */
    private static final String TOPIC_NAME = "story-has-you";

    /** Delay between redeliveries of a failed record, in milliseconds. */
    private static final long RETRY_INTERVAL_MS = 0L;

    /** Maximum number of retries before the record is handed to the recoverer. */
    private static final long MAX_RETRIES = 3L;

    /**
     * Declares the {@code story-has-you} topic with 1 partition and a replication factor of 1.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic changChunFawRealFaultInfo() {
        return new NewTopic(TOPIC_NAME, 1, (short) 1);
    }

    /**
     * Builds the listener container factory used by {@code @KafkaListener} methods.
     *
     * <p>The factory is first populated by the Boot configurer, then given an error handler
     * that retries a failed record up to {@link #MAX_RETRIES} times and afterwards passes it
     * to a {@link DeadLetterPublishingRecoverer}. The ack mode is forced to manual.
     *
     * <p>The template is injected as a method parameter rather than as a field, so this
     * configuration class has no injected state of its own.
     *
     * @param configurer           the Boot configurer that applies the default container settings
     * @param kafkaConsumerFactory the consumer factory backing the containers
     * @param kafkaTemplate        the template used to publish records that exhausted their retries
     * @return the configured listener container factory
     */
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);

        // At most 3 retries, with no delay between them.
        FixedBackOff backOff = new FixedBackOff(RETRY_INTERVAL_MS, MAX_RETRIES);
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer, backOff));

        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
生产者发送
@Autowired
private KafkaTemplate<String, byte[]> kafkaTemplate;
// The template's value type is byte[], so the text payload is encoded explicitly as UTF-8
// (passing the String itself would not compile).
kafkaTemplate.send("story-has-you", "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));
消费者监听
/**
* On message.
*
* @param records the records
*/
@KafkaListener(topics = "story-has-you", groupId = "story-has-you", containerFactory = "kafkaListenerContainerFactory")
public void qingDaoMessage(ConsumerRecord<String, byte[]> records, Acknowledgment ack) {
try {
String data = new String(records.value());
if (data == null) {
return;
}
// 打印hello
log.info("从kafka接收到消息, {}", data)
} catch (Exception e) {
log.error("kafka处理消息异常", e);
} finally {
// 手动ack, 通知kafka已经消费
ack.acknowledge();
}
}