千家信息网

【Flume】HDFSSink源码理解

发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,HDFSSink组件中,主要由HDFSEventSink,BucketWriter,HDFSWriter几个类构成。其中HDFSEventSink主要功能呢是判定Sink的配置条件是否合法,并负责从C
千家信息网最后更新 2025年12月02日【Flume】HDFSSink源码理解

HDFSSink组件中,主要由HDFSEventSink,BucketWriter,HDFSWriter几个类构成。

其中HDFSEventSink主要功能呢是判定Sink的配置条件是否合法,并负责从Channel中获取events,通过解析event的header信息决定event对应的BucketWriter。

BucketWriter负责按照rollCount,rollSize等条件在HDFS端生成(roll)文件,通过配置文件配置的文件数据格式以及序列化的方式,在每个BucetWriter同一处理。

HDFSWriter作为接口,其具体实现有HDFSSequenceFile,HDFSDataStream,HDFSCompressedDataStream这三种




HDFSSink功能中关键类类图


HDFSEventSink类

走通HDFSEventSink之前,肯定要对其中配置参数有了解(Flume-HDFSSink配置参数说明)

1、configure()方法中,从配置文件中获取filePath,fileName等信息,具体参数含义可以参考(Flume-HDFSSink配置参数说明)


2、start()方法,初始化固定大小线程池callTimeoutPool, 周期执行线程池timedRollerPool,以及sfWriters,并启动sinkCounter

  1. callTimeoutPool

  2. timedRollerPool,周期执行线程池中主要有HDFS文件重命名的线程(根据retryInterval),达到生成文件要求进行roll操作的线程(根据idleTimeout),关闭闲置文件的线程等(rollInterval)

  3. sfWriters sfWriters实际是一个LinkedHashMap的实现类,通过重写removeEldestEntry方法,将最久未使用的writer移除,保证sfWriters中能够维护一个固定大小(maxOpenFiles)的最大打开文件数

  4. sinkCounter sink组件监控指标的计数器


3、process() 方法是HDFSEventSink中最主要的逻辑(部分关键节点通过注释写到代码中),

  1. process()方法中获取到Channel,

  2. 并按照batchSize大小循环从Channel中获取event,通过解析event得到event的header等信息,确定该event的HDFS目的路径以及目的文件名

  3. 每个event可能对应不同的bucketWriter和hdfswriter,将每个event添加到相应的writer中

  4. 当event个数达到batchSize个数后,writer进行flush,同时提交事务

其中bucketWriter负责生成(roll)文件的方式,处理文件格式以及序列化等逻辑

其中hdfsWriter具体实现有"SequenceFile","DataStream","CompressedStream";三种,用户根据hdfs.fileType参数确定具体hdfsWriter的实现

public Status process() throws EventDeliveryException {Channel channel = getChannel(); //调用父类getChannel方法,建立Channel与Sink之间的连接Transaction transaction = channel.getTransaction();//每次batch提交都建立在一个事务上transaction.begin();try {Set writers = new LinkedHashSet<>();int txnEventCount = 0;for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {Event event = channel.take();//从Channel中取出eventif (event == null) {//没有新的event的时候,则不需要按照batchSize循环取break;}// reconstruct the path name by substituting place holders// 在配置文件中会有"a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S"这样的%表示的变量// 解析配置文件中的变量构造realPath 和 realNameString realPath = BucketPath.escapeString(filePath, event.getHeaders(),timeZone, needRounding, roundUnit, roundValue, useLocalTime);String realName = BucketPath.escapeString(fileName, event.getHeaders(),timeZone, needRounding, roundUnit, roundValue, useLocalTime);String lookupPath = realPath + DIRECTORY_DELIMITER + realName;BucketWriter bucketWriter;HDFSWriter hdfsWriter = null;WriterCallback closeCallback = new WriterCallback() {@Overridepublic void run(String bucketPath) {LOG.info("Writer callback called.");synchronized (sfWritersLock) {sfWriters.remove(bucketPath);//sfWriters以LRU方式维护了一个maxOpenFiles大小的map.始终保持最多打开文件个数}}};synchronized (sfWritersLock) {bucketWriter = sfWriters.get(lookupPath);// we haven't seen this file yet, so open it and cache the handleif (bucketWriter == null) {hdfsWriter = writerFactory.getWriter(fileType);//通过工厂获取文件类型,其中包括"SequenceFile","DataStream","CompressedStream";bucketWriter = initializeBucketWriter(realPath, realName,lookupPath, hdfsWriter, closeCallback);sfWriters.put(lookupPath, bucketWriter);}}// Write the data to HDFStry {bucketWriter.append(event);} catch (BucketClosedException ex) {LOG.info("Bucket was closed while trying to append, " +"reinitializing bucket and writing event.");hdfsWriter = writerFactory.getWriter(fileType);bucketWriter = initializeBucketWriter(realPath, realName,lookupPath, hdfsWriter, closeCallback);synchronized (sfWritersLock) {sfWriters.put(lookupPath, bucketWriter);}bucketWriter.append(event);}// track the buckets getting written in this transactionif (!writers.contains(bucketWriter)) {writers.add(bucketWriter);}}if (txnEventCount == 0) {sinkCounter.incrementBatchEmptyCount();} else if (txnEventCount == batchSize) {sinkCounter.incrementBatchCompleteCount();} else {sinkCounter.incrementBatchUnderflowCount();}// flush all pending buckets before committing the transactionfor (BucketWriter bucketWriter : writers) {bucketWriter.flush();}transaction.commit();if (txnEventCount < 1) {return Status.BACKOFF;} else {sinkCounter.addToEventDrainSuccessCount(txnEventCount);return Status.READY;}} catch (IOException eIO) {transaction.rollback();LOG.warn("HDFS IO error", eIO);return Status.BACKOFF;} catch (Throwable th) {transaction.rollback();LOG.error("process failed", th);if (th instanceof Error) {throw (Error) th;} else {throw new EventDeliveryException(th);}} finally {transaction.close();}}

BucketWriter

flush() 方法:

BucketWriter中维护了一个batchCounter,在这个batchCounter大小不为0的时候会进行doFlush(), doFlush()主要就是对batch中的event进行序列化和输出流flush操作,最终结果就是将events写入HDFS中。

如果用户设置了idleTimeout参数不为0,在doFlush()操作之后,会往定时执行线程池中添加一个任务,该关闭当前连接HDFS的输出对象HDFSWriter,执行时间间隔为idleTimeout,并将这个延迟调度的任务赋值给idleFuture变量。


append()方法:

在介绍flush()方法中,会介绍一个idleFuture变量对应的功能,在append()方法执行前首先会检查idleFuture任务是否执行完毕,如果没有执行完成会设置一个超时时间callTimeout等待该进程完成,然后再进行append之后的操作。这样做主要是为了防止关闭HdfsWriter的过程中还在往HDFS中append数据,在append一半时候,HdfsWriter关闭了。

之后,在正是append()之前,又要首先检查当前是否存在HDFSWirter可用于append操作,如果没有调用open()方法。

每次将event往hdfs中append的时候都需要对rollCount,rollSize两个参数进行检查,在满足这两个参数条件的情况下,就需要将临时文件重命名为(roll)正式的HDFS文件。之后,重新再open一个hdfswriter,往这个hdfswriter中append每个event,当event个数达到batchSize时,进行flush操作。

public synchronized void append(final Event event) throws IOException, InterruptedException {checkAndThrowInterruptedException();// idleFuture是ScheduledFuture实例,主要功能关闭当前HDFSWriter,在append event之前需要判断// idleFuture是否已经执行完成,否则会造成在append一半的时候 hdfswriter被关闭if (idleFuture != null) {idleFuture.cancel(false);// There is still a small race condition - if the idleFuture is already// running, interrupting it can cause HDFS close operation to throw -// so we cannot interrupt it while running. If the future could not be// cancelled, it is already running - wait for it to finish before// attempting to write.if (!idleFuture.isDone()) {try {idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);} catch (TimeoutException ex) {LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +" file close may have failed", ex);} catch (Exception ex) {LOG.warn("Error while trying to cancel closing of idle file. ", ex);}}idleFuture = null;}// If the bucket writer was closed due to roll timeout or idle timeout,// force a new bucket writer to be created. Roll count and roll size will// just reuse this oneif (!isOpen) {if (closed) {throw new BucketClosedException("This bucket writer was closed and " +"this handle is thus no longer valid");}open();}// 检查rollCount,rollSize两个roll文件的参数,判断是否roll出新文件if (shouldRotate()) {boolean doRotate = true;if (isUnderReplicated) {if (maxConsecUnderReplRotations > 0 &&consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {doRotate = false;if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {LOG.error("Hit max consecutive under-replication rotations ({}); " +"will not continue rolling files under this path due to " +"under-replication", maxConsecUnderReplRotations);}} else {LOG.warn("Block Under-replication detected. Rotating file.");}consecutiveUnderReplRotateCount++;} else {consecutiveUnderReplRotateCount = 0;}if (doRotate) {close();open();}}// write the eventtry {sinkCounter.incrementEventDrainAttemptCount();// sinkCounter统计metrixcallWithTimeout(new CallRunner() {@Overridepublic Void call() throws Exception {writer.append(event); //writer是通过配置参数hdfs.fileType创建的HDFSWriter实现return null;}});} catch (IOException e) {LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +bucketPath + ") and rethrowing exception.",e.getMessage());try {close(true);} catch (IOException e2) {LOG.warn("Caught IOException while closing file (" +bucketPath + "). Exception follows.", e2);}throw e;}// update statisticsprocessSize += event.getBody().length;eventCounter++;batchCounter++;if (batchCounter == batchSize) {flush();}}


文件 方法 参数 配置 线程 大小 时候 个数 功能 变量 检查 两个 任务 信息 序列 方式 条件 生成 事务 周期 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 仿真软件开发 工具 阿里云服务器区别 阿里云mqtt服务器最低多少钱 大数据时代信息网络安全 微信公众号配置服务器 各种主流无线传感网络技术 福山区直播软件开发公司有哪些 怎么检查数据库有没有安装 ps SQL数据库默认用户 业务逻辑服务器 服务器断电如何保证数据一致性 大方互动网络技术有限公司 崇明区辅助软件开发收费套餐 有发展的语音聊天软件开发 软件开发类游戏 搭建内网穿透服务器 带web 现在安卓软件开发多吗 软件开发名片实例 店达网络技术有限公司 杭州应用软件开发收费报价表 mysql数据库控制台 模拟城市我是市长服务器怎么登陆 徐州专业联想服务器安装 如何维护云服务器安全 软件开发工程师快速成长的套路 软件开发银行项目简历模板 手机断断续续连接不上服务器 相亲交友找对象软件开发 数据库导入表结构 和个人相关的网络安全
0