千家信息网

Kafka Consumer使用要注意什么

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇内容主要讲解"Kafka Consumer使用要注意什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Kafka Consumer使用要注意什么"吧!
千家信息网最后更新 2025年12月02日Kafka Consumer使用要注意什么

本篇内容主要讲解"Kafka Consumer使用要注意什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Kafka Consumer使用要注意什么"吧!

一、特点:

不用关心offset, 会自动的读zookeeper中该Consumer group的last offset

二、注意事项

1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,

所以consumer数不要大于partition数

2. 如果consumer比partition少,一个consumer会对应于多个partitions,

这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀

最好partiton数目是consumer数目的整数倍,所以partition数目很重要,

比如取24,就很容易设定consumer数目

3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,

kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

4. 增减consumer,broker,partition会导致rebalance,

所以rebalance后consumer对应的partition会发生变化

5. High-level接口中获取不到数据的时候是会block的

三、代码如下:

package kafkatest.kakfademo;

import java.io.UnsupportedEncodingException;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerDemo1 {

public static void main(String[] args) {

ConsumerDemo1 demo = new ConsumerDemo1();

demo.test();

}

@SuppressWarnings("rawtypes")

public void test() {

String topicName = "test";

int numThreads = 1;

Properties properties = new Properties();

properties.put("zookeeper.connect", "hadoop0:2181");// 声明zk

properties.put("group.id", "group--demo");// 必须要使用别的组名称,

// 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据

ConsumerConnector consumer = Consumer

.createJavaConsumerConnector(new ConsumerConfig(properties));

Map topicCountMap = new HashMap();

topicCountMap.put(topicName, numThreads); // 一次从主题中获取一个数据

Map>> messageStreams = consumer

.createMessageStreams(topicCountMap);

// 获取每次接收到的这个数据

List> streams = messageStreams

.get(topicName);

// now launch all the threads

ExecutorService executor = Executors.newFixedThreadPool(numThreads);

// now create an object to consume the messages

//

int threadNumber = 0;

for (final KafkaStream stream : streams) {

executor.execute(new ConsumerMsgTask(stream, threadNumber));

threadNumber++;

}

}

class ConsumerMsgTask implements Runnable {

private KafkaStream m_stream;

private int m_threadNumber;

public ConsumerMsgTask(KafkaStream stream, int threadNumber) {

m_threadNumber = threadNumber;

m_stream = stream;

}

public void run() {

ConsumerIterator it = m_stream.iterator();

long offset = 0;

try {

while (it.hasNext())

offset = it.next().offset();

byte[] bytes = it.next().message();

String msg = new String(bytes, "UTF-8");

System.out.print("offset: " + offset + ",msg:" + msg);

System.out.println("Shutting down Thread: " + m_threadNumber);

} catch (UnsupportedEncodingException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

四、实验验证







到此,相信大家对"Kafka Consumer使用要注意什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

数据 数目 多个 内容 顺序 保证 学习 不同 实用 更深 有序 重要 不用 主题 事项 代码 兴趣 名称 实用性 实际 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 数据库中rowcount函数 深圳软件开发驻场价格表 手机连接免费网络安全吗 博士软件开发 文献数据库为什么越来越大 java连接数据库编码 数据库中常采用的安全措施 软件开发专业适合女生吗 上海电商软件开发服务要求 连云港海航软件开发业务流程 成都听雨林互联网科技有限公司 房山区网络安全培训 机房服务器升级 网络安全建设方案费用 网络技术岗位面试题目 5g网络技术的机遇分析 静安区营销软件开发制品价格 你发布的商品因请选择服务器 四川旅游app小程序软件开发 司法局网络安全处置 提现用公共网络安全吗 重新启动数据库文件 dns 服务器地址在哪看 安装安全狗后服务器无法启动 威海智慧养老软件开发哪儿好 腾讯轻量服务器流量用完怎么办 英雄联盟服务器ping 小鸡快跑网络技术有限公司融资 网络安全注意事项图解 嘉定区服务器回收厂家哪家便宜
0