flume-ng怎么自定义拦截器
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇内容主要讲解"flume-ng怎么自定义拦截器",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"flume-ng怎么自定义拦截器"吧!代码如下:packa
千家信息网最后更新 2025年12月03日flume-ng怎么自定义拦截器
本篇内容主要讲解"flume-ng怎么自定义拦截器",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"flume-ng怎么自定义拦截器"吧!
代码如下:
package com.wy.flume.interceptor;import java.util.List;import java.util.Map;import java.util.regex.Matcher;import java.util.regex.Pattern;import org.apache.commons.lang.StringUtils;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.google.common.base.Charsets;import com.google.common.base.Preconditions;import com.google.common.base.Throwables;import com.google.common.collect.Lists;public class RegexExtractorHeaderInterceptor implements Interceptor { static final String REGEX = "regex"; static final String SERIALIZERS = "serializers"; static final String EXTRACTOR_HEADER = "extractorHeader"; static final boolean DEFAULT_EXTRACTOR_HEADER = false; static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey"; private static final Logger logger = LoggerFactory .getLogger(RegexExtractorHeaderInterceptor.class); private final Pattern regex; private final List serializers; private final boolean extractorHeader; private final String extractorHeaderKey; private RegexExtractorHeaderInterceptor(Pattern regex, List serializers,boolean extractorHeader, String extractorHeaderKey) { this.regex = regex; this.serializers = serializers; this.extractorHeader = extractorHeader; this.extractorHeaderKey = extractorHeaderKey; } @Override public void initialize() { // NO-OP... } @Override public void close() { // NO-OP... } @Override public Event intercept(Event event) { String extractorHeaderVal; if (extractorHeader){ extractorHeaderVal = event.getHeaders().get(extractorHeaderKey); }else{ extractorHeaderVal = new String(event.getBody(),Charsets.UTF_8); } Matcher matcher = regex.matcher(extractorHeaderVal); Map headers = event.getHeaders(); if (matcher.find()) { for (int group = 0, count = matcher.groupCount(); group < count; group++) { int groupIndex = group + 1; if (groupIndex > serializers.size()) { if (logger.isDebugEnabled()) { logger.debug("Skipping group {} to {} due to missing serializer", group, count); } break; } NameAndSerializer serializer = serializers.get(group); if (logger.isDebugEnabled()) { logger.debug("Serializing {} using {}", serializer.headerName, serializer.serializer); } headers.put(serializer.headerName, serializer.serializer.serialize(matcher.group(groupIndex))); } } return event; } @Override public List intercept(List events) { List intercepted = Lists.newArrayListWithCapacity(events.size()); for (Event event : events) { Event interceptedEvent = intercept(event); if (interceptedEvent != null) { intercepted.add(interceptedEvent); } } return intercepted; } public static class Builder implements Interceptor.Builder { private Pattern regex; private List serializerList; private boolean extractorHeader; private String extractorHeaderKey; private final RegexExtractorInterceptorPassThroughSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer(); @Override public void configure(Context context) { String regexString = context.getString(REGEX); Preconditions.checkArgument(!StringUtils.isEmpty(regexString), "Must supply a valid regex string"); regex = Pattern.compile(regexString); regex.pattern(); regex.matcher("").groupCount(); configureSerializers(context); extractorHeader = context.getBoolean(EXTRACTOR_HEADER,DEFAULT_EXTRACTOR_HEADER); if (extractorHeader){ extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY); Preconditions.checkArgument(!StringUtils.isEmpty(extractorHeaderKey),"header key must"); } } private void configureSerializers(Context context) { String serializerListStr = context.getString(SERIALIZERS); Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr), "Must supply at least one name and serializer"); String[] serializerNames = serializerListStr.split("\\s+"); Context serializerContexts = new Context(context.getSubProperties(SERIALIZERS + ".")); serializerList = Lists.newArrayListWithCapacity(serializerNames.length); for(String serializerName : serializerNames) { Context serializerContext = new Context( serializerContexts.getSubProperties(serializerName + ".")); String type = serializerContext.getString("type", "DEFAULT"); String name = serializerContext.getString("name"); Preconditions.checkArgument(!StringUtils.isEmpty(name), "Supplied name cannot be empty."); if("DEFAULT".equals(type)) { serializerList.add(new NameAndSerializer(name, defaultSerializer)); } else { serializerList.add(new NameAndSerializer(name, getCustomSerializer( type, serializerContext))); } } } private RegexExtractorInterceptorSerializer getCustomSerializer( String clazzName, Context context) { try { RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class .forName(clazzName).newInstance(); serializer.configure(context); return serializer; } catch (Exception e) { logger.error("Could not instantiate event serializer.", e); Throwables.propagate(e); } return defaultSerializer; } @Override public Interceptor build() { Preconditions.checkArgument(regex != null, "Regex pattern was misconfigured"); Preconditions.checkArgument(serializerList.size() > 0, "Must supply a valid group match id list"); return new RegexExtractorHeaderInterceptor(regex, serializerList, extractorHeader, extractorHeaderKey); } } static class NameAndSerializer { private final String headerName; private final RegexExtractorInterceptorSerializer serializer; public NameAndSerializer(String headerName, RegexExtractorInterceptorSerializer serializer) { this.headerName = headerName; this.serializer = serializer; } } } 应用配置:
hdp2.sources.s1.interceptors = i2
hdp2.sources.s1.interceptors.i2.type = com.wy.flume.interceptor.RegexExtractorHeaderInterceptor$Builder
hdp2.sources.s1.interceptors.i2.regex = ([^_]+)_(\\d{8}).*
hdp2.sources.s1.interceptors.i2.extractorHeader = true
hdp2.sources.s1.interceptors.i2.extractorHeaderKey = basename
hdp2.sources.s1.interceptors.i2.serializers = s1 s2
hdp2.sources.s1.interceptors.i2.serializers.s1.name = log_type
hdp2.sources.s1.interceptors.i2.serializers.s2.name = log_day
到此,相信大家对"flume-ng怎么自定义拦截器"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
拦截器
内容
学习
实用
更深
代码
兴趣
实用性
实际
操作简单
方法
更多
朋友
网站
频道
应用
查询
配置
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
应用层软件开发是什么意思
c# 2005数据库开发
pomelo 数据库持久化
关于网络安全的有关论述
嘉定区正规软件开发厂家直销
郑州畅想高科服务器质检
天下文枢互联网科技
无线网络安全实训报告
阿里核心技术阿里云数据库芯片
金融消费者网络安全防范
余姚市网络安全
软件管理端在服务器上安装不了
无线网络技术的应用分析
nex5影像数据库文件未就绪
通用互联网科技有限公司
进一步打好网络安全主动仗
翼星求生断开服务器
服务器机柜接地
linux 服务器编程
王者荣耀仅显示可转服务器
布林特网络技术
网络技术投资预算比较
手机行业编程用什么软件开发
nex5影像数据库文件未就绪
开源数据库mysql
物联网 平台 软件开发
派出所学习网络安全法简报
滨江区 软件开发
服务器机柜接地
什么是网络安全的最大威胁