简介

TridentStorm的延伸,也是由Twitter开发的。开发Trident的主要原因是在Storm之上提供高级抽象以及有状态流处理和低延迟分布式查询。

Trident也使用SpoutBolt,但是这些底层组件在执行前由Trident自动生成。 Trident有函数(function),过滤器(filter),连接(join),分组(grouping)和聚合(aggregation)。

Trident用一系列的事务(transaction)对流进行批处理。 通常,这些事务的大小要处理数千或数百万个Tuple,取决于输入流。 这样,TridentStorm不同的是,它执行Tuple-by-Tuple处理。

批处理概念与数据库事务非常相似。 每个事务都被分配一个事务ID 一旦所有处理完成,事务即被视为成功。 但是,任何一个事务里的Tuple处理失败将导致整个事务被重新执行。 对于每个批次,Trident将在事务开始时调用beginCommit,并在结束时进行提交(commit)。

Trident Topology 

Trident API暴露了一个使用“TridentTopology”类创建Trident Topology的简单选项。基本上,Trident Topology接收来自Spout的输入流,并在该流上执行有序的操作序列(过滤,聚合,分组等)。Storm TupleTrident Tuple取代,Bolt被操作(operation)取代。 一个简单的Trident Topology可以用以下代码创建:

TridentTopology topology = new TridentTopology();

Trident Tuple 

Trident Tuple是一个已命名的值列表。TridentTuple接口是Trident Topology的数据模型,它是Trident Topology处理的基本数据单元。

Trident Spout 

Trident SpoutStorm Spout类似,另外还有使用Trident功能的附加选项。实际上,我们仍然可以使用Storm TopologyIRichSpout,但因为它本质上不具有事务性,我们将无法利用Trident的优势。

具有使用Trident所有功能的最基本的Spout是“ITridentSpout”,它支持事务性和不透明事务语义。其他可用的Spout接口有IBatchSpoutIPartitionedTridentSpoutIOpaquePartitionedTridentSpout

除了这些通用Spout之外,Trident还提供了很多Trident Spout的实现类,例如FeederBatchSpout,我们可以使用它轻松发送Trident Tuple的命名列表,而无需担心批处理,并行性等问题。 

FeederBatchSpout的创建和数据输入可以按照如下所示完成:

TridentTopology topology = new TridentTopology();

FeederBatchSpout testSpout = new FeederBatchSpout(

   ImmutableList.of("fromMobileNumber", "toMobileNumber", duration”));

topology.newStream("fixed-batch-spout", testSpout)

testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Operation

Trident依靠“Trident Operation”来处理Trident Tuple的输入流,Trident API具有许多内置操作来进行从简单到复杂的流处理,这些操作的范围可以从简单验证到复杂的Trident Tuple分组和聚合。 让我们来看看最重要和最常用的操作。 

过滤器(Filter

过滤器是用于执行输入验证任务的对象。Trident过滤器获取Trident Tuple字段的子集作为输入,并根据某些条件是否已满足来返回truefalse 如果返回true,则Tuple保存在输出流中; 否则,该Tuple将从流中移除。过滤器继承自BaseFilter类并实现isKeep方法。以下是过滤器操作的示例实现:

public class MyFilter extends BaseFilter {

   public boolean isKeep(TridentTuple tuple) {

      return tuple.getInteger(1) % 2 == 0;

   }

}

 

input

 

[1, 2]

[1, 3]

[1, 4]

 

output

 

[1, 2]

[1, 4]

可以使用“each”方法在Topology中调用过滤器函, Fields”类可用于指定输入(Trident Tuple的子集)。 示例代码如下 

TridentTopology topology = new TridentTopology();

topology.newStream("spout", spout)

.each(new Fields("a", "b"), new MyFilter()) 

函数(Function 

函数是用于在单个Trident Tuple上执行简单操作的对象,它处理Trident Tuple字段的子集并发出零个或更多新的Trident Tuple字段。

函数从BaseFunction类继承并实现了execute方法。 下面给出了一个示例实现:

public class MyFunction extends BaseFunction {

   public void execute(TridentTuple tuple, TridentCollector collector) {

      int a = tuple.getInteger(0);

      int b = tuple.getInteger(1);

      collector.emit(new Values(a + b));

   }

}

 

input

 

[1, 2]

[1, 3]

[1, 4]

 

output

 

[1, 2, 3]

[1, 3, 4]

[1, 4, 5]

就像过滤器操作一样,可以使用each方法在Topology中调用函数操作。示例代码如下:

TridentTopology topology = new TridentTopology();

topology.newStream("spout", spout)

   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

聚合(Aggregation 

聚集是用于对批量(batch)输入或分区(partition)输入或流(stream)输入执行聚合操作的对象。Trident有三种类型的聚合:

  •  aggregate 分别聚合每批Trident Tuple 在聚合过程中,Tuple最初使用全局分组重新分区,以将同一批次的所有分区合并到一个分区中。

  • partitionAggregate 聚合每个分区,而不是整个批次的Trident Tuple。分区聚合的输出完全替换了输入的Tuple,这个输出包含单个字段Tuple

  • persistentaggregate 聚合所有批次的所有Trident Tuple并将结果存储在内存或数据库中。

TridentTopology topology = new TridentTopology();

 

// aggregate operation

topology.newStream("spout", spout)

   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

   .aggregate(new Count(), new Fields(“count”))

      

// partitionAggregate operation

topology.newStream("spout", spout)

   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

   .partitionAggregate(new Count(), new Fields(“count"))

      

// persistentAggregate – saving the count to memory

topology.newStream("spout", spout)

   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

可以使用CombinerAggregatorReducerAggregator或通用Aggregator接口创建聚合操作。上面例子中使用的“count”聚合器是内置聚合器之一,它实现了“CombinerAggregator”接口,具体实现如下

public class Count implements CombinerAggregator<Long> {

   @Override

   public Long init(TridentTuple tuple) {

      return 1L;

   }

      

   @Override

   public Long combine(Long val1, Long val2) {

      return val1 + val2;

   }

      

   @Override

   public Long zero() {

      return 0L;

   }

}

分组(Grouping

分组操作是一种内置操作,可以通过groupBy方法调用。groupBy方法通过在指定的字段上执行partitionBy来重新对流分区,然后在每个分区内将组字段相等的Tuple分组在一起。通常,我们使用“groupBy”和“persistentAggregate”来获得分组聚合。示例代码如下:

TridentTopology topology = new TridentTopology();

 

// persistentAggregate – saving the count to memory

topology.newStream("spout", spout)

   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))

   .groupBy(new Fields(“d”)

   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

合并和连接(Merging and Joining

合并和连接可以分别使用“merge”和“join”方法完成。合并把一个或多个流合在一起,连接类似于合并,除了连接使用来自双方的Trident Tuple字段来检查和连接两个流。 而且,连接只能在批量(batch)级别下工作。示例代码如下:

TridentTopology topology = new TridentTopology();

topology.merge(stream1, stream2, stream3);

topology.join(stream1, new Fields("key"), stream2, new Fields("x"),

   new Fields("key", "a", "b", "c"));

状态维护(State Maintenance

Trident提供了状态维护机制。 状态信息可以存储在Topology本身中,也可以将其存储在单独的数据库中。提供状态维护的原因是,如果任何一个Tuple没有被成功处理,就会重新处理这个Tuple。这在更新状态时会产生问题,因为您不确定此Tuple的状态是否已更新过。 如果Tuple在更新状态之前失败了,那么重试这个Tuple将使状态稳定。 但是,如果Tuple在更新状态后失败,那么重试这个Tuple将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤来确保只处理一次消息

  • 小批量的处理Tuple

  • 为每个批次分配一个唯一的ID 如果一个批次被重试了,它被赋予相同的唯一ID

  • 状态更新在批次中排序。例如,只有第一批次的状态更新完成后,才能进行第二批次的状态更新。

分布式RPC

分布式RPC用于查询和检索Trident Topology的结果。Storm有一个内置的分布式RPC服务器,这个服务器接收来自客户端的RPC请求并将其传递给TopologyTopology处理请求并将结果发送回分布式RPC服务器,分布式RPC服务器又把结果发送到客户端。Trident的分布式RPC查询像普通的RPC查询一样执行,除了这些查询是并行运行的。

什么时候需要用Trident

和许多用例一样,如果需求是只处理查询一次,我们可以通过在Trident中编写Topology来实现,相反,在Storm中很难实现一次处理。 因此Trident对那些需要精确处理一次的用例非常有用。 Trident并非针对所有用例,特别是高性能用例,因为它增加了Storm的复杂性并管理了状态。

Trident的例子

我们把前一章中制定的通话记录日志分析器应用程序转换为Trident框架。由于提供了更高层次的API,用Trident实现这个应用比纯Storm更容易一些,因为Storm需要执行Trident中的FunctionFilterAggregateGroupByJoinMerge操作中的任何一个。 最后,我们将使用LocalDRPC类启动DRPC服务器,并使用LocalDRPC类的execute方法搜索一些关键字。

格式化通话信息 

FormatCall类的用途是格式化包含“主叫号码”和“被叫号码”的通话信息。完整的程序代码如下:

代码:

FormatCall.java

import org.apache.storm.tuple.Values;

 

import org.apache.storm.trident.operation.BaseFunction;

import org.apache.storm.trident.operation.TridentCollector;

import org.apache.storm.trident.tuple.TridentTuple;

 

public class FormatCall extends BaseFunction {

   @Override

   public void execute(TridentTuple tuple, TridentCollector collector) {

      String fromMobileNumber = tuple.getString(0);

      String toMobileNumber = tuple.getString(1);

      collector.emit(new Values(fromMobileNumber + " – " + toMobileNumber));

   }

}

CSVSplit

CSVSplit类的用途是根据“逗号(,)”分割输入字符串并发送字符串中的每个单词,该函数用于解析分布式查询的输入参数。完整的代码如下:

代码:CSVSplit.java

import org.apache.storm.tuple.Values;

 

import org.apache.storm.trident.operation.BaseFunction;

import org.apache.storm.trident.operation.TridentCollector;

import org.apache.storm.trident.tuple.TridentTuple;

 

public class CSVSplit extends BaseFunction {

   @Override

   public void execute(TridentTuple tuple, TridentCollector collector) {

      for(String word: tuple.getString(0).split(",")) {

         if(word.length() > 0) {

            collector.emit(new Values(word));

         }

      }

   }

}

Log Analyzer

这是主要的应用程序。一开始,应用程序将使用FeederBatchSpout初始化TridentTopology并提供通话信息。Trident Topology流可以使用TridentTopology类的newStream方法创建,同样,可以使用TridentTopology类的newDRCPStream方法创建Trident Topology DRPC流。一个简单的DRCP服务器可以使用LocalDRPC类创建。 LocalDRPC有搜索某个关键字的execute方法。完整的代码如下:

代码:LogAnalyserTrident.java

import java.util.*;

 

import org.apache.storm.Config;

import org.apache.storm.LocalCluster;

import org.apache.storm.LocalDRPC;

import org.apache.storm.utils.DRPCClient;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;

 

import org.apache.storm.trident.TridentState;

import org.apache.storm.trident.TridentTopology;

import org.apache.storm.trident.tuple.TridentTuple;

 

import org.apache.storm.trident.operation.builtin.FilterNull;

import org.apache.storm.trident.operation.builtin.Count;

import org.apache.storm.trident.operation.builtin.Sum;

import org.apache.storm.trident.operation.builtin.MapGet;

import org.apache.storm.trident.operation.builtin.Debug;

import org.apache.storm.trident.operation.BaseFilter;

 

import org.apache.storm.trident.testing.FixedBatchSpout;

import org.apache.storm.trident.testing.FeederBatchSpout;

import org.apache.storm.trident.testing.Split;

import org.apache.storm.trident.testing.MemoryMapState;

 

import com.google.common.collect.ImmutableList;

 

public class LogAnalyserTrident {

   public static void main(String[] args) throws Exception {

      System.out.println("Log Analyser Trident");

      TridentTopology topology = new TridentTopology();

            

      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",

         "toMobileNumber", "duration"));

 

      TridentState callCounts = topology

         .newStream("fixed-batch-spout", testSpout)

         .each(new Fields("fromMobileNumber", "toMobileNumber"),

         new FormatCall(), new Fields("call"))

         .groupBy(new Fields("call"))

         .persistentAggregate(new MemoryMapState.Factory(), new Count(),

         new Fields("count"));

 

      LocalDRPC drpc = new LocalDRPC();

 

      topology.newDRPCStream("call_count", drpc)

         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

 

      topology.newDRPCStream("multiple_call_count", drpc)

         .each(new Fields("args"), new CSVSplit(), new Fields("call"))

         .groupBy(new Fields("call"))

         .stateQuery(callCounts, new Fields("call"), new MapGet(),

         new Fields("count"))

         .each(new Fields("call", "count"), new Debug())

         .each(new Fields("count"), new FilterNull())

         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

 

      Config conf = new Config();

      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("trident", conf, topology.build());

      Random randomGenerator = new Random();

      int idx = 0;

            

      while(idx < 10) {

         testSpout.feed(ImmutableList.of(new Values("1234123401",

            "1234123402", randomGenerator.nextInt(60))));

 

         testSpout.feed(ImmutableList.of(new Values("1234123401",

            "1234123403", randomGenerator.nextInt(60))));

 

         testSpout.feed(ImmutableList.of(new Values("1234123401",

            "1234123404", randomGenerator.nextInt(60))));

 

         testSpout.feed(ImmutableList.of(new Values("1234123402",

            "1234123403", randomGenerator.nextInt(60))));

 

         idx = idx + 1;

      }

 

      System.out.println("DRPC : Query starts");

      System.out.println(drpc.execute("call_count","1234123401 – 1234123402"));

      System.out.println(drpc.execute("multiple_call_count", "1234123401 –

         1234123402,1234123401 – 1234123403"));

      System.out.println("DRPC : Query ends");

 

      cluster.shutdown();

      drpc.shutdown();

 

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);

   }

}

编译并运行应用程序

完整的应用程序有三个Java类:

  • FormatCall.java

  • CSVSplit.java

  • LogAnalyerTrident.java

编译代码:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

运行:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident


输出

应用程序启动后,应用程序将输出关于Cluster启动过程,操作处理,DRPC服务器和客户端信息以及Cluster关闭过程的完整详细信息。该输出将显示在控制台上,如下所示:

DRPC : Query starts

[["1234123401 – 1234123402",10]]

DEBUG: [1234123401 – 1234123402, 10]

DEBUG: [1234123401 – 1234123403, 10]

[[20]]

DRPC : Query ends



关注微信服务号,手机看文章
关注微信服务号,手机看文章