flume中hdfssink如何自定义EventSerializer序列化类
发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章将为大家详细讲解有关flume中hdfssink如何自定义EventSerializer序列化类,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。因为之前做了h
千家信息网最后更新 2025年12月03日flume中hdfssink如何自定义EventSerializer序列化类
这篇文章将为大家详细讲解有关flume中hdfssink如何自定义EventSerializer序列化类,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
因为之前做了hbasesink的序列化类,觉得写hdfs的应该会很简单,可是没想到竟然不一样。hdfs并没有直接配置序列化类的选项需要根据fileType来选择对相应序列化类,我们使用的datastream的类型,对应的类是HDFSDataStream,这个类默认的序列化类TEXT(这是个枚举类型)
serializerType = context.getString("serializer", "TEXT");枚举的类如下:
public enum EventSerializerType { TEXT(BodyTextEventSerializer.Builder.class), HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class), AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class), CUSTOM(CUSTOMEventSerializer.Builder.class),//自定义的序列化类 OTHER(null); private final Class extends EventSerializer.Builder> builderClass; EventSerializerType(Class extends EventSerializer.Builder> builderClass) { this.builderClass = builderClass; } public Class extends EventSerializer.Builder> getBuilderClass() { return builderClass; }}在里面加了自定义的类型和枚举,在配置agent的时候配置好filetype和serializer即可,同样需要编译上传。
自定义的序列化类如下:
public class CUSTOMEventSerializer implements EventSerializer { private final static Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class); private final String SPLITCHAR = "\001";//列分隔符 // for legacy reasons, by default, append a newline to each event written // out private final String APPEND_NEWLINE = "appendNewline"; private final boolean APPEND_NEWLINE_DFLT = true; private final OutputStream out; private final boolean appendNewline; private CUSTOMEventSerializer(OutputStream out, Context ctx) { this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT); this.out = out; } @Override public boolean supportsReopen() { return true; } @Override public void afterCreate() { // noop } @Override public void afterReopen() { // noop } @Override public void beforeClose() { // noop } @Override public void write(Event e) throws IOException { // 获取日志信息 String log = new String(e.getBody(), StandardCharsets.UTF_8); logger.info("-----------logs-------" + log); // headers包含日志中项目编号和host信息 Map headers = e.getHeaders(); String parsedLog = parseJson2Value(log, headers); out.write(parsedLog.getBytes()); logger.info("-----------values-------" + parsedLog); logger.info("-----------valueSSSSSS-------" + parsedLog.getBytes()); out.write('\n'); } /** * * @Title: parseJson2Value * @Description: 解析出json日志中的value。 * @param log json格式日志 * @param headers event头信息 * @return * @return String 解析后的日志 * @throws */ private String parseJson2Value(String log, Map headers) { log.replace("\\", "/"); String time = ""; String path = ""; Object value = ""; StringBuilder values = new StringBuilder(); ObjectMapper objectMapper = new ObjectMapper(); try { Map m = objectMapper.readValue(log, Map.class); for(String key:m.keySet()){ value = m.get(key); if (key.equals("uri")){ //解析访问路径 path = pasreUriToPath(value.toString()); } if(key.equals("time")){ time = value.toString().substring(10); } values.append(value).append(this.SPLITCHAR); } } catch (JsonParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JsonMappingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 解析headers中的项目编号和服务host String pcode = headers.get("pcode"); String host = headers.get("host"); values.append(path).append(this.SPLITCHAR). append(pcode).append(this.SPLITCHAR). append(host).append(this.SPLITCHAR). append(time).append(this.SPLITCHAR); //value字符串 return values.toString(); } @Override public void flush() throws IOException { // noop } public static class Builder implements EventSerializer.Builder { @Override public EventSerializer build(Context context, OutputStream out) { CUSTOMEventSerializer s = new CUSTOMEventSerializer(out, context); return s; } } /** * 把请求uri转换成具体的访问路径 * * @param uri 请求uri * @return 访问路径 */ protected String pasreUriToPath(String uri){ if(uri == null || "".equals(uri.trim())){ return uri; } int index = uri.indexOf("/"); if(index > -1){ uri = uri.substring(index); } index = uri.indexOf("?"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(";"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(" HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf("HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } return uri; }} 关于"flume中hdfssink如何自定义EventSerializer序列化类"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
序列
日志
信息
篇文章
类型
路径
配置
更多
项目
不错
实用
没想到
内容
分隔符
字符
字符串
文章
时候
格式
知识
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
警察大学网络安全与执法学什么
软件开发类公司实习内容
国产服务器内存条厂家
河南配件管理软件开发公司
北京标准软件开发发展现状
找不到电子邮箱服务器
金苗系统如何更改数据库ip地址
上海数据网络技术来电咨询
个人注册数据库多少钱
导入数据库dmp字符集
网站数据库图片保存方案
海珠app软件开发定制
签软件开发合同注意事项
网络安全培训学习反思
湖北时代网络技术创新服务
荔湾网络安全选哪家
传奇数据库转
我的世界1.16版本红石服务器推荐
全词匹配数据库
浙江省网络安全测评
核动力研究软件开发
数据库四大范式整理
广东互联网开发科技有限排名
网络技术与应用考查试卷
普洱公司app软件开发多少钱
铁塔网络技术
不能连接到系统数据库服务器
同一张表查不同列匹配数据库
施云波无线传感网络技术概论
婕斯租根服务器了吧