How to Implement a Kafka Java API Ingestion Program
This article explains how to implement a Kafka Java API ingestion program. The walkthrough is simple and clear and easy to follow; read on to work through the approach step by step.
Walkthrough
Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
Connecting to Kafka
Properties props = new Properties();
props.put("acks", "all"); // require acknowledgement from all replicas
props.put("bootstrap.servers", Config.ipList); // multiple brokers may be listed
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("retries", "2");
KafkaProducer<byte[], byte[]> produce = new KafkaProducer<>(props);
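For reference, a minimal send through this producer might look like the sketch below; the topic name "demo-topic" and the payload are placeholders, not part of the original program.

// Minimal usage sketch (hypothetical topic name "demo-topic").
ProducerRecord<byte[], byte[]> record =
        new ProducerRecord<>("demo-topic", "hello kafka".getBytes());
produce.send(record); // asynchronous; returns a Future<RecordMetadata>
produce.flush();      // push any buffered records out
produce.close();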
Kerberos authentication

Kerberos is the security-authentication mechanism of the big-data platform, and it can be completed up front when the project starts. Two implementations are described here.
Option 1
Specify the authentication files
// load the Kerberos configuration file
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
// load the Kerberos user (JAAS) file
System.setProperty("java.security.auth.login.config", "/etc/kafka/conf/kafka_jaas.conf");
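For this to work, the referenced kafka_jaas.conf needs a KafkaClient login section. Judging from the template used in Option 2 below, it would look roughly like this sketch; the keytab path and principal are example values, not from the original article:

// example values only: substitute your own keytab path and principal
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    principal="user@EXAMPLE.COM";
};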
Option 2

Sometimes, with user switching in mind, different machines carry different user credentials, and maintaining a separate configuration file for each is tedious. Instead, we can use Java's temporary-file facility at startup (mainly to show off, admittedly).
// load the Kerberos configuration file
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
KafkaUtil.configureJAAS(Config.tabFile, Config.principal); // keytab file and principal

/**
 * Generate a temporary jaas.conf file.
 * @param keyTab    path to the keytab file
 * @param principal Kerberos principal
 */
public static void configureJAAS(String keyTab, String principal) {
    String JAAS_TEMPLATE = "KafkaClient {\n" +
            "com.sun.security.auth.module.Krb5LoginModule required\n" +
            "useKeyTab=true\n" +
            "keyTab=\"%1$s\"\n" +
            "principal=\"%2$s\";\n" +
            "};";
    String content = String.format(JAAS_TEMPLATE, keyTab, principal);
    File jaasConf = null;
    PrintWriter writer = null;
    try {
        jaasConf = File.createTempFile("jaas", ".conf");
        writer = new PrintWriter(jaasConf);
        writer.println(content);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (writer != null) {
            writer.close();
        }
        if (jaasConf != null) { // guard: createTempFile may have failed
            jaasConf.deleteOnExit(); // remove the temp file when the JVM exits
        }
    }
    if (jaasConf != null) {
        System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
    }
}
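Note that with either option the JAAS system property must be in place before the first KafkaProducer is constructed, because the SASL login happens during client creation. A minimal startup-order sketch, assuming the Config and KafkaUtil classes shown in this article:

// Startup order sketch: configure Kerberos first, then create the producer.
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
KafkaUtil.configureJAAS(Config.tabFile, Config.principal);
KafkaProducer<byte[], byte[]> produce = new KafkaUtil().create();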
Application

For actual production use, with transfer efficiency and stability in mind, apply the following optimizations.
Make the sender a thread class managed by a thread pool, to increase transfer efficiency (a pool sketch follows the thread class below).
Upload the data in batches.
Add a Callback handling mechanism to avoid data loss.
The upload thread class is as follows.
public class Performance extends Thread {
    private final static Logger log = LoggerFactory.getLogger(Performance.class);
    private List<ProducerRecord<byte[], byte[]>> recordList;

    public Performance(List<ProducerRecord<byte[], byte[]>> recordList) {
        this.recordList = recordList;
    }

    /**
     * Ingestion test method.
     */
    public static void test() {
        log.info("Kafka Tool Test");
        try {
            /* parse args */
            String topicName = "test40";
            /* total number of records to send */
            long numRecords = 10000000000L;
            /* record size in bytes */
            int recordSize = 1500;
            /* maximum records sent per throttle window */
            int throughput = 10000000;
            Properties props = new Properties();
            props.put("acks", "1");
            props.put("bootstrap.servers", "ip:6667,ip:6667");
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

            /* build the test payload */
            byte[] payload = new byte[recordSize];
            Random random = new Random(0);
            for (int i = 0; i < payload.length; ++i)
                payload[i] = (byte) (random.nextInt(26) + 65);

            /* record object to send */
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, payload);
            /* stats model: total records, reporting interval */
            Stats stats = new Stats(numRecords, 5000);
            /* start time */
            long startMs = System.currentTimeMillis();
            /* throttler that caps the producer's send rate */
            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
            for (long i = 0; i < numRecords; i++) { // long, not int: numRecords exceeds Integer.MAX_VALUE
                long sendStartMs = System.currentTimeMillis();
                Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats, record.topic(), record.value());
                producer.send(record, cb);
                if (throttler.shouldThrottle(i, sendStartMs)) {
                    throttler.throttle();
                }
            }
            /* finish */
            producer.close();
            stats.printTotal();
        } catch (Exception e) {
            log.info("Test Error:" + e);
        }
    }

    /**
     * Actual ingestion method.
     */
    @Override
    public void run() {
        super.run();
        KafkaUtil kafkaUtil = new KafkaUtil();
        KafkaProducer<byte[], byte[]> produce = kafkaUtil.create();
        /* total number of records */
        long size = recordList.size();
        /* maximum records sent per throttle window */
        int throughput = 900000;
        /* stats model: total records, reporting interval */
        Stats stats = new Stats(size, 5000);
        /* start time */
        long startMs = System.currentTimeMillis();
        /* throttler that caps the producer's send rate */
        ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
        int i = 0;
        for (ProducerRecord<byte[], byte[]> record : recordList) {
            long sendStartMs = System.currentTimeMillis();
            // arguments: send start time, payload length, stats model, topic, payload
            Callback cb = stats.nextCompletion(sendStartMs, record.value().length, stats, record.topic(), record.value());
            produce.send(record, cb);
            if (throttler.shouldThrottle(i, sendStartMs)) {
                throttler.throttle();
            }
            i++;
        }
        produce.close();
        log.info("Finish Data To Send");
        LogModel.sendNum++;
    }

    private static class Stats {
        private long start;
        private long windowStart;
        private int[] latencies;
        private int sampling;
        private int iteration;
        private int index;
        private long count;
        private long bytes;
        private int maxLatency;
        private long totalLatency;
        private long windowCount;
        private int windowMaxLatency;
        private long windowTotalLatency;
        private long windowBytes;
        private long reportingInterval;

        public Stats(long numRecords, int reportingInterval) {
            this.start = System.currentTimeMillis();
            this.windowStart = System.currentTimeMillis();
            this.iteration = 0;
            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
            this.index = 0;
            this.maxLatency = 0;
            this.totalLatency = 0;
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
            this.reportingInterval = reportingInterval;
        }

        public void record(int iter, int latency, int bytes, long time) {
            this.count++;
            this.bytes += bytes;
            this.totalLatency += latency;
            this.maxLatency = Math.max(this.maxLatency, latency);
            this.windowCount++;
            this.windowBytes += bytes;
            this.windowTotalLatency += latency;
            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
            if (iter % this.sampling == 0) {
                this.latencies[index] = latency;
                this.index++;
            }
            /* maybe report the recent perf */
            if (time - windowStart >= reportingInterval) {
                printWindow();
                newWindow();
            }
        }

        public Callback nextCompletion(long start, int bytes, Stats stats, String topic, byte[] data) {
            Callback cb = new PerfCallback(this.iteration, start, bytes, stats, topic, data);
            this.iteration++;
            return cb;
        }

        /**
         * Report throughput for the current window.
         */
        public void printWindow() {
            long elapsed = System.currentTimeMillis() - windowStart;
            double recsPerSec = 1000.0 * windowCount / (double) elapsed;
            double mbPerSec = 1000.0 * this.windowBytes / (double) elapsed / (1024.0 * 1024.0);
            System.out.printf("%d ms elapsed, %d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.%n",
                    elapsed, windowCount, recsPerSec, mbPerSec,
                    windowTotalLatency / (double) windowCount, (double) windowMaxLatency);
        }

        public void newWindow() {
            this.windowStart = System.currentTimeMillis();
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
        }

        /**
         * Report overall throughput.
         */
        public void printTotal() {
            long elapsed = System.currentTimeMillis() - start;
            double recsPerSec = 1000.0 * count / (double) elapsed;
            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
            System.out.printf("%d ms elapsed, %d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
                    elapsed, count, recsPerSec, mbPerSec,
                    totalLatency / (double) count, (double) maxLatency,
                    percs[0], percs[1], percs[2], percs[3]);
        }

        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
            int size = Math.min(count, latencies.length);
            Arrays.sort(latencies, 0, size);
            int[] values = new int[percentiles.length];
            for (int i = 0; i < percentiles.length; i++) {
                int index = (int) (percentiles[i] * size);
                values[i] = latencies[index];
            }
            return values;
        }
    }

    private static final class PerfCallback implements Callback {
        private final long start;
        private final int iteration;
        private final int bytes;
        private final Stats stats;
        private final String topic;
        private final byte[] data;

        public PerfCallback(int iter, long start, int bytes, Stats stats, String topic, byte[] data) {
            this.start = start;
            this.stats = stats;
            this.iteration = iter;
            this.bytes = bytes;
            this.topic = topic;
            this.data = data;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int) (now - start);
            this.stats.record(iteration, latency, bytes, now);
            if (exception != null) {
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, data);
                // re-queue the record for a second upload attempt
                ControlTask.recordList.add(record);
                log.error("Send Error And Second To Send", exception);
            }
        }
    }
}
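As noted in the optimization list, the sender threads are meant to be managed by a pool. The original program does not show that driver, so here is a minimal sketch under stated assumptions: the SendDriver class is hypothetical, and the pool size and the batching of records into sub-lists are assumptions.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical driver (not from the original article): assumes the pending
// records have been split into batches elsewhere, then runs one Performance
// thread per batch on a fixed-size pool.
public class SendDriver {
    public static void submit(List<List<ProducerRecord<byte[], byte[]>>> batches) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an assumption
        for (List<ProducerRecord<byte[], byte[]>> batch : batches) {
            pool.execute(new Performance(batch)); // Performance extends Thread, hence Runnable
        }
        pool.shutdown(); // accept no new work; let submitted tasks finish
    }
}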
KafkaUtil.java

public class KafkaUtil {
    private KafkaProducer<byte[], byte[]> produce;

    /**
     * Create the producer connection.
     */
    public KafkaProducer<byte[], byte[]> create() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("bootstrap.servers", Config.ipList);
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 120000); // raise the wait time if needed
        props.put("retries", "2");
        // Kerberos security authentication
        if (Config.kerberos == 0) {
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "GSSAPI");
            props.put("sasl.kerberos.service.name", "kafka");
        }
        produce = new KafkaProducer<>(props);
        return produce;
    }

    /**
     * Send one record.
     * @param record the record to send
     * @param cb     completion callback
     */
    public void send(ProducerRecord<byte[], byte[]> record, Callback cb) {
        produce.send(record, cb);
    }

    /**
     * Close the connection.
     */
    public void close() {
        produce.flush();
        produce.close();
    }

    /**
     * Generate a temporary jaas.conf file.
     * @param keyTab    path to the keytab file
     * @param principal Kerberos principal
     */
    public static void configureJAAS(String keyTab, String principal) {
        String JAAS_TEMPLATE = "KafkaClient {\n" +
                "com.sun.security.auth.module.Krb5LoginModule required\n" +
                "useKeyTab=true\n" +
                "keyTab=\"%1$s\"\n" +
                "principal=\"%2$s\";\n" +
                "};";
        String content = String.format(JAAS_TEMPLATE, keyTab, principal);
        File jaasConf = null;
        PrintWriter writer = null;
        try {
            jaasConf = File.createTempFile("jaas", ".conf");
            writer = new PrintWriter(jaasConf);
            writer.println(content);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                writer.close();
            }
            if (jaasConf != null) { // guard: createTempFile may have failed
                jaasConf.deleteOnExit(); // remove the temp file when the JVM exits
            }
        }
        if (jaasConf != null) {
            System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
        }
    }
}

Thank you for reading. That concludes "How to implement a Kafka Java API ingestion program". After working through this article you should have a deeper grasp of the topic, though the specifics still need to be verified in practice.