flume实际生产场景分析
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,需求:A B两台日志服务器实时生产日志主要类型为access.log、nginx.log、web.log,现在要求:把A、B机器中的access.log、nginx.log、web.log 采集汇总到
千家信息网最后更新 2025年12月02日flume实际生产场景分析
需求:A B两台日志服务器实时生产日志主要类型为access.log、nginx.log、web.log,现在要求:
把A、B机器中的access.log、nginx.log、web.log 采集汇总到 C 机器上然后统一收集到 hdfs中,但是在hdfs中要求的目录为:
/source/logs/access/日期/**
/source/logs/nginx/日期/**
/source/logs/web/日期/**
场景分析:
规划:
hadoop01(web01):
source:access.log 、nginx.log、web.log
channel:memory
sink:avro
hadoop02(web02):
source:access.log 、nginx.log、web.log
channel:memory
sink:avro
hadoop03(数据收集):
source;avro
channel:memory
sink:hdfs
配置文件:
#exec_source_avro_sink.properties#指定各个核心组件a1.sources = r1 r2 r3a1.sinks = k1a1.channels = c1#r1a1.sources.r1.type = execa1.sources.r1.command = tail -F /home/hadoop/flume_data/access.loga1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = statica1.sources.r1.interceptors.i1.key = typea1.sources.r1.interceptors.i1.value = access#r2a1.sources.r2.type = execa1.sources.r2.command = tail -F /home/hadoop/flume_data/nginx.loga1.sources.r2.interceptors = i2a1.sources.r2.interceptors.i2.type = statica1.sources.r2.interceptors.i2.key = typea1.sources.r2.interceptors.i2.value = nginx#r3a1.sources.r3.type = execa1.sources.r3.command = tail -F /home/hadoop/flume_data/web.loga1.sources.r3.interceptors = i3a1.sources.r3.interceptors.i3.type = statica1.sources.r3.interceptors.i3.key = typea1.sources.r3.interceptors.i3.value = web#Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = hadoop03a1.sinks.k1.port = 41414#Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 20000a1.channels.c1.transactionCapacity = 10000#Bind the source and sink to the channela1.sources.r1.channels = c1a1.sources.r2.channels = c1a1.sources.r3.channels = c1a1.sinks.k1.channel = c1#avro_source_hdfs_sink.properties#定义 agent 名, source、channel、sink 的名称a1.sources = r1a1.sinks = k1a1.channels = c1#定义 sourcea1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port =41414#添加时间拦截器a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type=org.apache.flume.interceptor.TimestampInterceptor$Builder#定义 channelsa1.channels.c1.type = memorya1.channels.c1.capacity = 20000a1.channels.c1.transactionCapacity = 10000#定义 sinka1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path=hdfs://myha01/source/logs/%{type}/%Y%m%da1.sinks.k1.hdfs.filePrefix =eventsa1.sinks.k1.hdfs.fileType = DataStreama1.sinks.k1.hdfs.writeFormat = Text#时间类型a1.sinks.k1.hdfs.useLocalTimeStamp = true#生成的文件不按条数生成a1.sinks.k1.hdfs.rollCount = 0#生成的文件按时间生成a1.sinks.k1.hdfs.rollInterval = 30#生成的文件按大小生成a1.sinks.k1.hdfs.rollSize = 10485760#批量写入 hdfs 的个数a1.sinks.k1.hdfs.batchSize = 20#flume 操作 hdfs 的线程数(包括新建,写入等)a1.sinks.k1.hdfs.threadsPoolSize=10#操作 hdfs 超时时间a1.sinks.k1.hdfs.callTimeout=30000#组装 source、channel、sinka1.sources.r1.channels = c1a1.sinks.k1.channel = c1测试:
#在hadoop01和 hadoop02上的/home/hadoop/data 有数据文件 access.log、nginx.log、 web.log#先启动hadoop03上的flume:(存储)flume-ng agent -c conf -f avro_source_hdfs_sink.properties -name a1 -Dflume.root.logger=DEBUG,console#然后在启动hadoop01和hadoop02上的命令flume(收集)flume-ng agent -c conf -f exec_source_avro_sink.properties -name a1 -Dflume.root.logger=DEBUG,console
生成
文件
时间
日期
数据
日志
机器
类型
场景
分析
生产
个数
名称
命令
大小
实时
是在
服务器
核心
目录
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发外包期末试卷
青年租房软件开发
临时限速服务器
如何备考三级计算机网络技术
网购中网络技术的制约
戴尔服务器跳过开机自检
网络安全小知识观后感第197期
软件开发的工作工资怎么样
数据库技术产品调研
常州企业软件开发推荐咨询
网络安全小品视频下载
阿里云企业邮箱服务器
hp服务器怎么改家用电脑
数据库应用包含exls表吗
java服务器进不去怎么办
学软件开发什么书好
芜湖医疗软件开发公司
中国网络技术是自给自足吗
怎么制定软件开发计划书
聚嘉网络技术
j1900服务器多少人访问
网络安全运营工作怎样
要加强网络安全宣传单
赤峰专业app软件开发培训班
广州程序软件开发哪家便宜
软件开发制度包括
达内科技 重庆软件开发
数据库一对一关系 示例
软件开发书推荐
有趣网络安全小提示