Storm分布式RPC怎么配置
这篇文章主要介绍“Storm分布式RPC怎么配置”,在日常操作中,相信很多人在Storm分布式RPC怎么配置问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm分布式RPC怎么配置”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
首先需要在storm集群上把DRPC的环境准备好,在storm.yaml当中增加如下内容
drpc.servers:
- "192.168.1.118"
之后通过storm drpc启动分布式RPC服务。
之后,跟其他的topology并没有什么不同,我们需要写点代码,我这边直接从storm的例子当中找了个:
public class BasicDRPCTopology {
public static class ExclaimBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input "!"));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "result"));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar("DRCP-TEST", conf, builder.createRemoteTopology());
}
}
从main函数开始,简单解释一下:
首先new一个LinearDRPCTopologyBuilder对象,其中的参数【exclamation】就是我们在执行rpc调用时候的方法名。
之后我们加入一个自己的bolt,并行数量为3
之后用StormSubmitter把这个topology提交上去就行了。
代码完成之后,打一个jar包,用storm jar把topology提交到集群上。
客户端调用,非常简单
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("exclamation", "china");
System.out.println(result);
到此为止,一个最简单的DRPC调用的工作已经完成了。
等等,还有点问题,LinearDRPCTopologyBuilder 这个东西是不建议使用的(我这里的版本是0.9.3)。
源码上有这么一行:
Trident subsumes the functionality provided by this class, so it's deprecated
大概意思就是trident这个东西已经包含了LinearDRPCTopologyBuilder 当中的功能。
trident是什么意思?翻译了一下,【三叉戟】,靠,看起来很牛逼的样子。必须试试。
那么上第二份代码:
public class TridentDRPCTopology {
public static void main(String[] args) throws Exception {
Config conf = new Config();
StormSubmitter.submitTopologyWithProgressBar("word-count", conf, buildTopology());
}
public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();
topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")).
groupBy(new Fields("word")).
aggregate(new One(), new Fields("one")).
aggregate(new Fields("one"), new Sum(), new Fields("word-count"));
return topology.build();
}
public static class Split extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
public static class One implements CombinerAggregator<Integer> {
@Override
public Integer init(TridentTuple tuple) {
return 1;
}
@Override
public Integer combine(Integer val1, Integer val2) {
return 1;
}
@Override
public Integer zero() {
return 1;
}
}
}
这个topology的功能要稍稍复杂一些,给出一句话,查一下一共有多少个词,当然了,不能重复计数。main函数当中非常简单,提交一个topology。而这个topology的构建过程是在buildTopology当中完成的。
topology.newDRPCStream("word-count").
each(new Fields("args"), new Split(), new Fields("word")). //用空格分词
groupBy(new Fields("word")). //分组
aggregate(new One(), new Fields("one")). //给每组的数量设定为1
aggregate(new Fields("one"), new Sum(), new Fields("word-count")); //sum计算总和
这样的方式看起来跟spark当中对RDD的操作是有些像的。
好了,还是打包,提交。
然后是客户端测试:
DRPCClient client = new DRPCClient("192.168.1.118", 3772);
String result = client.execute("word-count", "mywife asdf asdf asdfasdfasfweqw saaa weweew");
System.out.println(result);
到此,关于“Storm分布式RPC怎么配置”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注美国cn2网站,小编会继续努力为大家带来更多实用的文章!