文章目录
一、原生 KafkaConsumer
kafka_2">1、pom文件引入kafka
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
</dependency>
2、拉取数据
简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumer:KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:
package com.yogi.test.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
@Component
public class TestMsgConsumer implements InitializingBean {
@Value("${test.kafka.address:127.0.0.1:9092}")
private String kafkaAddress;
@Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")
private String msgTopic;
@Value("${test.consumer.name:yogima}")
private String consumerGroupId;
/**
* 消费开关: true-消费,false-暂停消费
* 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可
*/
private Boolean consumeSwitch = true;
public void consumerMessage(List<String> topic, String groupId) {
LOGGER.info("consumer topic list1:{}",topic.toString());
Properties props = new Properties();
/**
* 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置
* 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表
*/
LOGGER.info("test.kafka.address:{}",kafkaAddress);
props.put("bootstrap.servers", kafkaAddress);
/**
* 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id
* 设置一个有业务意义的名字即可
*/
props.put("group.id", groupId);
/**
* 自动提交位移
*/
props.put("enable.auto.commit", Boolean.TRUE);
/**
* 位移提交超时时间
*/
props.put("auto.commit.interval.ms", "1000");
/**
* 从最早的消息开始消费
* 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
* 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
*/
props.put("auto.offset.reset", "latest");
/**
* 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,
* Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer
* org.apache.kafka.common.serialization.ByteArrayDeserializer
* StringDeserializer
*/
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/**
* 对消息体进行解序列化,与key解序列化类似
*/
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完
props.put("max.poll.records", "500");
//fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
props.put("fetch.message.max.bytes", "300000000");
KafkaConsumer<String, String> consumer;
try{
/**
* 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器
*/
LOGGER.info("start set consumer,props:{}",props.toString());
consumer = new KafkaConsumer<>(props);
LOGGER.info("set consumer finished");
/**
* 订阅consumer group需要消费的topic列表
*/
LOGGER.info("consumer topic list:{}",topic.toString());
consumer.subscribe(topic);
}catch (Exception e){
LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);
return;
}
/**
* 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,
* 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作
*/
try {
while (true) {
if (!consumeSwitch) {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
LOGGER.error("err msg:" + e.getMessage());
}
}
/**
* 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据
* consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
/**
* poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法
* 返回即认为consumer成功消费了消息
*/
for (ConsumerRecord<String, String> record : records) {
LOGGER.debug("offset = {}, key = {}, value = {}"