千家信息网

storm实时排序TopN怎么使用

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,本篇内容介绍了"storm实时排序TopN怎么使用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!阅读
千家信息网最后更新 2025年12月03日storm实时排序TopN怎么使用

本篇内容介绍了"storm实时排序TopN怎么使用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

阅读背景:1 您需要了解TOP 使用的场景

2 您需要了解当前的TOPN 处理,和定时区间处理的区别

看代码说话

package com.cc.storm;import com.cc.storm.bolt.MergeBolt;import com.cc.storm.bolt.RankBolt;import com.cc.storm.bolt.RollingAllCountBolt;import com.cc.storm.bolt.RollingCountBolt;import com.cc.storm.spout.RandomEmitSpout;import com.cc.storm.spout.RedisPubSubSpout;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.generated.AlreadyAliveException;import backtype.storm.generated.InvalidTopologyException;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;/** * ToPN是一种常见模式,是对流式数据进行"Streaming topN"的计算: * 比如要计算的是最近一段时间内的热门话题,热门点击图片,热门商品浏览,热门商品购买 *  * 既然敢要实时的处理,【】【】【】【】【】[] 【】 【】【】【】【】 [] 【】【】【】【】 【】 [] *  * @author Yin Shuai */public class TOP10 {        public static void main(String[] args) throws AlreadyAliveException,                        InvalidTopologyException, InterruptedException {                final int TOP_N = 10;                final int time = 1;                TopologyBuilder builder = new TopologyBuilder();                builder.setSpout("$datasource$", new RandomEmitSpout(), 1);                builder.setBolt("$count$", new RollingCountBolt(3, time), 1)                                .fieldsGrouping("$datasource$", new Fields("merchandiseIDS"));                builder.setBolt("$rank$", new RankBolt(TOP_N), 2).fieldsGrouping(                                "$count$", new Fields("merchandiseID"));                builder.setBolt("$merge$", new MergeBolt(TOP_N)).globalGrouping(                                "$rank$");                Config conf = new Config();                conf.setDebug(false);                conf.setNumWorkers(2);                conf.setMaxSpoutPending(5000);                LocalCluster cluster = new LocalCluster();                cluster.submitTopology("Getting-Started-Toplogie", conf,                                builder.createTopology());                Thread.sleep(5000);        }}

整个处理的流程如图:

"storm实时排序TopN怎么使用"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0