Flume怎么自定义Event Serializer序列化类
发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,这篇文章主要介绍"Flume怎么自定义Event Serializer序列化类",在日常操作中,相信很多人在Flume怎么自定义Event Serializer序列化类问题上存在疑惑,小编查阅了各式资
千家信息网最后更新 2025年12月01日Flume怎么自定义Event Serializer序列化类
这篇文章主要介绍"Flume怎么自定义Event Serializer序列化类",在日常操作中,相信很多人在Flume怎么自定义Event Serializer序列化类问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flume怎么自定义Event Serializer序列化类"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
把日志从flume打到hbase中,但是我们的日志由于前期是存到MongoDb中的,所以都是Json格式的日志,这时候使用flume自带的SimpleHbaseEventSerializer和RegexHbaseEventSerializer这样的就不行了,于是开始痛苦的看源码,自己写序列化的类(这里需要注意,如果是在flume的hbasesink包下编写的代码,License信息一定要加上。就是最上面那段英文,要不然在运行的时候会报错),比较简单,编写好类之后,编译打包,传到flume的lib目录下,然后在配置agent的时候指定Serializer的类为编写的类即可。下面是代码(类注释没贴出来,见谅哈):
public class PRTMSAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { private byte[] table;//hbase表 private byte[] cf;//列簇 private byte[][] payload;//列集合 private byte[][] payloadColumn;//列值 private byte[] incrementColumn; private String rowSuffix;//roykey后缀 private String rowPrefix;//rowkey前缀 private byte[] incrementRow; private KeyType keyType;//rowkey后缀类型 private static final Logger logger = LoggerFactory.getLogger(PRTMSAsyncHbaseEventSerializer.class); @Override public void configure(Context context) { // TODO Auto-generated method stub //设置主键后缀类型,这里使用时间戳 keyType = KeyType.TS; if (iCol != null && !iCol.isEmpty()) { incrementColumn = iCol.getBytes(Charsets.UTF_8); } incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override public void configure(ComponentConfiguration conf) { // TODO Auto-generated method stub } @Override public void initialize(byte[] table, byte[] cf) { // TODO Auto-generated method stub this.table = table; this.cf = cf; } /** * * @Title: setEvent * @Description: 获取日志信息,并解析出HBase的列以及列的value值 * @param event * @throws * @see org.apache.flume.sink.hbase.AsyncHbaseEventSerializer#setEvent(org.apache.flume.Event) */ @Override public void setEvent(Event event) { // TODO Auto-generated method stub //获取日志信息 String log = new String(event.getBody(), StandardCharsets.UTF_8); //headers包含日志中项目编号和host信息 Map headers = event.getHeaders(); JsonReader jsonReader = new JsonReader(new StringReader(log)); String name = ""; String value = ""; String path = ""; Map kv = new HashMap(); try { //解析日志中的键值对缓存到map中 jsonReader.beginObject(); while (jsonReader.hasNext()) { name = jsonReader.nextName(); value = jsonReader.nextString(); if(name.equals("uri")) path = value.split(" ")[1]; kv.put(name, value); } jsonReader.endObject(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } //解析headers中的项目id和服务host、路径 if(path.contains("?")){ path = path.substring(0, path.indexOf("?")); } String pcode = headers.get("pcode"); String host = headers.get("host"); //将项目编号和服务器host添加到map中 kv.put("pcode",pcode); kv.put("host", host); //初始化列和value数组 this.payloadColumn = new byte[kv.keySet().size()][]; this.payload = new byte[kv.keySet().size()][]; int i = 0; //给hbase的列和value赋值 for (String key : kv.keySet()) { this.payloadColumn[i] = key.getBytes(); this.payload[i] = kv.get(key).getBytes(); i++; } //设置rowkey的前缀 格式是项目编号+路径 this.rowSuffix = new StringBuilder(pcode).reverse().toString() + ":"+path+":"+kv.get("time"); } @Override public List getActions() { // TODO Auto-generated method stub List actions = new ArrayList(); if (payloadColumn != null) { byte[] rowKey; try { rowKey = rowSuffix.getBytes(); // for 循环,提交所有列和对于数据的put请求。 for (int i = 0; i < this.payload.length; i++) { PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]); actions.add(putRequest); } } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; } @Override public List getIncrements() { // TODO Auto-generated method stub List actions = new ArrayList(); if (incrementColumn != null) { AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn); actions.add(inc); } return actions; } @Override public void cleanUp() { // TODO Auto-generated method stub }} 到此,关于"Flume怎么自定义Event Serializer序列化类"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
日志
序列
信息
项目
学习
后缀
代码
前缀
时候
更多
格式
类型
路径
帮助
服务
不行
实用
痛苦
接下来
下编
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
惠州支付软件开发报价
连不上产品服务器怎么办
网络安全相关的课题项目实现
路由器中叠加网络安全模块
显示数据库表内容是
c 连接本地sql数据库
软件开发要做什么准备工作
服务器上的资料丢失如何找回
c 修改数据库
数据库查询员工信息和工资
南阳网络安全进校园
网络服务器地址
服务器管理口和网口位置
浙江磐石网络技术公司
服务器80和443端口是什么
茂名通信软件开发价钱
cmml5级认证网络安全公司
软件开发公司如何找到精准客户
管家婆自动登录服务器
家用网络安全性是什么
百惠科技 精彩互联网电视
涉密甲级软件开发
红茶传奇3数据库编辑工具
北大青鸟软件开发培训班
谈谈对数据库技术应用的理解
交易原油软件开发
郑州选哪个dns服务器
家用宽带搭建服务器
静安区上门软件开发电话多少
四年级网络安全作文100