Kafka简介
特点
1、同时为发布和订阅提供吞吐量。 Kafka 的设计目标是以时间复杂度 O(1)的方式提供消息持久化能力
2、消息持久化,通过将数据持久化到硬盘以及复制防止数据丢失,因此可用于批量消费
3、分布式。支持服务器间的消息分区及分布式消费
4、消费消息采用 Pull 模式,消息被处理状态是 Consumer 端维护的,而不是由服器端维护,Broker 无状态,
Consumer 自己保存 offset
5、支持 Online 和Offiine场景 ,同时支持离线数据处理和实时数据处理
基本概念
Broker : Kafka 群中一台或 多台服务器
Topic :发布到Kafka 的每条消息都有一个类别, 就被称为 Topic。(物理上,不同的topic分开存储,逻辑上
用户只需指定topic即可进行消息的生产和消费,不必关心存放位置)
Partition :物理上的 Topic 分区,一个Topic 可以分为多个 Partition。Partition 都是一个有序的队列,Partition 中的每条消息都会被分配一个有序的 ID(offset)。
Producer / Consumer:生产者/消费者,一般向kafka发送信息的为生产者,从kafka取消息的为消费者
Consumer Group (消费者组):每个消费者都属于一个特定的消费者组(可以指定组名,不指定则属于默认的组)。
消费者组是 Kafka 用来实现一个 Topic 消息的广播(发送给所有的消费者) 和
单播(发送给任意一个消费者) 的手段。
广播:发送到所有的消费者组,但只有组内的任意一个消费者能进行消费
单播:发送到任意一个消费者组,由组内的任意一个消费者消费
SpringBoot 整合 Kafka
前言
一般来说,有Kafka Server的服务环境才能进行Kafka整合。
Kafka是由Scala + Zookeeper构建的,可以从官网下载部署包在本地部署。
但是,为了简化开发环节验证Kafka相关功能,Spring-Kafka-Test已经封装了Kafka-test提供了注解式的一键开启Kafka Server的功能。
所以,就懒得安装Kafka了。
1、导包
首先建立一个SpringBoot工程,然后进行依赖导入
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.9</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
//要使用到web开发,写个测试接口啥的
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
//kafka所需依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
//测试包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
//kafka嵌入式服务,即可以不安装kafka也能验证kafka功能的关键
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
//日志打印log.info
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependency>
</dependencies>
2、写配置
嵌入式Kfaka:
使用一个注解@EmbeddedKafka即可启动一个功能完整的Kafka服务:
@EmbeddedKafka注解中的可设置参数 :
ports:指定启动一个Kafka服务对应的端口号
value:broker节点数量
count:同value作用一样,也是配置的broker的节点数量
controlledShutdown:控制关闭开关,主要用来在Broker意外关闭时减少此Broker上Partition的不可用时间
@SpringBootTest
@EmbeddedKafka(ports = 9092)
@Slf4j
class KafkaDemoApplicationTests {
@Test
void contextLoads() throws IOException {
// System.in.read会使线程一直阻塞,从而使服务一直运行
System.in.read();
}
//@KafkaListener 注解声明了一个消费者方法,用于接收从topic_input 主题中读取的消息。
//ID 设置为 test-group。
//id和groupId是有点差别的,但都能指消费者组
@KafkaListener(id = "test-group", topics = "topic_input")
public void listen(String input) {
log.info("Test接收到数据为:{},消费者组为:{}",input, KafkaUtils.getConsumerGroupId());
}
}
id和groupId的差别:https://developer.aliyun.com/article/846305
kafka配置类:
使用kafka发送消息,得有KafkaTemplate组件。
@Configuration
public class KafkaConfigration {
//配置kafka的服务端口号
private static final String bootstrapServers = "localhost:9092";
//设置 Kafka 生产者的配置,具体的属性可以深入了解
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
//创建 Kafka 生产者工厂
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
//使用了刚刚创建的生产者工厂作为参数,然后返回 KafkaTemplate 实例
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
控制器
@RestController
@Slf4j
public class KafkaController {
//将 KafkaTemplate 实例注入到 KafkaController 类中
@Autowired
private KafkaTemplate<String, String> template;
//使用 kafkaTemplate.send 方法发送消息到 topic_input 主题
@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
log.info("输入参数为:{}",input);
this.template.send("topic_input", input);
log.info("消息发送完成");
}
//监听topic_input主题,此监听和上面测试类的监听属于同一个消费组,但是不同的消费者。
//id在同一个容器中不能重复,就想一个组里不会有相同的学号
@KafkaListener(id = "my-group", topics = "topic_input")
public void listen(String input) {
log.info("controller接收到数据为:{},消费者组为:{}",input, KafkaUtils.getConsumerGroupId());
}
}
3、测试
将测试类和应用都启动。测试:
测试结果:由于一个主题的消息,能被多个消费者消费,点击发送后,两个消费者组都能接受到消息
问题:既然一个容器中配置的id不能重复,那么怎么实现在一个容器中配置出一个消费者组,多个消费者呢?
下面举的例子可能不太准确,建议参考链接的文章
springboot集成kafka + 多groupId 配置消费组:https://blog.csdn.net/tangfeng61/article/details/107621114
1、写一个消费者配置类
import com.example.kafkademo.service.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
private String bootstrapServers = "localhost:9092";
private String groupId = "my-group";
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
//监听器工厂
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(3); //设置并发消费者数
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaConsumer listener() {
return new KafkaConsumer();
}
}
KafkaConsumer:
在这里写多个监听器。
package com.example.kafkademo.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(groupId = "my-group", topics = "topic_input")
public void listen1(String input) {
log.info("KafkaConsumer listen1接收到数据为:{},消费者组为:{}",input, KafkaUtils.getConsumerGroupId());
}
@KafkaListener(groupId = "my-group", topics = "topic_input")
public void listen2(String input) {
log.info("KafkaConsumer listen2 接收到数据为:{},消费者组为:{}",input, KafkaUtils.getConsumerGroupId());
}
}
测试结果:确实是只有一个消费组里的其中一个消费者消费,但总感觉好像不是这样的,但先不管了。
只是了解下怎么用的先
KafkaTemplate消息发送
如上面的 SpringBoot整合 Kafka 实例中,发送消息是通过KafkaTemplate的send 方法实现的,但对于Kafka 的消息发送会有三种情况:
1、立即发送 :只需要把消息发送到服务端,而不关心消息发送的结果,也就是例子中的那样
2、同步发送 :调用 send 方法发送消息后,获取该方法返回的ListenableFuture对象,根据该对象的结果
查看send调用是否成功
3、异步发送 : 先注册一个回调函数,通过调用send 方法发送消息时把回调函数作为入参传入,
这样当生产者接收到 Kafka 服务器的响应时会触发执行回调函数
同步发送
ListenableFuture<SendResult<String, String>> future1 = this.template.send("topic_input", input + "同步发送");
try {
SendResult<String, String> result = future1.get();
log.info("同步返回结果:{}",result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
异步发送
ListenableFuture<SendResult<String, String>> future = this.template.send("topic_input", input);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("onSuccess");
}
@Override
public void onFailure(Throwable throwable) {
log.info("onFailure");
}
});
由于 Kafka 自身的高可用性,生产者也提供了自动重试等机制,所以在大部分情况下发送的方式是会成功的。通常需要根据实际的业务场景选择用哪种方式,
如果比较关心消息发送的结果,则可以用同步发送的方式;
如果除了关心消息发送的结果,还注重发送端的性能,则可以选择异步发送的方式。
消息发送还需要考虑失败的情况,在 Kafka 的消息发送过程中错误一般有两种,
其中一种是可重试的错误,比如服务器连接错误,这种错误可通过把生产者配置成自动重试的方式来解决,
如果多次重试还是不行,应用将会收到一个重试异常;
另一种是不能通过重试来解决的错误,对于这种情况会直接抛出异常给生产者。
kafka还支持事务消息、消息重试和死信队列的应用等。
先不搞这么多了,入门了解一下先
具体的可以参考以下链接的文章:
实战:彻底搞定 SpringBoot 整合 Kafka(spring-kafka深入探秘):https://zhuanlan.zhihu.com/p/93445381
spring-kafka】@KafkaListener详解与使用:https://developer.aliyun.com/article/846305
SpringBoot kafka开启事务报错:The ‘ProducerFactory‘ must support transactions :https://blog.csdn.net/qq_35507119/article/details/124040768
聊聊Kafka(三)Kafka消费者与消费组: https://blog.csdn.net/z591045/article/details/112329381