Introduction to Flume and Basic Operations
What is Flume
Flume, originally developed by Cloudera, is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting large volumes of log data. It supports plugging custom data senders into a logging system to collect data, and it can apply simple processing to that data before writing it out to a variety of customizable data receivers.
What Flume does
- Supports custom data senders in a logging system, used to collect data
- Applies simple processing to the data and writes it to various (customizable) data receivers
Flume's components
- Agent: the core component; each agent is built from the three parts below (a configuration skeleton follows this list)
- source: produces or collects the data
- channel: a transient staging store that holds events between source and sink (durable channel types can also persist them)
- sink: forwards the data onward
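These three roles map directly onto the keys of a Flume properties file. A minimal skeleton, with placeholder component names (a1, s1, c1, k1):

```
# Declare the components of agent "a1"
a1.sources  = s1
a1.channels = c1
a1.sinks    = k1

# Configure each component and wire them together
a1.sources.s1.type = ...        # where events come from
a1.sources.s1.channels = c1     # source writes into the channel
a1.channels.c1.type = memory    # buffer between source and sink
a1.sinks.k1.type = ...          # where events go
a1.sinks.k1.channel = c1        # sink drains the channel
```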
Flume workflow models (the original diagrams are sketched schematically after this list)
- Data flow model
- Multi-agent model
- Consolidation model
- Hybrid model
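The diagrams themselves are not reproduced here; schematically, the basic data flow model moves an event from a client through an agent to its destination:

```
            +-------------- Agent --------------+
 client --> |  source --> channel --> sink      | --> HDFS / next agent
            +-----------------------------------+
```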
Installing Flume
Download the package and extract it:
```
wget http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz
```
Configure the environment variables:
```
vim ~/.bashrc
export FLUME_HOME=/usr/local/src/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source ~/.bashrc
```
Basic Flume operations
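Before running the examples, it is worth checking that the launcher is on the PATH (a quick sanity check, given the variables set above):

```
flume-ng version
```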
- netcat mode
Go into the conf directory and create a netcat.conf file with the following content:
```
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = netcat
agent.sources.netcatSource.bind = localhost
agent.sources.netcatSource.port = 11111
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
```
Start an instance:
```
(py27) [root@master conf]# pwd
/usr/local/src/apache-flume-1.8.0-bin/conf
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./netcat.conf --name agent -Dflume.root.logger=INFO,console
```
Here --conf points at Flume's configuration directory, --conf-file at the agent definition, and --name must match the prefix used in the properties file (agent). Startup succeeds:
```
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume_netcat.conf
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Added sinks: loggerSink Agent: agent
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Creating channels
18/10/24 11:26:35 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
18/10/24 11:26:35 INFO source.DefaultSourceFactory: Creating instance of source netcatSource, type netcat
18/10/24 11:26:35 INFO sink.DefaultSinkFactory: Creating instance of sink: loggerSink, type: logger
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [netcatSource, loggerSink]
18/10/24 11:26:35 INFO node.Application: Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{loggerSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@262b92ac counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
18/10/24 11:26:35 INFO node.Application: Starting Channel memoryChannel
18/10/24 11:26:35 INFO node.Application: Waiting for channel: memoryChannel to start. Sleeping for 500 ms
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
18/10/24 11:26:36 INFO node.Application: Starting Sink loggerSink
18/10/24 11:26:36 INFO node.Application: Starting Source netcatSource
18/10/24 11:26:36 INFO source.NetcatSource: Source starting
18/10/24 11:26:36 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.155.120:11111]
```
Now open a second terminal and send some data:
```
(py27) [root@master apache-flume-1.8.0-bin]# telnet localhost 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
OK
```
Check the received data:
```
18/10/24 11:30:15 INFO sink.LoggerSink: Event: { headers:{} body: 31 0D 1. }
```
Note: if telnet is not available, install it first with `yum install telnet`.
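If you would rather not use telnet, nc works just as well against the netcat source; a quick alternative, assuming the agent above is still running:

```
echo 'hello flume' | nc localhost 11111
```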
- Exec mode
Create the configuration file flume_exec.conf (the name matches the startup command below):
```
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec
# tail -F (capital F) would keep following across log rotation; -f tracks only the original file
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
```
Start the instance:
```
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec.conf --name agent -Dflume.root.logger=INFO,console
```
Once it is up, create the exec.log file referenced in the configuration:
```
(py27) [root@master test_data]# ls
exec.log
(py27) [root@master test_data]# pwd
/home/master/FlumeTest/test_data
(py27) [root@master test_data]#
```
Then use echo to simulate log output:
```
(py27) [root@master test_data]# echo 'Hello World!!!' >> exec.log
```
Check the received log:
```
18/10/25 09:19:52 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 21 21 Hello World!!! }
```
How to save logs to HDFS
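The HDFS sink needs a reachable HDFS cluster and the Hadoop client libraries on the node running Flume. A quick sanity check before touching the configuration, assuming Hadoop is already set up:

```
hadoop fs -ls /
```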
Modify the configuration file (saved here as flume_exec_hdfs.conf, matching the startup command below):
```
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = hdfs
agent.sinks.loggerSink.hdfs.path = /flume/%y-%m-%d/%H%M/
agent.sinks.loggerSink.hdfs.filePrefix = exec_hdfs_
# Round the timestamp in the path down to 1-minute buckets
agent.sinks.loggerSink.hdfs.round = true
agent.sinks.loggerSink.hdfs.roundValue = 1
agent.sinks.loggerSink.hdfs.roundUnit = minute
# Roll the file after 3 s, 20 bytes, or 5 events, whichever comes first
agent.sinks.loggerSink.hdfs.rollInterval = 3
agent.sinks.loggerSink.hdfs.rollSize = 20
agent.sinks.loggerSink.hdfs.rollCount = 5
# exec sources add no timestamp header, so use local time for %y-%m-%d/%H%M
agent.sinks.loggerSink.hdfs.useLocalTimeStamp = true
# DataStream writes plain text instead of the default SequenceFile
agent.sinks.loggerSink.hdfs.fileType = DataStream
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
```
Then start the instance:
```
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec_hdfs.conf --name agent -Dflume.root.logger=INFO,console
```
It then writes the contents of exec.log to HDFS:
```
18/10/25 09:54:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/10/25 09:54:26 INFO hdfs.BucketWriter: Creating /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Closing /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Renaming /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp to /flume/18-10-25/0954/exec_hdfs_.1540475666623
18/10/25 09:54:32 INFO hdfs.HDFSEventSink: Writer callback called.
```
Browsing HDFS shows the file's contents:
```
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0954
Found 1 items
-rw-r--r--   3 root supergroup         15 2018-10-25 09:54 /flume/18-10-25/0954/exec_hdfs_.1540475666623
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0954/exec_hdfs_.1540475666623
Hello World!!!
```
Now append some new log entries and check again:
```
# Write new log entries
(py27) [root@master test_data]# echo 'test001' >> exec.log
(py27) [root@master test_data]# echo 'test002' >> exec.log

# Check the HDFS directory
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25
Found 2 items
drwxr-xr-x   - root supergroup          0 2018-10-25 09:54 /flume/18-10-25/0954
drwxr-xr-x   - root supergroup          0 2018-10-25 09:56 /flume/18-10-25/0956
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0956
Found 1 items
-rw-r--r--   3 root supergroup         16 2018-10-25 09:56 /flume/18-10-25/0956/exec_hdfs_.1540475766338
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0956/exec_hdfs_.1540475766338
test001
test002
```
- Failover example
This requires three machines: master, slave1, and slave2. Configure and start an agent on each; the agent on master sends the logs, while slave1 and slave2 receive them.
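Schematically, with the ports and priorities used in the configurations below:

```
                      avro :52020 (priority 10)
 master agent --+-------------------------> slave1 agent --> logger
                |     avro :52020 (priority 1)
                +-------------------------> slave2 agent --> logger
```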
master configuration:
```
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink1 loggerSink2
agent.sinkgroups = group

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink1.type = avro
agent.sinks.loggerSink1.hostname = slave1
agent.sinks.loggerSink1.port = 52020
agent.sinks.loggerSink1.channel = memoryChannel

agent.sinks.loggerSink2.type = avro
agent.sinks.loggerSink2.hostname = slave2
agent.sinks.loggerSink2.port = 52020
agent.sinks.loggerSink2.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinkgroups.group.sinks = loggerSink1 loggerSink2
agent.sinkgroups.group.processor.type = failover
# The failover processor routes events to the highest-priority sink that is alive
# (note the required .priority. component in these keys)
agent.sinkgroups.group.processor.priority.loggerSink1 = 10
agent.sinkgroups.group.processor.priority.loggerSink2 = 1
agent.sinkgroups.group.processor.maxpenalty = 10000
```
slave1 configuration:
```
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave1
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
```
slave2 configuration:
```
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave2
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
```
Start the agents on master, slave1, and slave2, then write a log entry on master and watch which slave receives it:
```
# master
(py27) [root@master test_data]# echo 'hello' >> exec.log

# slave1
18/10/25 10:53:53 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F hello }

# slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
```
slave1 received the data, since it holds the higher priority. Now stop the agent on slave1 and send another entry:
```
# master
(py27) [root@master test_data]# echo '11111' >> exec.log

# slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31 11111 }
```
Then restart the agent on slave1 and send once more:
```
# master
(py27) [root@master test_data]# echo '22222' >> exec.log

# slave1
18/10/25 10:58:21 INFO sink.LoggerSink: Event: { headers:{} body: 32 32 32 32 32 22222 }

# slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 10:56:53 INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31 11111 }
```
The flow fails back to slave1, which has the higher priority; slave2 sees nothing new.
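For comparison, the same sink group can spread events across both slaves instead of failing over. A minimal sketch of a load-balancing sink processor, reusing the sinks defined above:

```
agent.sinkgroups.group.sinks = loggerSink1 loggerSink2
agent.sinkgroups.group.processor.type = load_balance
# round_robin alternates between sinks; 'random' is the other built-in selector
agent.sinkgroups.group.processor.selector = round_robin
# Temporarily blacklist a failing sink, with exponential backoff
agent.sinkgroups.group.processor.backoff = true
```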