千家信息网

如何解析Flume与Kafka整合

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章给大家介绍如何解析Flume与Kafka整合,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Flume与Kafka整合一、概念1、Flume:Cloudera 开发的分布式
千家信息网最后更新 2025年12月03日如何解析Flume与Kafka整合

这篇文章给大家介绍如何解析Flume与Kafka整合,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

Flume与Kafka整合


一、概念
1、Flume:Cloudera 开发的分布式日志收集系统,是一种分布式,可靠且可用的服务,用于高效地收集,汇总和移动大量日志数据。 它具有基于流式数据流的简单而灵活的架构。它具有可靠的可靠性机制和许多故障转移和恢复机制,具有强大的容错性和容错能力。它使用一个简单的可扩展数据模型,允许在线分析应用程序。Flume分为OG、NG版本,其中Flume OG 的最后一个发行版本 0.94.0,之后为NG版本。

2、Kafka:作为一个集群运行在一台或多台可以跨越多个数据中心的服务器上。在Kafka中,客户端和服务器之间的通信是通过一种简单的,高性能的,语言不可知的TCP协议完成的。协议是版本控制的,并保持与旧版本的向后兼容性。Kafka提供Java客户端,但客户端可以使用多种语言。

3、Kafka通常用于两大类应用,如下:
A、构建可在系统或应用程序之间可靠获取数据的实时流数据管道
B、构建实时流应用程序,用于转换或响应数据流
C、Kafka每个记录由一个键,一个值和一个时间戳组成。

二、产述背景
基于大数据领域实现日志数据时时采集及数据传递等需要,据此需求下试着完成flume+kafka扇入、扇出功能整合,其中扇出包括:复制流、复用流等功能性测试。后续根据实际需要,将完善kafka与spark streaming进行整合整理工作。
注:此文档仅限于功能性测试,性能优化方面请大家根据实际情况增加。

三、部署安装
1、测试环境说明:
操作系统:CentOS 7
Flume版本:flume-ng-1.6.0-cdh6.7.0
Kafka版本:kafka_2.11-0.10.0.1
JDK版本:JDK1.8.0
Scala版本:2.11.8
2、测试步骤:
2.1、flume部署
2.1.1、下载安装介质,并解压:

此处)折叠或打开

此处)折叠或打开

此处)折叠或打开

  1. cd /app/apache-flume-1.6.0-cdh6.7.0-bin

  2. vi netcatOrKafka-memory-logger.conf

  3. netcatagent.sources = netcat_sources

  4. netcatagent.channels = c1 c2

  5. netcatagent.sinks = logger_sinks kafka_sinks


  6. netcatagent.sources.netcat_sources.type = netcat

  7. netcatagent.sources.netcat_sources.bind = 0.0.0.0

  8. netcatagent.sources.netcat_sources.port = 44444


  9. netcatagent.channels.c1.type = memory

  10. netcatagent.channels.c1.capacity = 1000

  11. netcatagent.channels.c1.transactionCapacity = 100


  12. netcatagent.channels.c2.type = memory

  13. netcatagent.channels.c2.capacity = 1000

  14. netcatagent.channels.c2.transactionCapacity = 100


  15. netcatagent.sinks.logger_sinks.type = logger


  16. netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink

  17. netcatagent.sinks.kafka_sinks.topic = test

  18. netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9082,192.168.137.133:9082,192.168.137.134:9082

  19. netcatagent.sinks.kafka_sinks.requiredAcks = 0

  20. ##netcatagent.sinks.kafka_sinks.batchSize = 20

  21. netcatagent.sinks.kafka_sinks.producer.type=sync

  22. netcatagent.sinks.kafka_sinks.custom.encoding=UTF-8

  23. netcatagent.sinks.kafka_sinks.partition.key=0

  24. netcatagent.sinks.kafka_sinks.serializer.class=kafka.serializer.StringEncoder

  25. netcatagent.sinks.kafka_sinks.partitioner.class=org.apache.flume.plugins.SinglePartition

  26. netcatagent.sinks.kafka_sinks.max.message.size=1000000


  27. netcatagent.sources.netcat_sources.selector.type = replicating


  28. netcatagent.sources.netcat_sources.channels = c1 c2

  29. netcatagent.sinks.logger_sinks.channel = c1

  30. netcatagent.sinks.kafka_sinks.channel = c2

2.4.2、启动各测试命令:
A、启动flume的agent(于192.168.137.130):
flume-ng agent --name netcatagent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/netcat-memory-loggerToKafka.conf \
-Dflume.root.logger=INFO,console
B、启动kafka消费者(于192.168.137.132):
kafka-console-consumer.sh \
--zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--from-beginning --topic test
C、测试发送(于192.168.137.130与于192.168.137.132)
telnet发送结果

kafka消费结果

最终logger接收结果


至此flume+kafka扇出--复制流测试(扇入源为:netcat;输出为:kafka+Flume的Logger)测试与验证完成。

2.5、flume+kafka扇出--复用流测试(扇入源为:netcat;输出为:kafka+Flume的Logger)

暂无,后续补充



四、部署安装及验证过程中出现的问题

1、做flume+kafka扇入测试(扇入源为:netcat+kafka;输出以Flume的Logger类型输出)时,一直未收到kafka数据
主要原因是在做kafka的配置时在配置文件(server.properties)中写成内容:
zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181
但在创建topics时,使用的是:
kafka-topics.sh --create \
--zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--replication-factor 3 --partitions 3 --topic test

其中在kafka的配置文件中zookeeper配置未加/kakfa,但在创建topics的时增加了/kafka
最终使用:
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
--topic test
命令检查没有topics信息才发现此问题

解决办法:将两个信息同步即可

2、做flume+kafka扇入测试(扇入源为:netcat+kafka;输出以Flume的Logger类型输出)时,启动flume的agent时报错。
2018-03-31 10:43:31,241 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
... 11 more


解决办法:官网资料存在问题,org.apache.flume.source.kafka,KafkaSource其中不应该包括逗号,改为:org.apache.flume.source.kafka.KafkaSource即可。详细官网

关于如何解析Flume与Kafka整合就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

数据 测试 版本 输出 整合 应用 配置 内容 功能 客户 客户端 应用程序 日志 程序 系统 结果 问题 服务 之间 信息 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 我的世界生存随机掉落服务器推荐 华为软件开发平台欠费 苏州软件开发实习招聘 做软件开发真的找不到女朋友吗 单元测试操作数据库报错 网络安全的红线视频 战国文字数据库 海康服务器81端口 杨紫网络安全 石家庄手机软件开发 购买一台服务器需要具备什么条件 数据库样本删减是学术不端吗 蓝风宿网络技术 厦门航海时代软件开发有限公司 直播软件开发难点 佛山安捷信网络技术公司 ipad怎么下载宝可梦服务器 服务器logs文件夹是什么意思 为什么原神连接服务器失败 mysql数据库增量备份 摩尔庄园如何看自己在哪个服务器 江西本地软件开发定制市场价格 软件开发企业需要的设备 云南云主机集群管理服务器 特信网络技术 网络技术讨论论坛 秦皇岛软件开发五星服务 区块狗系统软件开发装 中国工商银行招聘软件开发 万古网络技术科技有限公司
0