Administrator
发布于 2023-03-09 / 122 阅读
0
0

SpringBoot 整合 kafka

Kafka简介

特点

1、同时为发布和订阅提供吞吐量。 Kafka 的设计目标是以时间复杂度 O(1)的方式提供消息持久化能力
2、消息持久化,通过将数据持久化到硬盘以及复制防止数据丢失,因此可用于批量消费
3、分布式。支持服务器间的消息分区及分布式消费
4、消费消息采用 Pull 模式,消息被处理状态是 Consumer 端维护的,而不是由服器端维护,Broker 无状态,
Consumer 自己保存 offset
5、支持 Online 和Offiine场景 ,同时支持离线数据处理和实时数据处理

基本概念

Broker : Kafka 群中一台或 多台服务器

Topic :发布到Kafka 的每条消息都有一个类别, 就被称为 Topic。(物理上,不同的topic分开存储,逻辑上
用户只需指定topic即可进行消息的生产和消费,不必关心存放位置)

Partition :物理上的 Topic 分区,一个Topic 可以分为多个 Partition。Partition 都是一个有序的队列,Partition 中的每条消息都会被分配一个有序的 ID(offset)。

Producer / Consumer:生产者/消费者,一般向kafka发送信息的为生产者,从kafka取消息的为消费者

Consumer Group (消费者组):每个消费者都属于一个特定的消费者组(可以指定组名,不指定则属于默认的组)。

消费者组是 Kafka 用来实现一个 Topic 消息的广播(发送给所有的消费者) 和
单播(发送给任意一个消费者) 的手段。

广播:发送到所有的消费者组,但只有组内的任意一个消费者能进行消费
单播:发送到任意一个消费者组,由组内的任意一个消费者消费

SpringBoot 整合 Kafka

前言

一般来说,有Kafka Server的服务环境才能进行Kafka整合。

Kafka是由Scala + Zookeeper构建的,可以从官网下载部署包在本地部署。

但是,为了简化开发环节验证Kafka相关功能,Spring-Kafka-Test已经封装了Kafka-test提供了注解式的一键开启Kafka Server的功能。

所以,就懒得安装Kafka了。

1、导包

首先建立一个SpringBoot工程,然后进行依赖导入

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.9</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    
     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        
        //要使用到web开发,写个测试接口啥的
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        //kafka所需依赖
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        
        //测试包
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        //kafka嵌入式服务,即可以不安装kafka也能验证kafka功能的关键
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        
        //日志打印log.info
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        
        </dependency>
    </dependencies>

2、写配置

嵌入式Kfaka:
使用一个注解@EmbeddedKafka即可启动一个功能完整的Kafka服务:

@EmbeddedKafka注解中的可设置参数 :
ports:指定启动一个Kafka服务对应的端口号
value:broker节点数量
count:同value作用一样,也是配置的broker的节点数量
controlledShutdown:控制关闭开关,主要用来在Broker意外关闭时减少此Broker上Partition的不可用时间
@SpringBootTest
@EmbeddedKafka(ports = 9092)
@Slf4j
class KafkaDemoApplicationTests {



    @Test
    void contextLoads() throws IOException {
    // System.in.read会使线程一直阻塞,从而使服务一直运行
        System.in.read();
    }


//@KafkaListener 注解声明了一个消费者方法,用于接收从topic_input 主题中读取的消息。
//ID 设置为 test-group。
//id和groupId是有点差别的,但都能指消费者组
    @KafkaListener(id = "test-group", topics = "topic_input")
    public void listen(String input) {
        log.info("Test接收到数据为:{},消费者组为:{}",input, KafkaUtils.getConsumerGroupId());
    }

}

id和groupId的差别:https://developer.aliyun.com/article/846305

kafka配置类:
使用kafka发送消息,得有KafkaTemplate组件。
@Configuration
public class KafkaConfigration {
	//配置kafka的服务端口号
    private static final String bootstrapServers = "localhost:9092";
    
    //设置 Kafka 生产者的配置,具体的属性可以深入了解
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

	//创建 Kafka 生产者工厂
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    
	//使用了刚刚创建的生产者工厂作为参数,然后返回 KafkaTemplate 实例
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


}
控制器
@RestController
@Slf4j
public class KafkaController {
	
    //将 KafkaTemplate 实例注入到 KafkaController 类中
    @Autowired
    private KafkaTemplate<String, String> template;

	//使用 kafkaTemplate.send 方法发送消息到 topic_input 主题
    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        log.info("输入参数为:{}",input);
        this.template.send("topic_input", input);
        log.info("消息发送完成");
    }
	
    //监听topic_input主题,此监听和上面测试类的监听属于同一个消费组,但是不同的消费者。
    //id在同一个容器中不能重复,就想一个组里不会有相同的学号
    @KafkaListener(id = "my-group", topics = "topic_input")
    public void listen(String input) {
       log.info("controller接收到数据为:{},消费者组为:{}",input, KafkaUtils.getConsumerGroupId());
    }
}

3、测试

将测试类和应用都启动。测试:

image-1678412051286

测试结果:由于一个主题的消息,能被多个消费者消费,点击发送后,两个消费者组都能接受到消息

image-1678413355140

问题:既然一个容器中配置的id不能重复,那么怎么实现在一个容器中配置出一个消费者组,多个消费者呢?

下面举的例子可能不太准确,建议参考链接的文章

springboot集成kafka + 多groupId 配置消费组:https://blog.csdn.net/tangfeng61/article/details/107621114

1、写一个消费者配置类
import com.example.kafkademo.service.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    private String bootstrapServers = "localhost:9092";

    private String groupId = "my-group";

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

//监听器工厂
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(3); //设置并发消费者数
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public KafkaConsumer listener() {
        return new KafkaConsumer();
    }

}

KafkaConsumer:

在这里写多个监听器。

package com.example.kafkademo.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(groupId = "my-group", topics = "topic_input")
    public void listen1(String input) {
        log.info("KafkaConsumer listen1接收到数据为:{},消费者组为:{}",input, KafkaUtils.getConsumerGroupId());
    }

    @KafkaListener(groupId = "my-group", topics = "topic_input")
    public void listen2(String input) {
        log.info("KafkaConsumer listen2 接收到数据为:{},消费者组为:{}",input, KafkaUtils.getConsumerGroupId());
    }
}

测试结果:确实是只有一个消费组里的其中一个消费者消费,但总感觉好像不是这样的,但先不管了。
只是了解下怎么用的先

image-1678416732595

KafkaTemplate消息发送

如上面的 SpringBoot整合 Kafka 实例中,发送消息是通过KafkaTemplate的send 方法实现的,但对于Kafka 的消息发送会有三种情况:

1、立即发送 :只需要把消息发送到服务端,而不关心消息发送的结果,也就是例子中的那样

2、同步发送 :调用 send 方法发送消息后,获取该方法返回的ListenableFuture对象,根据该对象的结果
查看send调用是否成功

3、异步发送 : 先注册一个回调函数,通过调用send 方法发送消息时把回调函数作为入参传入,
这样当生产者接收到 Kafka 服务器的响应时会触发执行回调函数

同步发送

ListenableFuture<SendResult<String, String>> future1 = this.template.send("topic_input", input + "同步发送");
        try {
            SendResult<String, String> result = future1.get();
            log.info("同步返回结果:{}",result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

异步发送

 ListenableFuture<SendResult<String, String>> future = this.template.send("topic_input", input);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("onSuccess");
            }
            @Override
            public void onFailure(Throwable throwable) {
                log.info("onFailure");
            }
        });

由于 Kafka 自身的高可用性,生产者也提供了自动重试等机制,所以在大部分情况下发送的方式是会成功的。通常需要根据实际的业务场景选择用哪种方式,
如果比较关心消息发送的结果,则可以用同步发送的方式;
如果除了关心消息发送的结果,还注重发送端的性能,则可以选择异步发送的方式。

消息发送还需要考虑失败的情况,在 Kafka 的消息发送过程中错误一般有两种,
其中一种是可重试的错误,比如服务器连接错误,这种错误可通过把生产者配置成自动重试的方式来解决,
如果多次重试还是不行,应用将会收到一个重试异常;
另一种是不能通过重试来解决的错误,对于这种情况会直接抛出异常给生产者。

kafka还支持事务消息、消息重试和死信队列的应用等。

先不搞这么多了,入门了解一下先

具体的可以参考以下链接的文章:

实战:彻底搞定 SpringBoot 整合 Kafka(spring-kafka深入探秘):https://zhuanlan.zhihu.com/p/93445381
spring-kafka】@KafkaListener详解与使用:https://developer.aliyun.com/article/846305
SpringBoot kafka开启事务报错:The ‘ProducerFactory‘ must support transactions :https://blog.csdn.net/qq_35507119/article/details/124040768
聊聊Kafka(三)Kafka消费者与消费组: https://blog.csdn.net/z591045/article/details/112329381


评论