flume如何自定义source、sink
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要为大家展示了"flume如何自定义source、sink",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"flume如何自定义source、sin
千家信息网最后更新 2025年12月02日flume如何自定义source、sink
这篇文章主要为大家展示了"flume如何自定义source、sink",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"flume如何自定义source、sink"这篇文章吧。
自定义source开发:
source是收集日志存入channel。
Source提供了两种机制:PollableSource(轮训拉取)和EventDrivenSource(事件驱动),
如果使用EventDrivenSource,你可以在start方法中启动额外的线程,不断的往channel中发数据。如果使用PollableSource,你可以在process()实现不断重发。
public class MySource extends AbstractSource implements Configurable, PollableSource { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation, convert to another type, ...) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external client } @Override public void stop () { // Disconnect from external client and do any additional cleanup // (e.g. releasing resources or nulling-out field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; try { // This try clause includes whatever Channel/Event operations you want to do // Receive new data Event e = getSomeData(); // Store the Event into this Source's associated Channel(s) getChannelProcessor().processEvent(e); status = Status.READY; } catch (Throwable t) { // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } finally { txn.close(); } return status; }}或者
package org.apache.flume; import org.apache.flume.conf.Configurable; import org.apache.flume.source.AbstractSource; public class TailSource extends AbstractSource implements EventDrivenSource, Configurable { @Override public void configure(Context context) { } @Override public synchronized void start() { } @Override public synchronized void stop() { } }自定义sink:
sink是从channel中拉取日志处理。
process会不断调用,你只需在process中去取channel的数据即可。
public class MySink extends AbstractSink implements Configurable { private String myProp; @Override public void configure(Context context) { String myProp = context.getString("myProp", "defaultValue"); // Process the myProp value (e.g. validation) // Store myProp for later retrieval by process() method this.myProp = myProp; } @Override public void start() { // Initialize the connection to the external repository (e.g. HDFS) that // this Sink will forward Events to .. } @Override public void stop () { // Disconnect from the external respository and do any // additional cleanup (e.g. releasing resources or nulling-out // field values) .. } @Override public Status process() throws EventDeliveryException { Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { // This try clause includes whatever Channel operations you want to do Event event = ch.take(); // Send the Event to the external repository. // storeSomeData(e); txn.commit(); status = Status.READY; } catch (Throwable t) { txn.rollback(); // Log exception, handle individual exceptions as needed status = Status.BACKOFF; // re-throw all Errors if (t instanceof Error) { throw (Error)t; } } return status; }}以上是"flume如何自定义source、sink"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
e.g.
不断
内容
篇文章
数据
日志
学习
帮助
事件
只需
方法
易懂
更多
机制
条理
知识
线程
编带
行业
资讯
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
世界互联网大会方正科技
新田县人民医院网络安全
电脑机箱防尘棉服务器
山东软件开发多不多
视频网站的数据库
2008服务器配置
计算机网络安全论文答辩道歉
戴尔机架式服务器时间如何调整
数据库num什么类型
frp服务器安全问题
萨弗隆服务器
医院服务器管理系统
软件开发的目标客户群体
不能初始化数据库支持
微机pc服务器哪几种
软件开发App风险与对策
江西网络技术基础期末考试
河南服务器光模块云空间
中学生软件开发学校
跑分系统软件开发
广电总局网络安全排查
怎么看连接的服务器的文件夹
数据库汉字乱码怎么
网络安全法宣传内容
数据库三级结构模式由内到外
精英服务器
移动宽带访问联通服务器掉线
网络安全态势感知研究
网络安全防护加固
灵活用工平台软件开发排名