怎么用C语言与java实现kafka avro生产者和消费者
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,本篇内容介绍了"怎么用C语言与java实现kafka avro生产者和消费者"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔
千家信息网最后更新 2025年12月02日怎么用C语言与java实现kafka avro生产者和消费者
本篇内容介绍了"怎么用C语言与java实现kafka avro生产者和消费者"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
原始数据格式
请求IP 应答IP 域名 类型
3183375114 3729673322 "mx.hc.spdrb.com" A
以上数据是test文件的内容
schema定义如下
{
"type":"record",
"name":"data",
"fields":
[
{"name":"qip","type":"long"},
{"name":"aip","type":"long"},
{"name":"domain","type":"string"},
{"name":"type","type":"string"}
]
}
C语言生产者代码如下
#include#include #include #include #include "avro.h" #include "producer.h"const char PERSON_SCHEMA[] = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}";const char *file = "avro_file.dat";const char *brokers = "xxxxx:9092"; const char *topic = "topic1";void print_avro_value(avro_value_t *value) { char *json; if (!avro_value_to_json(value, 1, &json)) { printf("%s\n", json); free(json); } }if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA), &test_schema, &error)) { fprintf(stderr, "schema error\n"); exit(EXIT_FAILURE);}return test_schema;avro_schema_t init_schema() { avro_schema_t test_schema; avro_schema_error_t error;}void add_data(avro_writer_t writer, avro_schema_t schema, int64_t qip, uint64_t aip, const char* domain, const char* type) { avro_datum_t data = avro_record(schema); avro_datum_t dqip = avro_int64(qip); avro_datum_t daip = avro_int64(aip); avro_datum_t ddomain = avro_string(domain); avro_datum_t dtype = avro_string(type); avro_record_set(data, "qip", dqip); avro_record_set(data, "aip", daip); avro_record_set(data, "domain", ddomain); avro_record_set(data, "type", dtype); avro_write_data(writer, NULL, f2c); avro_datum_decref(dqip); avro_datum_decref(daip); avro_datum_decref(ddomain); avro_datum_decref(dtype); avro_datum_decref(data);}int main(int argc, char* argv[]) { int len = 0; avro_schema_t schema; avro_writer_t mem_writer; char buf[1024]; char tmp[4][500]={{0x00}}; FILE *fp = fopen("test","r"); if(!fp) { printf("open test file error!\n"); return -1; } schema = init_schema(); mem_writer = avro_writer_memory(buf, 1024); while(fgets(buf, 1024,fp)!=NULL) { if(buf[strlen(buf)] == '\n') buf[strlen(buf)] = '\0'; if(sscanf(buf, "%s%s%s%s", tmp[0],tmp[1],tmp[2],tmp[3])!=4) continue; add_data(mem_writer,schema,atol(tmp[0]),atol(tmp[1]),tmp[2],tmp[3]); printf("data len = %ld\n", avro_writer_tell(mem_writer)); len = avro_writer_tell(mem_writer); kafka_putdata(buf, len,brokers,topic);//librdkafka实现的生产者代码 未列出 memset(tmp, 0x00, sizeof(tmp)); memset(buf, 0x00, sizeof(buf)); avro_writer_reset(mem_writer); } fclose(fp); avro_writer_free(mem_writer); return 0;}
C语言实现的消费者如下
#include "consumer.h" #include "avro.h" #include#include const char *brokers = "xxxx:9092"; const char *topic = "topic1"; const char *group = "avrotest"; const char PERSON_SCHEMA[] = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}";avro_schema_t init_schema() { avro_schema_t test_schema; avro_schema_error_t error; if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA), &test_schema, &error)) { fprintf(stderr, "schema error\n"); exit(EXIT_FAILURE); } return test_schema;}void print_data(avro_reader_t reader, avro_schema_t schema) { avro_datum_t data; if(avro_read_data(reader, schema, schema, &data) == 0) { int64_t qip; int64_t aip; char *domain; char *type; avro_datum_t q_datum,a_datum,d_datum,t_datum; avro_record_get(data, "qip", &q_datum); avro_int64_get(q_datum, &qip); avro_record_get(data, "aip", &a_datum); avro_int64_get(a_datum, &aip); avro_record_get(data, "domain", &d_datum); avro_string_get(d_datum, &domain); avro_record_get(data, "type", &t_datum); avro_string_get(t_datum, &type); printf("qip: %lld, aip: %lld,domain: %s,type:%s\n", qip,aip,domain,type); avro_datum_decref(data);}int main(int argc, char* argv[]) { rd_kafka_t *rk; rd_kafka_topic_partition_list_t *topics; if(initKafka(&rk, brokers, group, topic, &topics)<0){return -1;} char buf[1024] = {0x00}; int len = 0; avro_schema_t schema; avro_reader_t mem_reader; schema = init_schema(); mem_reader = avro_reader_memory(buf, 1024); while(1) { get_consumer_msg(rk, buf, &len); //librdkafka实现的消费者 代码未列出 if(len == 0) continue; printf("len=%d\n",len); print_data(mem_reader,schema); avro_reader_reset(mem_reader); memset(buf, 0x00, sizeof(buf)); } return 0;}
C编译的Makefile如下 两个C程序通用
TARGET=avro-test INCLUDE=./avrolib/include/ SLIB=./avrolib/lib/libavro.a DLIB=-lz -llzma -lrdkafka INC = -I. -I./avrolib/include SOURCES =$(wildcard *.c) OBJECTS =$(SOURCES:.c=.o) RM=rm -rf CC=gcc -g CFLAGS= -Wall $(INC) all:$(TARGET) $(TARGET): $(OBJECTS) $(CC) -o $@ $? $(SLIB) $(DLIB) $(CFLAGS) :$(SOURCES) $(CC) -c clean: $(RM) $(TARGET) $(OBJECTS) *~
java消费者 gradle配置
dependencies { testCompile group: 'junit', name: 'junit', version: '4.12' compile group: 'org.apache.avro', name: 'avro', version: '1.9.1' compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0' }avro解析 借鉴别人 言作者未知 请作者见谅
package zc;import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificDatumReader;import java.io.IOException;public class MyRecordDecoder { public static GenericRecord genericRecord; datumReader; static MyRecordDecoder myRecordDecoder = new MyRecordDecoder(); final String USER_SCHEMA = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}"; public MyRecordDecoder() { Schema schema = null; schema = new Schema.Parser().parse(USER_SCHEMA); datumReader = new SpecificDatumReader(schema); } public GenericRecord getGenericRecord(BinaryDecoder decoder, byte[] value) throws IOException{ return datumReader.read(null, decoder); } public static MyRecordDecoder getInstance() { if (myRecordDecoder==null) myRecordDecoder = new MyRecordDecoder(); return myRecordDecoder; }} java消费者 package zc;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Collections; import java.util.Properties;public class KafkaMessageAvro{ public static void main(String[] args) throws Exception { String inTopic = args[0]; Properties props = new Properties(); props.setProperty("bootstrap.servers", "xxxxx:9092"); props.setProperty("group.id", "flink-topn-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(inTopic)); try { while (true) { ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { byte[] ss = record.value(); if (ss==null) { continue; } System.out.println(ss.toString()); GenericRecord genericRecord = null; BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(ss, null); while (!decoder.isEnd()) { genericRecord = MyRecordDecoder.getInstance().getGenericRecord(decoder, ss); System.out.println(genericRecord.get("qip").toString()+" "+genericRecord.get("aip").toString()+" "+genericRecord.get("domain").toString()+" "+genericRecord.get("type").toString()); } } } } finally { consumer.close(); }} "怎么用C语言与java实现kafka avro生产者和消费者"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
消费者
消费
生产者
语言
生产
代码
内容
作者
数据
更多
知识
原始
实用
学有所成
接下来
两个
困境
域名
实际
情况
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
四川都仆网络技术有限公司
第一章认识数据库
新西兰 网络安全
hive数据库更新
回徐州的软件开发
软件开发乙方注意
lucene的数据库
安庆企业软件开发公司
eveiws怎么导入数据库
怎么写软件开发需求说明书
电脑上在哪里找车牌管理服务器
网络安全领域公众号
网络安全观点视频
回收服务器机房网络设备
淘宝自学软件开发
java数据库连接池
中国有多少软件开发者
解说我的世界斗罗大陆服务器
查询网页服务器地址
江北区软件开发培训
数据库链接 400够用吗
iso是什么软件开发的
计算机网络安全的实现论文
聚合物材料物性数据库
2010年软件开发工具
数据库年龄需要加引号吗
回收服务器机房网络设备
都江堰市网络安全保卫大队
服务器上传文件找不到密码
be服务器未连接