场景 移动电话通话记录日志分析器

移动通话和每次通话的持续时间将被作为输入提供给Apache StormStorm将处理并分组(group)同一呼叫者和接收者之间的通话记录,并得出这些通话的总数。 

创建Spout 

Spout是用于数据生成的组件。 基本上,一个Spout将实现一个IRichSpout接口。 IRichSpout”接口有以下重要方法:

  •   openSpout提供执行环境。 Executor将运行此方法来初始化Spout

  • nextTuple通过收集器(collector)发射(emit)生成的数据。

  •  close 关闭Spout

  •  declareOutputFields 声明Tuple的输出格式

  •  ack确认一个特定的Tuple已经被处理完了。

  • fail表明一个特定的Tuple没有被处理并且不被重新处理。

open

open 方法的签名如下: 

open(Map conf, TopologyContext context, SpoutOutputCollector collector)

  • confSpout提高Storm配置

  •  context提供关于Topology中Spout位置,任务ID,输入和输出信息的完整信息。

  •  collector发射被Bolt处理的Tuple

nextTuple

nextTuple方法的签名如下:

nextTuple()

nextTuple()方法在循环中被周期性的调用,该循环也会调用ack()fail()方法。当没有工作要做时,nextTuple()必须释放对线程的控制,以便其他方法有机会被调用。所以nextTuple的第一行会检查处理是否完成,如果完成了,它应该睡眠至少一毫秒,以在返回之前减少处理器上的负载。

close

close方法的签名如下:

close()

declareOutputFields

declareOutputFields方法的签名如下: 

declareOutputFields(OutputFieldsDeclarer declarer)

  •  declare – 用来声明输出流的ID,输出字段等等

这个方法用来指定Tuple的输出结构。


ack

ack方法的签名如下:

ack(Object msgId)

该方法确认已经处理了特定的Tuple 

fail

fail方法的签名如下:

ack(Object msgId)

此方法通知Storm某个特定的Tuple还没有被完全处理。 Storm将重新处理这个特定的Tuple

FakeCallLogReaderSpout

 在这个示例中,我们需要收集通话记录详细信息。 通话记录的信息包含:

  • 主叫号码

  • 被叫号码

  • 通话时长

由于我们没有实时的通话记录,我们将生成一个虚假(fake)的通话记录。 假的通话记录将使用Random类生成。 完整的程序代码如下:

代码: FakeCallLogReaderSpout.java

import java.util.*;

//import storm tuple packages

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

 

//import Spout interface packages

import org.apache.storm.topology.IRichSpout;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.spout.SpoutOutputCollector;

import org.apache.storm.task.TopologyContext;

 

//Create a class FakeLogReaderSpout which implement IRichSpout interface

   to access functionalities

      

public class FakeCallLogReaderSpout implements IRichSpout {

   //Create instance for SpoutOutputCollector which passes tuples to bolt.

   private SpoutOutputCollector collector;

   private boolean completed = false;

      

   //Create instance for TopologyContext which contains topology data.

   private TopologyContext context;

      

   //Create instance for Random class.

   private Random randomGenerator = new Random();

   private Integer idx = 0;

 

   @Override

   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

      this.context = context;

      this.collector = collector;

   }

 

   @Override

   public void nextTuple() {

      if(this.idx <= 1000) {

         List<String> mobileNumbers = new ArrayList<String>();

         mobileNumbers.add("1234123401");

         mobileNumbers.add("1234123402");

         mobileNumbers.add("1234123403");

         mobileNumbers.add("1234123404");

 

         Integer localIdx = 0;

         while(localIdx++ < 100 && this.idx++ < 1000) {

            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

                         

            while(fromMobileNumber == toMobileNumber) {

               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            }

                         

            Integer duration = randomGenerator.nextInt(60);

            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));

         }

      }

   }

 

   @Override

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("from", "to", "duration"));

   }

 

   //Override all the interface methods

   @Override

   public void close() {}

 

   public boolean isDistributed() {

      return false;

   }

 

   @Override

   public void activate() {}

 

   @Override

   public void deactivate() {}

 

   @Override

   public void ack(Object msgId) {}

 

   @Override

   public void fail(Object msgId) {}

 

   @Override

   public Map<String, Object> getComponentConfiguration() {

      return null;

   }

} 

创建Bolt

Bolt是一个以Tuple作为输入,处理Tuple并生成新的Tuple作为输出的组件,Bolts需要实现(implementIRichBolt接口。 在这个例子中,我们使用两个BoltCallLogCreatorBoltCallLogCounterBolt来执行操作。

IRichBolt接口有以下方法:

  •  prepareBolt提供执行环境。 Executor将运行此方法来初始化Spout

  •  execute处理一个Tuple

  •  cleanup Bolt被关闭时调用

  • declareOutputFields声明输出的Tuple的结构

prepare 

prepare方法的签名如下:

prepare(Map conf, TopologyContext context, OutputCollector collector)

  • confBolt提供Storm的配置

  • context提供TopologyBolt的位置,任务ID,输入、输出信息等

  • collector发射被处理过的 Tuple 

execute

execute方法的签名如下: 

execute(Tuple tuple)

这里的Tuple是要处理的Tuple(被Spout或别的Bolt发射过来的)

execute方法一次处理一个TupleTuple的数据可以通过Tuple类的getValue方法访问。 可以延迟处理输入的Tuple。多个输入Tuple可以被一起处理,并生成一个新的Tuple输出。 处理过的Tuple可以通过使用OutputCollector类发射出去(给其他Bolt作为输入)。

cleanup

cleanup方法的签名如下:

cleanup() 

declareOutputFields

declareOutputFields方法的签名如下:

declareOutputFields(OutputFieldsDeclarer declarer)

这里参数declare用于声明输出流ID,输出字段等。

这个方法用来指定Tuple的输出结构。

通话记录创建器Bolt 

通话记录创建器Bolt接收通话记录Tuple,通话记录Tuple存有主叫号码,被叫号码和通话时长。通过组合主叫方号码和被叫方号码,这个Bolt创建了一个新值,新值的格式为“主叫号码被叫号码”,并将其命名为新字段“call”。 完整的代码如下:

代码: CallLogCreatorBolt.java

//import util packages

import java.util.HashMap;

import java.util.Map;

 

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

 

//import Storm IRichBolt package

import org.apache.storm.topology.IRichBolt;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.tuple.Tuple;

 

//Create a class CallLogCreatorBolt which implement IRichBolt interface

public class CallLogCreatorBolt implements IRichBolt {

   //Create instance for OutputCollector which collects and emits tuples to produce output

   private OutputCollector collector;

 

   @Override

   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

      this.collector = collector;

   }

 

   @Override

   public void execute(Tuple tuple) {

      String from = tuple.getString(0);

      String to = tuple.getString(1);

      Integer duration = tuple.getInteger(2);

      collector.emit(new Values(from + " – " + to, duration));

   }

 

   @Override

   public void cleanup() {}

 

   @Override

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("call", "duration"));

   }

      

   @Override

   public Map<String, Object> getComponentConfiguration() {

      return null;

   }

}

通话记录计数器Bolt

通话记录计数器Bolt接收通话及其通话时长作为Tuple,它在prepare方法中初始化一个字典(Map)对象。 execute方法中,它检查Tuple并为每个Tuple中新的“call”值在字典对象(Map)中创建一个新的Key,并把对应的值(Value)设置为1 对于字典中已有的Key,它只是递增其值(Value)。 简单地说,这个Boltcall和它的计数保存在字典对象(Map)中。 当然,我们也可以将call和计数保存到数据源中,而不是字典中。 完整的程序代码如下所示: 

代码: CallLogCounterBolt.java

import java.util.HashMap;

import java.util.Map;

 

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.IRichBolt;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.tuple.Tuple;

 

public class CallLogCounterBolt implements IRichBolt {

   Map<String, Integer> counterMap;

   private OutputCollector collector;

 

   @Override

   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

      this.counterMap = new HashMap<String, Integer>();

      this.collector = collector;

   }

 

   @Override

   public void execute(Tuple tuple) {

      String call = tuple.getString(0);

      Integer duration = tuple.getInteger(1);

            

      if(!counterMap.containsKey(call)){

         counterMap.put(call, 1);

      }else{

         Integer c = counterMap.get(call) + 1;

         counterMap.put(call, c);

      }

            

      collector.ack(tuple);

   }

 

   @Override

   public void cleanup() {

      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){

         System.out.println(entry.getKey()+" : " + entry.getValue());

      }

   }

 

   @Override

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("call"));

   }

      

   @Override

   public Map<String, Object> getComponentConfiguration() {

      return null;

   }

      

}

创建Topology

Storm Topology基本上是一个Thrift结构。 TopologyBuilder类提供了简单的方法来创建复杂的TopologyTopologyBuilder类具有设置SpoutsetSpout)和设置BoltsetBolt)的方法,最后,TopologyBuildercreateTopology方法来创建Topology 使用下面的代码片段来创建一个Topology

TopologyBuilder builder = new TopologyBuilder();

 

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

 

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())

   .shuffleGrouping("call-log-reader-spout");

 

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())

   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping方法能帮助SpoutBolt进行流分组(Stream Grouping)。  

本地Cluster

出于开发目的,我们可以使用“LocalCluster”类创建一个本地Cluster,然后使用“LocalCluster”类的“submitTopology”方法提交Topology来运行。“submitTopology”的一个参数是“Config”类的一个实例,在提交Topology之前,“Config”类用于设置配置选项。 该配置选项将在运行时与Cluster的配置合并,然后通过prepare方法发送给所有任务(SpoutBolt)。将Topology提交到Cluster后,我们将等待10秒钟让Cluster计算这个提交的Topology,然后使用“LocalCluster”的“shutdown”方法关闭Cluster 完整的程序代码如下所示:

代码: LogAnalyserStorm.java

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

 

//import storm configuration packages

import org.apache.storm.Config;

import org.apache.storm.LocalCluster;

import org.apache.storm.topology.TopologyBuilder;

 

//Create main class LogAnalyserStorm submit topology.

public class LogAnalyserStorm {

   public static void main(String[] args) throws Exception{

      //Create Config instance for cluster configuration

      Config config = new Config();

      config.setDebug(true);

            

      //

      TopologyBuilder builder = new TopologyBuilder();

      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

 

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())

         .shuffleGrouping("call-log-reader-spout");

 

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())

         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

                   

      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());

      Thread.sleep(10000);

            

      //Stop the topology

            

      cluster.shutdown();

   }

}

编译并运行应用程序

以上所示完整的应用程序包含4个类:

  • FakeCallLogReaderSpout.java

  • CallLogCreaterBolt.java

  • CallLogCounterBolt.java

  • LogAnalyerStorm.java

可以用一下命令编译这4个类:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

 

然后用以下命令运行这个应用程序:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

输出

一旦应用程序启动,它将输出有关Cluster启动过程,Spout和Bolt处理数据,以及最后Cluster关闭过程的完整详细信息。在“CallLogCounterBolt”类中,我们打印了通话记录及其计数的详细信息,这些信息将如下显示在控制台上

1234123402 – 1234123401 : 78

1234123402 – 1234123404 : 88

1234123402 – 1234123403 : 105

1234123401 – 1234123404 : 74

1234123401 – 1234123403 : 81

1234123401 – 1234123402 : 81

1234123403 – 1234123404 : 86

1234123404 – 1234123401 : 63

1234123404 – 1234123402 : 82

1234123403 – 1234123402 : 83

1234123404 – 1234123403 : 86

1234123403 – 1234123401 : 93

Java编程语言

Storm Topology通过Thrift接口实现,所以很容易任何语言来提交TopologyStorm支持RubyPython和许多其他语言。我们来看看python的例子。 

Python绑定 

Python是一种通用的解释型,交互式,面向对象的编程语言。Storm支持Python来实现TopologyPython支持发射(emitting),锚定(anchoring),确认(acking)和记录(logging)操作。

Bolt可以用任何语言来定义,用另一种语言编写的Bolt作为子进程执行,Storm通过标准输入(stdin/标准输出(stdout)用JSON消息与这些子进程通信。 首先用一个支持Python绑定的Bolt WordSplit为例。

public static class WordSplit implements IRichBolt {

   public WordSplit() {

      super("python", "splitword.py");

   }

      

   public void declareOutputFields(OutputFieldsDeclarer declarer) {

      declarer.declare(new Fields("word"));

   }

 

这里的WordSplit类实现了IRichBolt接口,但是它的业务逻辑是在一个Python文件“splitword.py”完成的,这个“splitword.py”作为参数传给WordSplitsuper方法来被Storm调用。现在创建一个Python文件“splitword.py”。

import storm

   class WordSplitBolt(storm.BasicBolt):

      def process(self, tup):

         words = tup.values[0].split(" ")

         for word in words:

         storm.emit([word])

WordSplitBolt().run()

这是用Python实现的分割一个给定句子中的单词的例子,也可以用其他支持的编程语言来实现。



关注微信服务号,手机看文章
关注微信服务号,手机看文章