欢迎大家来到IT世界,在知识的湖畔探索吧!
关注△mikechen△,十余年BAT架构经验倾囊相授!
欢迎大家来到IT世界,在知识的湖畔探索吧!
大家好,我是mikechen。
Kafka是大型架构的必备技能,掌握Kafka消费至关重要,下面我重点详解Kafka消费者如何实现高吞吐@mikechen
批量拉取
Kafka 客户端,提供批量拉取消息的接口,消费者可以一次从 Broker 拉取大量消息,降低网络和系统调用开销。
Kafka 消费者不是一条一条地从 Broker 拉取消息,而是可以批量地进行拉取,从而提高了吞吐量。
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(200));List<String> batch =newArrayList<>();for(ConsumerRecord<String,String> record : records){ batch.add(record.value());}processBatch(batch);// 批处理逻辑
欢迎大家来到IT世界,在知识的湖畔探索吧!
想象一下,你搬运砖块,一次搬一块和一次搬一堆,效率自然不可同日而语。
多线程消费
单个消费者实例在处理消息时,默认是单线程的。这意味着它需要先拉取一批消息,然后逐条处理。
在高负载场景下,单线程的处理能力往往成为瓶颈。
为了提高消费端的并行处理能力,可以使用多线程来并发地处理拉取到的消息。
欢迎大家来到IT世界,在知识的湖畔探索吧!ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(500));for(ConsumerRecord record : records){ threadPool.submit(()-> process(record));}// 等待处理完成再手动提交 offset
通常会使用线程池来管理这些消费线程,以便更好地控制并发度和资源利用率。
线程池的大小需要根据消费者的处理能力、CPU 核心数以及消息的处理逻辑的复杂程度来合理配置。
多消费者实例
除了在单个消费者实例内部使用多线程,还可以通过启动多个独立的消费者实例来进一步提高整体的消费吞吐量。
consumer.subscribe(Collections.singletonList("orders")); while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String> record : records){ process(record);// 每个实例只消费自己分配到的分区} consumer.commitAsync();}Kafka 的设计保证了同一个消费者组内的多个消费者实例可以并行地消费不同的分区。
例如,如果一个 Topic 有 10 个分区,并且启动了 5 个消费者实例,那么 Kafka 会尝试将每个分区分配给一个消费者实例,从而实现 5 个分区同时被消费。
调整消费者配置
还有一些其他的消费者配置参数,对吞吐量有重要影响。
比如;
|
配置项 |
默认值 |
优化建议 |
含义 |
|
max.poll.records |
500 |
1000~5000 |
每次 poll 最多消息数 |
|
fetch.min.bytes |
1 |
1024~ |
拉取最小字节 |
|
fetch.max.bytes |
50MB |
100MB |
一次拉取最大字节 |
|
max.poll.interval.ms |
5分钟 |
1~5分钟 |
最大处理时间 |
|
session.timeout.ms |
10s |
10~30s |
心跳超时剔除消费者 |
|
enable.auto.commit |
true |
false(推荐) |
是否自动提交位移 |
|
auto.commit.interval.ms |
5s |
10s 或更多 |
自动提交频率 |
通过以上调整消费者配置参数,从而可以实现更大的吞吐量。
本篇已收于mikechen原创超30万字《阿里架构师进阶专题合集》里面。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/122403.html