python如何实现对kafka的基本操作
发表于:2025-11-11 作者:千家信息网编辑
千家信息网最后更新 2025年11月11日,这篇文章主要为大家展示了"python如何实现对kafka的基本操作",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"python如何实现对kafka的基本操
千家信息网最后更新 2025年11月11日python如何实现对kafka的基本操作
这篇文章主要为大家展示了"python如何实现对kafka的基本操作",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"python如何实现对kafka的基本操作"这篇文章吧。
-- coding:utf-8 --
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
"""生产者"""def produce(self): producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers) for i in range(4): msg = "msg%d" %i producer.send(self.topic,key=str(i),value=msg) producer.close()"""一个消费者消费一个topic"""def consume(self): #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers) consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers) print consumer.partitions_for_topic(self.topic) #获取test主题的分区信息print consumer.topics() #获取主题列表print consumer.subscription() #获取当前消费者订阅的主题print consumer.assignment() #获取当前消费者topic、分区信息print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量consumer.seek(TopicPartition(topic=self.topic, partition=0), 1) #重置偏移量,从第1个偏移量消费 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic,message.partition,message.offset, message.key,message.value))"""一个消费者订阅多个topic """def consume2(self): consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])consumer.subscribe(topics=('TEST','TEST2')) #订阅要消费的主题print consumer.topics()print consumer.position(TopicPartition(topic='TEST', partition=0)) #获取当前主题的最新偏移量for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))"""消费者(手动拉取消息)"""def consume3(self): consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])consumer.subscribe(topics=('TEST','TEST2'))while True: message = consumer.poll(timeout_ms=5) #从kafka获取消息 if message: print message time.sleep(1)def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()
以上是"python如何实现对kafka的基本操作"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
消费
消费者
主题
偏移
基本操作
内容
篇文章
订阅
信息
学习
帮助
多个
手动
易懂
更多
条理
消息
生产者
知识
编带
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
求生之路com服务器
黄浦区电商软件开发服务价钱
数据库 gpu
管晓宏院士网络安全
数据库技术第二版实训3第二题
河南公安厅网络安全保卫总队
华为网络技术专用图标
记载谱牒的数据库
5g网络技术对运输的影响
计算机网络技术中制作网页的方法
南昌国产化服务器价格
股票分析app软件开发
sql 更改数据库编码
软件开发培训 小说
应用软件开发中系统维护包括
迪哥闯世界在哪个服务器玩
深圳市聚橙网络技术公司招聘
网络安全两高释法
总裁小说软件开发
办公系统软件开发合同简版本
重庆江津生鲜信息软件开发
辽宁省网络安全知识竞赛系统
华为连不了服务器怎么办
服务器会不会自动修改密码
魔兽服务器维护游戏会回档吗
高青家具管理软件开发公司
网络安全职责资格证书
数据库的软件技术
建模软件开发解决方案
软件开发公司派驻人员管理