内容
场景 – 移动电话通话记录日志分析器
移动通话和每次通话的持续时间将被作为输入提供给Apache Storm,Storm将处理并分组(group)同一呼叫者和接收者之间的通话记录,并得出这些通话的总数。
创建Spout
Spout是用于数据生成的组件。 基本上,一个Spout将实现一个IRichSpout接口。 “IRichSpout”接口有以下重要方法:
-
open – 为Spout提供执行环境。 Executor将运行此方法来初始化Spout。
-
nextTuple – 通过收集器(collector)发射(emit)生成的数据。
-
close – 关闭Spout。
-
declareOutputFields – 声明Tuple的输出格式。
-
ack – 确认一个特定的Tuple已经被处理完了。
-
fail – 表明一个特定的Tuple没有被处理并且不被重新处理。
open
open 方法的签名如下:
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
-
conf – 为Spout提高Storm配置
-
context – 提供关于Topology中Spout位置,任务ID,输入和输出信息的完整信息。
-
collector – 发射被Bolt处理的Tuple
nextTuple
nextTuple方法的签名如下:
nextTuple()
nextTuple()方法在循环中被周期性的调用,该循环也会调用ack()和fail()方法。当没有工作要做时,nextTuple()必须释放对线程的控制,以便其他方法有机会被调用。所以nextTuple的第一行会检查处理是否完成,如果完成了,它应该睡眠至少一毫秒,以在返回之前减少处理器上的负载。
close
close方法的签名如下:
close()
declareOutputFields
declareOutputFields方法的签名如下:
declareOutputFields(OutputFieldsDeclarer declarer)
-
declare – 用来声明输出流的ID,输出字段等等。
这个方法用来指定Tuple的输出结构。
ack
ack方法的签名如下:
ack(Object msgId)
该方法确认已经处理了特定的Tuple。
fail
fail方法的签名如下:
ack(Object msgId)
此方法通知Storm某个特定的Tuple还没有被完全处理。 Storm将重新处理这个特定的Tuple。
FakeCallLogReaderSpout
在这个示例中,我们需要收集通话记录详细信息。 通话记录的信息包含:
-
主叫号码
-
被叫号码
-
通话时长
由于我们没有实时的通话记录,我们将生成一个虚假(fake)的通话记录。 假的通话记录将使用Random类生成。 完整的程序代码如下:
代码: FakeCallLogReaderSpout.java
import java.util.*;
//import storm tuple packages
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
//import Spout interface packages
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
//Create a class FakeLogReaderSpout which implement IRichSpout interface
to access functionalities
public class FakeCallLogReaderSpout implements IRichSpout {
//Create instance for SpoutOutputCollector which passes tuples to bolt.
private SpoutOutputCollector collector;
private boolean completed = false;
//Create instance for TopologyContext which contains topology data.
private TopologyContext context;
//Create instance for Random class.
private Random randomGenerator = new Random();
private Integer idx = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
if(this.idx <= 1000) {
List<String> mobileNumbers = new ArrayList<String>();
mobileNumbers.add("1234123401");
mobileNumbers.add("1234123402");
mobileNumbers.add("1234123403");
mobileNumbers.add("1234123404");
Integer localIdx = 0;
while(localIdx++ < 100 && this.idx++ < 1000) {
String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
while(fromMobileNumber == toMobileNumber) {
toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
}
Integer duration = randomGenerator.nextInt(60);
this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("from", "to", "duration"));
}
//Override all the interface methods
@Override
public void close() {}
public boolean isDistributed() {
return false;
}
@Override
public void activate() {}
@Override
public void deactivate() {}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
创建Bolt
Bolt是一个以Tuple作为输入,处理Tuple并生成新的Tuple作为输出的组件,Bolts需要实现(implement)IRichBolt接口。 在这个例子中,我们使用两个Bolt类CallLogCreatorBolt和CallLogCounterBolt来执行操作。
IRichBolt接口有以下方法:
-
prepare – 为Bolt提供执行环境。 Executor将运行此方法来初始化Spout。
-
execute – 处理一个Tuple
-
cleanup – 当Bolt被关闭时调用
-
declareOutputFields – 声明输出的Tuple的结构
prepare
prepare方法的签名如下:
prepare(Map conf, TopologyContext context, OutputCollector collector)
-
conf – 为Bolt提供Storm的配置
-
context – 提供Topology中Bolt的位置,任务ID,输入、输出信息等
-
collector – 发射被处理过的 Tuple
execute
execute方法的签名如下:
execute(Tuple tuple)
这里的Tuple是要处理的Tuple(被Spout或别的Bolt发射过来的)
execute方法一次处理一个Tuple,Tuple的数据可以通过Tuple类的getValue方法访问。 可以延迟处理输入的Tuple。多个输入Tuple可以被一起处理,并生成一个新的Tuple输出。 处理过的Tuple可以通过使用OutputCollector类发射出去(给其他Bolt作为输入)。
cleanup
cleanup方法的签名如下:
cleanup()
declareOutputFields
declareOutputFields方法的签名如下:
declareOutputFields(OutputFieldsDeclarer declarer)
这里参数declare用于声明输出流ID,输出字段等。
这个方法用来指定Tuple的输出结构。
通话记录创建器Bolt
通话记录创建器Bolt接收通话记录Tuple,通话记录Tuple存有主叫号码,被叫号码和通话时长。通过组合主叫方号码和被叫方号码,这个Bolt创建了一个新值,新值的格式为“主叫号码 – 被叫号码”,并将其命名为新字段“call”。 完整的代码如下:
代码: CallLogCreatorBolt.java
//import util packages
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
//import Storm IRichBolt package
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
//Create instance for OutputCollector which collects and emits tuples to produce output
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String from = tuple.getString(0);
String to = tuple.getString(1);
Integer duration = tuple.getInteger(2);
collector.emit(new Values(from + " – " + to, duration));
}
@Override
public void cleanup() {}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call", "duration"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
通话记录计数器Bolt
通话记录计数器Bolt接收通话及其通话时长作为Tuple,它在prepare方法中初始化一个字典(Map)对象。 在execute方法中,它检查Tuple并为每个Tuple中新的“call”值在字典对象(Map)中创建一个新的Key,并把对应的值(Value)设置为1。 对于字典中已有的Key,它只是递增其值(Value)。 简单地说,这个Bolt将call和它的计数保存在字典对象(Map)中。 当然,我们也可以将call和计数保存到数据源中,而不是字典中。 完整的程序代码如下所示:
代码: CallLogCounterBolt.java
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
public class CallLogCounterBolt implements IRichBolt {
Map<String, Integer> counterMap;
private OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.counterMap = new HashMap<String, Integer>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String call = tuple.getString(0);
Integer duration = tuple.getInteger(1);
if(!counterMap.containsKey(call)){
counterMap.put(call, 1);
}else{
Integer c = counterMap.get(call) + 1;
counterMap.put(call, c);
}
collector.ack(tuple);
}
@Override
public void cleanup() {
for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
System.out.println(entry.getKey()+" : " + entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("call"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
创建Topology
Storm Topology基本上是一个Thrift结构。 TopologyBuilder类提供了简单的方法来创建复杂的Topology,TopologyBuilder类具有设置Spout(setSpout)和设置Bolt(setBolt)的方法,最后,TopologyBuilder用createTopology方法来创建Topology。 使用下面的代码片段来创建一个Topology:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping和fieldsGrouping方法能帮助Spout和Bolt进行流分组(Stream Grouping)。
本地Cluster
出于开发目的,我们可以使用“LocalCluster”类创建一个本地Cluster,然后使用“LocalCluster”类的“submitTopology”方法提交Topology来运行。“submitTopology”的一个参数是“Config”类的一个实例,在提交Topology之前,“Config”类用于设置配置选项。 该配置选项将在运行时与Cluster的配置合并,然后通过prepare方法发送给所有任务(Spout和Bolt)。将Topology提交到Cluster后,我们将等待10秒钟让Cluster计算这个提交的Topology,然后使用“LocalCluster”的“shutdown”方法关闭Cluster。 完整的程序代码如下所示:
代码: LogAnalyserStorm.java
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
//import storm configuration packages
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
public static void main(String[] args) throws Exception{
//Create Config instance for cluster configuration
Config config = new Config();
config.setDebug(true);
//
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
.shuffleGrouping("call-log-reader-spout");
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
Thread.sleep(10000);
//Stop the topology
cluster.shutdown();
}
}
编译并运行应用程序
以上所示完整的应用程序包含4个类:
-
FakeCallLogReaderSpout.java
-
CallLogCreaterBolt.java
-
CallLogCounterBolt.java
-
LogAnalyerStorm.java
可以用一下命令编译这4个类:
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
然后用以下命令运行这个应用程序:
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
输出
一旦应用程序启动,它将输出有关Cluster启动过程,Spout和Bolt处理数据,以及最后Cluster关闭过程的完整详细信息。在“CallLogCounterBolt”类中,我们打印了通话记录及其计数的详细信息,这些信息将如下显示在控制台上
1234123402 – 1234123401 : 78
1234123402 – 1234123404 : 88
1234123402 – 1234123403 : 105
1234123401 – 1234123404 : 74
1234123401 – 1234123403 : 81
1234123401 – 1234123402 : 81
1234123403 – 1234123404 : 86
1234123404 – 1234123401 : 63
1234123404 – 1234123402 : 82
1234123403 – 1234123402 : 83
1234123404 – 1234123403 : 86
1234123403 – 1234123401 : 93
非Java编程语言
Storm Topology通过Thrift接口实现,所以很容易任何语言来提交Topology,Storm支持Ruby,Python和许多其他语言。我们来看看python的例子。
Python绑定
Python是一种通用的解释型,交互式,面向对象的编程语言。Storm支持Python来实现Topology。Python支持发射(emitting),锚定(anchoring),确认(acking)和记录(logging)操作。
Bolt可以用任何语言来定义,用另一种语言编写的Bolt作为子进程执行,Storm通过标准输入(stdin)/标准输出(stdout)用JSON消息与这些子进程通信。 首先用一个支持Python绑定的Bolt WordSplit为例。
public static class WordSplit implements IRichBolt {
public WordSplit() {
super("python", "splitword.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
这里的WordSplit类实现了IRichBolt接口,但是它的业务逻辑是在一个Python文件“splitword.py”完成的,这个“splitword.py”作为参数传给WordSplit的super方法来被Storm调用。现在创建一个Python文件“splitword.py”。
import storm
class WordSplitBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
WordSplitBolt().run()
这是用Python实现的分割一个给定句子中的单词的例子,也可以用其他支持的编程语言来实现。