Twitter

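Twitter is an online social networking service that provides a platform for sending and receiving user tweets. Registered users can read and post tweets, while unregistered users can only read them. Hashtags categorize tweets by keyword: a # is prepended to the relevant keyword. Let us now work through a real-time scenario: finding the most-used hashtags per topic.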

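Creating the Spout

The purpose of the spout is to get the tweets submitted by users as soon as possible. Twitter provides the "Twitter Streaming API", a web-service-based tool for retrieving user-submitted tweets in real time. The Twitter Streaming API can be accessed from any programming language.

twitter4j is an open-source, unofficial Java library that provides a Java-based module for easy access to the Twitter Streaming API. twitter4j provides a listener-based framework for accessing tweets. To access the Twitter Streaming API, we need to sign in with a Twitter developer account and obtain the following OAuth authentication details.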

  • ConsumerKey

  • ConsumerSecret

  • AccessToken

  • AccessTokenSecret

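Storm provides a Twitter spout, TwitterSampleSpout, which we will use to fetch the tweets. The spout needs the OAuth authentication details and, normally, at least one keyword; it then emits the tweets that match those keywords in real time. If no keyword is given, the code below falls back to Twitter's sample stream. The complete program code is as follows:

Code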

TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

import org.apache.storm.Config;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   // Buffer between the twitter4j listener thread and nextTuple()
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;

   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;

   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
      this.consumerKey = consumerKey;
      this.consumerSecret = consumerSecret;
      this.accessToken = accessToken;
      this.accessTokenSecret = accessTokenSecret;
      this.keyWords = keyWords;
   }

   public TwitterSampleSpout() {
      // No-arg constructor: credentials and keywords stay unset
   }

   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
      // Bounded buffer: holds at most 1000 pending tweets
      queue = new LinkedBlockingQueue<Status>(1000);
      _collector = collector;

      StatusListener listener = new StatusListener() {
         @Override
         public void onStatus(Status status) {
            // offer() does not block; the tweet is dropped if the queue is full
            queue.offer(status);
         }

         @Override
         public void onDeletionNotice(StatusDeletionNotice sdn) {}

         @Override
         public void onTrackLimitationNotice(int i) {}

         @Override
         public void onScrubGeo(long l, long l1) {}

         @Override
         public void onException(Exception ex) {}

         @Override
         public void onStallWarning(StallWarning arg0) {}
      };

      // OAuth credentials for the Twitter Streaming API
      ConfigurationBuilder cb = new ConfigurationBuilder();
      cb.setDebugEnabled(true)
         .setOAuthConsumerKey(consumerKey)
         .setOAuthConsumerSecret(consumerSecret)
         .setOAuthAccessToken(accessToken)
         .setOAuthAccessTokenSecret(accessTokenSecret);

      _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
      _twitterStream.addListener(listener);

      // No keywords: read the sample stream; otherwise track the given keywords
      if (keyWords == null || keyWords.length == 0) {
         _twitterStream.sample();
      } else {
         FilterQuery query = new FilterQuery().track(keyWords);
         _twitterStream.filter(query);
      }
   }

   @Override
   public void nextTuple() {
      // poll() returns null immediately when no tweet is waiting
      Status ret = queue.poll();

      if (ret == null) {
         // Back off briefly instead of busy-looping on an empty queue
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }

   @Override
   public void close() {
      _twitterStream.shutdown();
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      // Limit the spout to a single task so only one stream connection is opened
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }

   @Override
   public void ack(Object id) {}

   @Override
   public void fail(Object id) {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      // Each emitted tuple carries one field, "tweet", holding a twitter4j Status
      declarer.declare(new Fields("tweet"));
   }
}
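The hand-off between the twitter4j listener thread and nextTuple() can be looked at in isolation using only JDK classes. The following is a minimal sketch and not part of the original program: the class TweetBuffer and its method names are hypothetical, and plain strings stand in for Status objects. It illustrates two properties the spout relies on: offer() on a full bounded queue returns false and drops the element instead of blocking, and poll() returns null immediately when the queue is empty, which is why nextTuple() sleeps briefly in that case.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the spout's internal buffer; Strings replace twitter4j Status objects.
class TweetBuffer {
   private final LinkedBlockingQueue<String> queue;

   TweetBuffer(int capacity) {
      this.queue = new LinkedBlockingQueue<>(capacity);
   }

   // Listener side: never blocks, returns false if the buffer is full.
   boolean onStatus(String tweet) {
      return queue.offer(tweet);
   }

   // Spout side: returns the oldest buffered tweet, or null if none is waiting.
   String nextOrNull() {
      return queue.poll();
   }

   public static void main(String[] args) {
      TweetBuffer buffer = new TweetBuffer(2);
      System.out.println(buffer.onStatus("first"));   // accepted
      System.out.println(buffer.onStatus("second"));  // accepted
      System.out.println(buffer.onStatus("third"));   // rejected, buffer is full
      System.out.println(buffer.nextOrNull());
      System.out.println(buffer.nextOrNull());
      System.out.println(buffer.nextOrNull());        // nothing left to read
   }
}
```

With a capacity of 1000, as in the spout, tweets are silently discarded whenever the topology consumes them more slowly than Twitter delivers them.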

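Hashtag Reader Bolt

The tweets emitted by the spout are forwarded to HashtagReaderBolt, which processes each tweet and emits all of the hashtags it contains. HashtagReaderBolt uses the getHashtagEntities method provided by twitter4j, which reads a tweet and returns its hashtags as an array of HashtagEntity objects. The complete program code is as follows:

Code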

HashtagReaderBolt.java

import java.util.Map;

import twitter4j.HashtagEntity;
import twitter4j.Status;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");

      // Emit one tuple per hashtag; getText() returns the tag without the leading '#'
      for (HashtagEntity hashtag : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtag.getText());
         this.collector.emit(new Values(hashtag.getText()));
      }

      // Acknowledge the input tuple once all of its hashtags have been emitted
      this.collector.ack(tuple);
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
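To see what the bolt emits without connecting to Twitter, hashtag extraction can be approximated on plain text. The sketch below is an illustration only: the class HashtagExtractor is hypothetical, and the regular expression is a deliberately simplified assumption (a # followed by letters, digits, or underscores). twitter4j does not work this way; it reads the hashtag entities that Twitter delivers with each tweet, and Twitter's real hashtag rules are more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: approximates hashtag extraction on raw tweet text.
class HashtagExtractor {
   // Simplified rule (an assumption): '#' followed by one or more word characters.
   private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

   static List<String> extract(String text) {
      List<String> tags = new ArrayList<>();
      Matcher m = HASHTAG.matcher(text);
      while (m.find()) {
         // Keep the tag without the leading '#', as HashtagEntity.getText() does
         tags.add(m.group(1));
      }
      return tags;
   }

   public static void main(String[] args) {
      System.out.println(extract("Sunday roast done right #food #SundayRoast"));
   }
}
```

Each element of the returned list corresponds to one tuple that HashtagReaderBolt would emit on its "hashtag" field.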

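Hashtag Counter Bolt

The emitted hashtags are forwarded to HashtagCounterBolt. HashtagCounterBolt processes every hashtag and keeps each hashtag and its count in memory in a Java Map object. The complete program code is as follows:

Code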

HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   // In-memory counts, keyed by hashtag text (case-sensitive)
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      // The only field of the incoming tuple is the hashtag text
      String key = tuple.getString(0);

      if (!counterMap.containsKey(key)) {
         counterMap.put(key, 1);
      } else {
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      // Called when the topology is shut down: print the final counts
      for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
         System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      // This bolt emits nothing; the declaration is kept from the original listing
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
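The containsKey/put branch in execute() can also be written with Map.merge, which is available since Java 8. The following is a small sketch with a hypothetical class name, HashtagCounts, that isolates the counting logic from Storm so it can be tried on its own; it is an alternative formulation, not the code used above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: the counting logic of the bolt without any Storm types.
class HashtagCounts {
   private final Map<String, Integer> counts = new HashMap<>();

   // merge() stores 1 for a new key, or adds 1 to the existing count.
   void record(String hashtag) {
      counts.merge(hashtag, 1, Integer::sum);
   }

   // Returns 0 for hashtags that have never been recorded.
   int countOf(String hashtag) {
      return counts.getOrDefault(hashtag, 0);
   }

   public static void main(String[] args) {
      HashtagCounts counts = new HashtagCounts();
      for (String tag : new String[] {"food", "cook", "food"}) {
         counts.record(tag);
      }
      System.out.println("food : " + counts.countOf("food"));
      System.out.println("cook : " + counts.countOf("cook"));
   }
}
```

As in the bolt, keys are compared case-sensitively, so "food" and "Food" would be counted as two different hashtags.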

Submitting a Topology

The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit the topology:

Code

TwitterHashtagStorm.java

import java.util.Arrays;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception {
      // The first four arguments are the OAuth credentials
      String consumerKey = args[0];
      String consumerSecret = args[1];
      String accessToken = args[2];
      String accessTokenSecret = args[3];

      // Any remaining arguments are the keywords to track
      String[] keyWords = Arrays.copyOfRange(args, 4, args.length);

      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      // Tweets are distributed randomly across the reader bolt's tasks
      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      // Tuples with the same "hashtag" value always go to the same counter task
      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

      // Run in an in-process cluster for 10 seconds, then shut down
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}
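The counter bolt is wired with fieldsGrouping on the "hashtag" field so that every occurrence of a given hashtag reaches the same task; otherwise two tasks could each hold a partial count for the same tag. The sketch below illustrates only that property. The class FieldsGroupingSketch and the hash-modulo rule are assumptions made for illustration, and Storm's actual partitioning code differs in detail.

```java
import java.util.Objects;

// Hypothetical illustration of the idea behind fields grouping; not Storm's implementation.
class FieldsGroupingSketch {
   // Maps a grouping-field value to a task index in the range [0, numTasks).
   static int chooseTask(String hashtag, int numTasks) {
      // floorMod keeps the result non-negative even for negative hash codes
      return Math.floorMod(Objects.hashCode(hashtag), numTasks);
   }

   public static void main(String[] args) {
      int numTasks = 4;
      for (String tag : new String[] {"food", "cook", "food", "android"}) {
         System.out.println(tag + " -> task " + chooseTask(tag, numTasks));
      }
   }
}
```

Both "food" lines print the same task index. In the topology above no parallelism hint is passed to setBolt, so there is a single counter task and the grouping only starts to matter once the parallelism is raised.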

Building and Running the Application

The complete application consists of four Java classes:

  • TwitterSampleSpout.java

  • HashtagReaderBolt.java

  • HashtagCounterBolt.java

  • TwitterHashtagStorm.java

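Compile the classes (the org.apache.storm package names used in the listings require Storm 1.0 or later, so point the classpath at the release you have installed):

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java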

Run the application:

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":. \
   TwitterHashtagStorm <consumerkey> <consumersecret> <accesstoken> <accesstokensecret> \
   <keyword1> <keyword2> … <keywordN>
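TwitterHashtagStorm reads args[0] through args[3] directly, so starting it with fewer than four arguments ends in an ArrayIndexOutOfBoundsException. One way to fail with a clearer message is to validate the arguments first. The helper below, TwitterArgs, is hypothetical and not part of the original program; it is a sketch of how the command line shown above could be split into credentials and keywords.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the original program.
class TwitterArgs {
   final String consumerKey;
   final String consumerSecret;
   final String accessToken;
   final String accessTokenSecret;
   final String[] keyWords;

   private TwitterArgs(String[] args) {
      consumerKey = args[0];
      consumerSecret = args[1];
      accessToken = args[2];
      accessTokenSecret = args[3];
      // Everything after the four credentials is a keyword; the array may be empty
      keyWords = Arrays.copyOfRange(args, 4, args.length);
   }

   // Rejects command lines that do not contain all four credentials.
   static TwitterArgs parse(String[] args) {
      if (args.length < 4) {
         throw new IllegalArgumentException(
            "Expected: <consumerkey> <consumersecret> <accesstoken> <accesstokensecret> [keywords]");
      }
      return new TwitterArgs(args);
   }

   public static void main(String[] args) {
      TwitterArgs parsed = parse(new String[] {"ck", "cs", "at", "ats", "food", "cooking"});
      System.out.println(Arrays.toString(parsed.keyWords));
   }
}
```

The placeholder values "ck", "cs", "at", and "ats" are dummies; real credentials come from the Twitter developer account.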

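Output

When the topology shuts down, HashtagCounterBolt's cleanup() method prints each hashtag and its count to the console. The output looks similar to the following: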

Result: jazztastic : 1

Result: foodie : 1

Result: Redskins : 1

Result: Recipe : 1

Result: cook : 1

Result: android : 1

Result: food : 2

Result: NoToxicHorseMeat : 1

Result: Purrs4Peace : 1

Result: livemusic : 1

Result: VIPremium : 1

Result: Frome : 1

Result: SundayRoast : 1

Result: Millennials : 1

Result: HealthWithKier : 1

Result: LPs30DaysofGratitude : 1

Result: cooking : 1

Result: gameinsight : 1

Result: Countryfile : 1

Result: androidgames : 1
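The Result lines appear in no particular order because HashMap does not define an iteration order. To actually read off the most-used hashtags, the counts can be sorted before printing. The following sketch assumes a hypothetical helper class, TopHashtags, and uses three of the counts shown above as sample input; ties are broken alphabetically, which is a choice made here and not something the original program does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: ranks hashtag counts from most to least used.
class TopHashtags {
   static List<Map.Entry<String, Integer>> top(Map<String, Integer> counts, int n) {
      List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
      // Highest count first; equal counts are ordered alphabetically by hashtag
      entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed()
         .thenComparing(Map.Entry.<String, Integer>comparingByKey()));
      return new ArrayList<>(entries.subList(0, Math.min(n, entries.size())));
   }

   public static void main(String[] args) {
      Map<String, Integer> counts = new HashMap<>();
      counts.put("food", 2);
      counts.put("cook", 1);
      counts.put("android", 1);

      for (Map.Entry<String, Integer> entry : top(counts, 2)) {
         System.out.println("Result: " + entry.getKey() + " : " + entry.getValue());
      }
   }
}
```

Calling such a method from cleanup() instead of iterating over counterMap directly would list the most frequent hashtags first.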


