Storm-Hbase接口怎么用
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要介绍Storm-Hbase接口怎么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
千家信息网最后更新 2025年12月02日Storm-Hbase接口怎么用
这篇文章主要介绍Storm-Hbase接口怎么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
package storm.contrib.hbase.bolts;import static backtype.storm.utils.Utils.tuple;import java.util.Map;import org.apache.hadoop.hbase.HBaseConfiguration;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import storm.contrib.hbase.utils.HBaseCommunicator;import storm.contrib.hbase.utils.HBaseConnector;/* 一个读取Hbase的Bolt,不断的从Hbase中读取表中的行KEY,和列,通过tuples来发送 * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples */public class HBaseColumnValueLookUpBolt implements IBasicBolt { private static final long serialVersionUID = 1L; private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null; private static transient HBaseConnector connector = null; private static transient HBaseConfiguration conf = null; private static transient HBaseCommunicator communicator = null; OutputCollector _collector; /* * Constructor initializes the variables storing the hbase table information and connects to hbase */ public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) { this.tableName = tableName; this.colFamilyName = colFamilyName; this.colName = colName; this.rowKeyField = rowKeyField; connector = new HBaseConnector(); conf = connector.getHBaseConf(hbaseXmlLocation); communicator = new HBaseCommunicator(conf); } /* * emits the value of the column with name @colName and rowkey @rowKey * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector) */ public void execute(Tuple input, BasicOutputCollector collector) { String rowKey = input.getStringByField(this.rowKeyField); 
columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName); collector.emit(tuple(rowKey, columnValue)); } public void prepare(Map confMap, TopologyContext context, OutputCollector collector) { _collector = collector; } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("rowKey", "columnValue")); } public Map getComponentConfiguration() { Map map = null; return map; } public void prepare(Map stormConf, TopologyContext context) { }} package storm.contrib.hbase.bolts;import static backtype.storm.utils.Utils.tuple;import java.util.Map;import org.apache.hadoop.hbase.HBaseConfiguration;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import storm.contrib.hbase.utils.HBaseCommunicator;import storm.contrib.hbase.utils.HBaseConnector;/* * Reads the specified column of HBase table and emits the row key and the column values in the form of tuples */public class HBaseColumnValueLookUpBolt implements IBasicBolt { private static final long serialVersionUID = 1L; private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null; private static transient HBaseConnector connector = null; private static transient HBaseConfiguration conf = null; private static transient HBaseCommunicator communicator = null; OutputCollector _collector; /* * Constructor initializes the variables storing the hbase table information and connects to hbase */ public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) { this.tableName = tableName; this.colFamilyName = colFamilyName; this.colName = colName; 
this.rowKeyField = rowKeyField; connector = new HBaseConnector(); conf = connector.getHBaseConf(hbaseXmlLocation); communicator = new HBaseCommunicator(conf); } /* * emits the value of the column with name @colName and rowkey @rowKey * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector) */ public void execute(Tuple input, BasicOutputCollector collector) { String rowKey = input.getStringByField(this.rowKeyField); //通过指定我们的 表名,行键,列族,列名,直接通过communitor拿到列的值。 columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName); collector.emit(tuple(rowKey, columnValue)); } public void prepare(Map confMap, TopologyContext context, OutputCollector collector) { _collector = collector; } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("rowKey", "columnValue")); } public Map getComponentConfiguration() { Map map = null; return map; } public void prepare(Map stormConf, TopologyContext context) { }} Rowkey
package storm.contrib.hbase.spouts;

import backtype.storm.topology.OutputFieldsDeclarer;

import java.util.Map;
import java.util.UUID;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Random;

import org.apache.log4j.Logger;

/*
 * Spout that emits tuples containing row keys of the HBase table; the set of
 * row keys is hard-coded below.
 */
public class RowKeyEmitterSpout implements IRichSpout {

    private static final long serialVersionUID = 6814162766489261607L;

    public static Logger LOG = Logger.getLogger(RowKeyEmitterSpout.class);

    // Fixed pool of row keys; one is chosen at random per emitted tuple.
    private static final String[] ROW_KEYS = {"rowKey1", "rowKey2", "rowKey3", "rowKey4"};

    // Reused generator — the original allocated a new Random (and called
    // Thread.yield()) on every nextTuple() invocation, which is wasteful.
    private final Random rand = new Random();

    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public RowKeyEmitterSpout() {
        this(true);
    }

    public RowKeyEmitterSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public boolean isDistributed() {
        return _isDistributed;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void close() {
    }

    public void nextTuple() {
        // Throttle emission; a random UUID is used as the message id so every
        // tuple is individually ack-able.
        Utils.sleep(100);
        final String word = ROW_KEYS[rand.nextInt(ROW_KEYS.length)];
        _collector.emit(new Values(word), UUID.randomUUID());
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public Map getComponentConfiguration() {
        return null;
    }
}

// Simple code used to smoke-test the system and verify the interface works.
package storm.contrib.hbase.spouts;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class TestSpout implements IRichSpout { SpoutOutputCollector _collector; Random _rand; int count = 0; public boolean isDistributed() { return true; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } public void nextTuple() { Utils.sleep(1000); String[] words = new String[] { "hello", "tiwari", "indore", "jayati"}; Integer[] numbers = new Integer[] { 1,2,3,4,5 }; if(count == numbers.length -1) { count = 0; } count ++; int number = numbers[count]; String word = words[count]; int randomNum = (int) (Math.random()*1000); _collector.emit(new Values(word, number)); } public void close() { } public void ack(Object id) { } public void fail(Object id) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "number")); } public void activate() { } public void deactivate() { } public Map getComponentConfiguration() { return null; }} 比较简单,也就不做解释了,Storm-hbase的接口并没有像Storm-kafka的接口那样,自身去处理轮询,自身去处理连接的问题。只是简单的构造了一个Hbase的连接,在连接的过程之中,直接构造了一个Connector就可以了。
以上是"Storm-Hbase接口怎么用"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!
接口
内容
篇文章
处理
测试
不断
之中
代码
价值
兴趣
只是
小伙
小伙伴
更多
测试系统
知识
系统
行业
资讯
资讯频道
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
德国 红警2 服务器
内存常驻不能关闭数据库
软件开发公司专业的有哪些
网络技术对校园的影响
每日科技互联网报
dnf赛季服务器
档案软件开发合同
ftp是数据库吗
数据从前台存储到数据库的过程
wow六零数据库
网络安全大学生案例
2种非关系型数据库
科讯软件开发
计算机网络技术江西大学
冒险岛数据库技术支持
上海助力智慧工厂软件开发
洛阳ppp项目数据库
学网络安全必须学英语吗
键值数据库中的键
阿尔法发掘数据库怎样购买
连接电脑数据库
数据库信息导出
数据库日志库的意义
vr软件开发公司
文件数据库链接地址
网络安全牛人
开展网络安全自查的情况报告
云栋网络技术有限公司
字节跳动手机软件开发工资
软件开发培训学校黄