千家信息网

Storm分布式RPC怎么配置

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,这篇文章主要介绍"Storm分布式RPC怎么配置",在日常操作中,相信很多人在Storm分布式RPC怎么配置问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Storm分布
千家信息网最后更新 2025年12月03日Storm分布式RPC怎么配置

这篇文章主要介绍"Storm分布式RPC怎么配置",在日常操作中,相信很多人在Storm分布式RPC怎么配置问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Storm分布式RPC怎么配置"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

首先需要在storm集群上把DRPC的环境准备好,在storm.yaml当中增加如下内容

drpc.servers:
- "192.168.1.118"

之后通过storm drpc启动分布式RPC服务。

之后,跟其他的topology并没有什么不同,我们需要写点代码,我这边直接从storm的例子当中找了个:

public class BasicDRPCTopology {
public static class ExclaimBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + "!"));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}

}

public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);

Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology());
}
}

从main函数开始,简单解释一下:

首先new一个LinearDRPCTopologyBuilder对象,其中的参数【exclamation】就是我们在执行rpc调用时候的方法名。

之后我们加入一个自己的bolt,并行数量为3

之后用StormSubmitter把这个topology提交上去就行了。

代码完成之后,打一个jar包,用storm jar把topology提交到集群上。

客户端调用,非常简单

DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("exclamation", "china");
System.out.println(result);

到此为止,一个最简单的DRPC调用的工作已经完成了。

等等,还有点问题,LinearDRPCTopologyBuilder 这个东西是不建议使用的(我这里的版本是0.9.3)。

源码上有这么一行:

Trident subsumes the functionality provided by this class, so it's deprecated

大概意思就是trident这个东西已经包含了LinearDRPCTopologyBuilder 当中的功能。

trident是什么意思?翻译了一下,【三叉戟】,靠,看起来很牛逼的样子。必须试试。

那么上第二份代码:

public class TridentDRPCTopology {
public static void main(String[] args) throws Exception {
Config conf = new Config();
StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
}

public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();

topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")).
groupBy(new Fields("word")).
aggregate(new One(), new Fields("one")).
aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
return topology.build();
}

public static class Split extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}

public static class One implements CombinerAggregator {
@Override
public Integer init(TridentTuple tuple) {
return 1;
}

@Override
public Integer combine(Integer val1, Integer val2) {
return 1;
}

@Override
public Integer zero() {
return 1;
}
}
}

这个topology的功能要稍稍复杂一些,给出一句话,查一下一共有多少个词,当然了,不能重复计数。main函数当中非常简单,提交一个topology。而这个topology的构建过程是在buildTopology当中完成的。

topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")). //用空格分词
groupBy(new Fields("word")). //分组
aggregate(new One(), new Fields("one")). //给每组的数量设定为1
aggregate(new Fields("one"), new Sum(), new Fields("word-count")); //sum计算总和

这样的方式看起来跟spark当中对RDD的操作是有些像的。

好了,还是打包,提交。

然后是客户端测试:

DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
System.out.println(result);

到此,关于"Storm分布式RPC怎么配置"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

分布式 配置 学习 代码 东西 函数 功能 客户 客户端 就是 意思 数量 方法 更多 问题 集群 帮助 不同 复杂 实用 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 江苏电脑软件开发大概多少钱 云服务器二级等保申请流程 网络安全法知识竞赛和答案 新华三国服务器市场分析 数据库分析可视化工具 服务器没有权限访问互联网 第五空间网络安全电视 控制台连接数据库 服务器显示屏 紫光服务器登录管理系统 武汉纺织大学计算机网络技术 信息网络技术基础课程 北京软件开发客服有什么要求 连接到反向服务器错误 上海创值网络技术 文献检索的网站和数据库有哪些 国家知识产权局数据库如何查专利 为什么腾讯服务器很少 南山坪中学网络安全教育 文件服务器共享端口 全国计算机三级数据库技术 黑龙江省ctf网络安全挑战赛 广州市易医通互联网科技 上海新能源网络技术销售厂 控制数据库优化的方法 新乡市子扬网络技术有限公司 centos远程服务器管理 网络安全排查整改情况汇报 网络安全open 服务器启动找不到启动盘
0