怎么进行Pulsar Kafka Client的简单分析
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇文章给大家分享的是有关怎么进行Pulsar Kafka Client的简单分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。为了方便
千家信息网最后更新 2025年12月02日怎么进行Pulsar Kafka Client的简单分析为了方便 Kafka 用户使用 Pulsar,Pulsar 对 Kafka Client 做了一些封装,让 Kafka 用户更方便的使用 Pulsar。
下面主要介绍 Kafka Client 如何将消息发送到 Pulsar, 并从 Pulsar 消费消息,以及如何使用 Pulsar Schema。
依赖引入了 Kafka 的 0.10.2.1 版本的客户端,还有 Pulsar 对 Kafka Client 封装后的客户端。 在上述配置中 topic 是指 Pulsar 中的 Topic,接着使用 Kafka 的配置方式来初始化各种配置,包括 Server 地址、key 的序列化与 value 的序列化类,然后构造一个 ProducerRecord 的类将其发送出去。 有些配置同生产者代码的配置是类似的,例如 topic,Server 等。另外使用 Kafka 的 group.id 作为配置 Pulsar 中的订阅名称,关闭自动提交,在消费者端为 key 和 value 配置的是反序列化的类。然后同常规的消费者类似,开始消费消息。
在上述情况中使用的是 Kafka 的 Schema 来进行序列化与反序列化,当然也支持使用 Pulsar 的 Schema 来进行此过程。下面使用 AVRO 进行简单的介绍。 首先定义 Schema 所需要使用的 pojo 类。 可以看到大部分配置同上面使用 Kafka Client 的配置是类似的,但是中间加入了一些 Pulsar 的 Schema,使用 Foo 作为 key,使用 Bar 类作为 value。 消费者端同样是类似的配置,使用与生产者端相同的 Schema 进行数据的反序列化。
本篇文章给大家分享的是有关怎么进行Pulsar Kafka Client的简单分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
⌨️ 引入依赖
org.apache.pulsar pulsar-client-kafka {project.version}
⌨️ 使用 Kafka Schema
>>> 添加生产者代码
String topic = "persistent://public/default/test";Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());Producerproducer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord(topic, i, Integer.toString(i))); }producer.close();
>>> 添加消费者代码
String topic = "persistent://public/default/test";Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());@SuppressWarnings("resource")Consumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic));while (true) {ConsumerRecordsrecords = consumer.poll(100); records.forEach(record -> {log.info("Received record: {}", record);});// Commit last offsetconsumer.commitSync();}
⌨️ 使用 Pulsar Schema
@Data@ToString@EqualsAndHashCodepublic class Foo {@Nullableprivate String field1;@Nullableprivate String field2;private int field3;}@Data@ToString@EqualsAndHashCodepublic class Bar {private boolean field1;}
>>> 生产者端代码
String topic = "persistent://public/default/test-avro";Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());AvroSchemabarSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Bar.class).build()); AvroSchemafooSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Foo.class).build()); Bar bar = new Bar();bar.setField1(true);Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);Producerproducer = new KafkaProducer<>(props, fooSchema, barSchema); for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord(topic, i, foo, bar)); log.info("Message {} sent successfully", i);}producer.close();
>>> 消费者端代码
String topic = "persistent://public/default/test-avro";Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());AvroSchemabarSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Bar.class).build()); AvroSchemafooSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Foo.class).build()); Bar bar = new Bar();bar.setField1(true);Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);@SuppressWarnings("resource")Consumerconsumer = new PulsarKafkaConsumer<>(props, fooSchema, barSchema); consumer.subscribe(Arrays.asList(topic));while (true) {ConsumerRecordsrecords = consumer.poll(100); records.forEach(record -> {log.info("Received record: {}", record);});// Commit last offsetconsumer.commitSync();}
以上就是怎么进行Pulsar Kafka Client的简单分析,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
配置
消费
序列
代码
消费者
消息
生产者
生产
分析
客户
客户端
更多
用户
知识
篇文章
封装
实用
相同
产者
名称
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
湖州信息网络技术
mac常用软件开发
网络安全管理岗位人员
商业银行数据库管理实践 迟鲲
网络安全渗透测试服务
ipad设置代理服务器
金山区本地网络技术售后服务
软件开发每天薪酬
数据库升级是提示列名无效
服务器 负载
东莞市国弘网络技术有限公司
根服务器很难被研发出来吗
软件开发的目标顾客
网络安全手抄报小学三年级筒单
软件开发调研途径有那些
国外服务器空间
天津特种网络技术工程
自己的照片发到网络安全吗
郑州网络安全体验馆
大型政务云服务器
宁夏云海网络技术公司
中国通信服务软件开发
达梦数据库update不生效
2019上市公司债券数据库
网络安全软件学习强国
网易版怎么把模组加到服务器
陇剑杯网络安全大赛在兰州开幕
安仁计算机软件开发薪资
数据库模糊查询的方法
观看网络安全专题教育心得体会