Storm如何接收数据
发表于:2025-12-02 作者:千家信息网编辑
千家信息网最后更新 2025年12月02日,这篇文章主要讲解了"Storm如何接收数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm如何接收数据"吧!简要的模拟如何接收数据:packa
千家信息网最后更新 2025年12月02日Storm如何接收数据
这篇文章主要讲解了"Storm如何接收数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm如何接收数据"吧!
简要的模拟如何接收数据:
package com.cc.storm.spout;import java.io.IOException;import java.util.Map;import java.util.Random;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class RandomEmitSpout extends BaseRichSpout { private Random _random; private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger(RandomEmitSpout.class); private SpoutOutputCollector _collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _random = new Random(); } @Override public void nextTuple() { try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } String[] userIds = { "1", "2", "3", "4" }; String[] merchandiseIDS = { "1" }; _collector.emit(new Values(userIds[_random.nextInt(userIds.length)], merchandiseIDS[_random.nextInt(merchandiseIDS.length)])); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("userIdS", "merchandiseIDS")); } @Override public void close() { }}plus: 如果您采用的是Redis
那么:
package com.cc.storm.spout;import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class RedisPubSubSpout extends BaseRichSpout { /** * @Fields serialVersionUID : TODO */ private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger(RedisPubSubSpout.class); private SpoutOutputCollector _collector; private final String host; private final int port; private final String pattern; LinkedBlockingQueue queue; JedisPool pool; public RedisPubSubSpout(String host, int port, String pattern) { // TODO Auto-generated constructor stub this.host = host; this.port = port; this.pattern = pattern; } // 监听线程,从redis订阅的兴趣事件中获取数据 class ListenerThread extends Thread { private LinkedBlockingQueue queue; JedisPool pool; String pattern; public ListenerThread(LinkedBlockingQueue queue, JedisPool pool, String pattern) { // TODO Auto-generated constructor stub this.queue = queue; this.pool = pool; this.pattern = pattern; } @Override public void run() { JedisPubSub listener = new JedisPubSub() { @Override public void onUnsubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onSubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPUnsubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPSubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPMessage(String pattern, String channel, String message) { // TODO Auto-generated method stub queue.offer(message); } @Override public void onMessage(String channel, String message) { // TODO Auto-generated method stub queue.offer(message); } }; Jedis jedis = pool.getResource(); try { jedis.psubscribe(listener, pattern); } finally { pool.returnResource(jedis); } } } @SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub _collector = collector; // 队列最大支持1000个 queue = new LinkedBlockingQueue(1000); JedisPoolConfig config = new JedisPoolConfig(); // error pool = null; ListenerThread listener = new ListenerThread(queue, pool, pattern); // 启动线程 listener.start(); } @Override public void nextTuple() { // TODO Auto-generated method stub String ret = queue.poll(); if (null == ret) { // 如果队列中暂无数据可取,休息500ms Utils.sleep(500); } else { // 数据格式为 "userID:merchandiseID",可以依据需求更改此处 String[] s = ret.split(":"); _collector.emit(new Values(s[0], s[1])); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("userIdS", "merchandiseIDS")); } @Override public void close() { // TODO Auto-generated method stub pool.destroy(); }} 感谢各位的阅读,以上就是"Storm如何接收数据"的内容了,经过本文的学习后,相信大家对Storm如何接收数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
学习
内容
线程
队列
最大
事件
兴趣
就是
思路
情况
文章
更多
格式
知识
知识点
简要
篇文章
跟着
问题
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
王者荣耀能互转服务器吗
笔记本PHP源码无数据库
网络安全工程师证考试
网络安全周宣传折页
7万服务器
华三服务器raid配置手册
软件开发要用到独立显卡吗
什么是网络技术怎么样
岳阳软件开发培训班
青少年网络安全必要性
全国信贷联盟数据库是什么
重庆vivo软件开发薪资
人身安全等于网络安全宣传周
hpDL360G7服务器不显示
保山互联网科技
中信银行软件开发中心怎样
spss筛选完整数据库
基因组学数据库
计算机网络技术女的学的多吗
莱西app定制软件开发
淘客系统软件开发公司
推流地址是rtmp服务器吗
广西税控盘服务器连接异常
计算机网络安全技术现状
我的世界红石服务器怎么自创
极品飞车服务器维护
数据库服务器管理系统
创业导航软件开发
手机连接数据库文件
网络安全产业园 海淀