Yahoo! Finance

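Yahoo! Finance is a leading business news and financial data website on the Internet. It is part of Yahoo! and provides financial news, market statistics, international market data, and other information about financial resources that anyone can access. If you are a registered Yahoo! user, you can customize Yahoo! Finance to take advantage of some of its offerings. The Yahoo! Finance API is used to query financial data from Yahoo!. The data this API returns is delayed by 15 minutes from real time, and its database is updated every minute, so the stock information it provides is close to current but not truly live. Now let us look at a scenario with listed companies and see how to raise an alert when a stock price falls below 100.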

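Creating the Spout

The purpose of the Spout is to fetch the details of the listed companies and emit their stock prices to the Bolt. The following program code creates the Spout.

Code

YahooFinanceSpout.java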

import java.math.BigDecimal;
import java.util.Map;

// Import the yahoofinance packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   // Ticker symbols fetched on every call to nextTuple()
   private static final String[] SYMBOLS = {"INTC", "GOOGL", "AAPL"};

   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         for (String symbol : SYMBOLS) {
            // Fetch the latest quote and emit a (company, price) tuple
            Stock stock = YahooFinance.get(symbol);
            BigDecimal price = stock.getQuote().getPrice();

            this.collector.emit(new Values(symbol, price.doubleValue()));
         }
      } catch (Exception e) {
         // Report the failure instead of silently swallowing it
         System.err.println("Failed to fetch stock quote: " + e);
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
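The Spout needs both a running Storm topology and network access to Yahoo!. To try the same fetch-and-emit pattern in isolation, the following sketch replaces YahooFinance.get with an in-memory price lookup and replaces the Storm collector with a plain list. The class SpoutSketch, the method poll, and the sample prices are hypothetical names and values chosen for illustration; they are not part of Storm or the yahoofinance library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the spout's nextTuple() logic; no Storm or network needed.
class SpoutSketch {

   // Looks up each symbol and collects a (company, price) pair, mirroring the
   // two fields the spout declares. A symbol whose lookup fails or returns
   // null is skipped rather than aborting the whole poll.
   static List<Object[]> poll(List<String> symbols, Function<String, Double> priceSource) {
      List<Object[]> emitted = new ArrayList<>();
      for (String symbol : symbols) {
         try {
            Double price = priceSource.apply(symbol);
            if (price != null) {
               emitted.add(new Object[] {symbol, price});
            }
         } catch (RuntimeException e) {
            System.err.println("Could not fetch " + symbol + ": " + e.getMessage());
         }
      }
      return emitted;
   }

   public static void main(String[] args) {
      // Made-up sample prices, not real market data.
      Map<String, Double> sample = new HashMap<>();
      sample.put("INTC", 35.5);
      sample.put("GOOGL", 140.0);

      // AAPL has no sample price, so only INTC and GOOGL are emitted.
      List<String> symbols = Arrays.asList("INTC", "GOOGL", "AAPL");
      for (Object[] tuple : poll(symbols, sample::get)) {
         System.out.println(tuple[0] + " -> " + tuple[1]);
      }
   }
}
```

Handling failures per symbol is a design choice of this sketch: one unavailable quote does not prevent the remaining symbols from being emitted.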

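Creating the Bolt

The purpose of the Bolt here is to raise an alert when a stock price falls below 100. It keeps the cutoff price of each company in a Java Map object and records the alert in a second Map: true when the price is below the cutoff of 100, and false otherwise. The complete program code is as follows:

Code

PriceCutOffBolt.java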

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   // Cutoff price per company
   Map<String, Integer> cutOffMap;
   // Latest alert state per company: true if the price is below the cutoff
   Map<String, Boolean> resultMap;

   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap<String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      // Field 0 is "company" and field 1 is "price", as declared by the spout
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if (this.cutOffMap.containsKey(company)) {
         Integer cutOffPrice = this.cutOffMap.get(company);

         // Each new tuple overwrites the previous alert state for the company
         this.resultMap.put(company, price < cutOffPrice);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      // Print the final alert state of every company seen
      for (Map.Entry<String, Boolean> entry : resultMap.entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}
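The cutoff check in the Bolt can be tried without Storm. The following is a minimal sketch under stated assumptions: the class CutOffSketch and its methods record and results are hypothetical names, and the prices are made-up sample values rather than real quotes. It shows two details of the comparison: a price exactly equal to the cutoff does not raise the alert, and a company without a configured cutoff is ignored.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone version of the bolt's cutoff check.
class CutOffSketch {
   private final Map<String, Integer> cutOffMap = new HashMap<>();
   // LinkedHashMap keeps insertion order so the printed report is stable.
   private final Map<String, Boolean> resultMap = new LinkedHashMap<>();

   CutOffSketch(int cutOff, String... companies) {
      for (String company : companies) {
         cutOffMap.put(company, cutOff);
      }
   }

   // Mirrors execute(): store true when the price is strictly below the cutoff.
   void record(String company, double price) {
      Integer cutOff = cutOffMap.get(company);
      if (cutOff != null) {
         resultMap.put(company, price < cutOff);
      }
   }

   Map<String, Boolean> results() {
      return resultMap;
   }

   public static void main(String[] args) {
      CutOffSketch sketch = new CutOffSketch(100, "INTC", "AAPL", "GOOGL");
      sketch.record("GOOGL", 140.0); // above the cutoff
      sketch.record("AAPL", 100.0);  // equal to the cutoff, so no alert
      sketch.record("INTC", 35.5);   // below the cutoff
      sketch.record("MSFT", 50.0);   // no cutoff configured, ignored

      // Prints:
      // GOOGL : false
      // AAPL : false
      // INTC : true
      for (Map.Entry<String, Boolean> entry : sketch.results().entrySet()) {
         System.out.println(entry.getKey() + " : " + entry.getValue());
      }
   }
}
```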

Submitting the Topology

Code

YahooFinanceStorm.java

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception {
      Config config = new Config();
      config.setDebug(true);

      // Wire the spout to the bolt, grouping tuples by the "company" field
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));

      // Run the topology in an in-process local cluster for 10 seconds
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);

      // Shutting down the local cluster lets the bolt's cleanup() print the results
      cluster.shutdown();
   }
}
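The topology connects the Bolt with fieldsGrouping on the company field, which means tuples with the same company value always go to the same Bolt task. That matters once the Bolt runs with more than one task, because each task keeps its own resultMap. The sketch below is a simplified model of this routing that hashes the field value modulo the task count. It is an assumption made for illustration and not Storm's actual partitioning code; the class FieldsGroupingSketch, the method taskFor, and the task count of 3 are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of fields grouping: equal keys map to the same task.
class FieldsGroupingSketch {

   // Math.floorMod keeps the result in [0, numTasks) even for negative hash codes.
   static int taskFor(String fieldValue, int numTasks) {
      return Math.floorMod(fieldValue.hashCode(), numTasks);
   }

   public static void main(String[] args) {
      int numTasks = 3; // assumed parallelism, for illustration only
      List<String> companies = Arrays.asList("INTC", "GOOGL", "AAPL", "INTC");

      // Both INTC tuples are routed to the same task index.
      for (String company : companies) {
         System.out.println(company + " -> task " + taskFor(company, numTasks));
      }
   }
}
```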

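Compiling and Running the Application

The complete application consists of three Java files:

  • YahooFinanceSpout.java

  • PriceCutOffBolt.java

  • YahooFinanceStorm.java

Compile command:

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/yahoofinance/lib/*" *.java

Run command:

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/yahoofinance/lib/*":. YahooFinanceStorm

Note that the org.apache.storm package names used in the imports belong to Storm 1.0 and later; Storm 0.9.x releases use backtype.storm instead, so adjust either the imports or the library path to match your installation.

Output

The output will be similar to the following; the actual values depend on the stock prices at the time of the run: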

GOOGL : false

AAPL : false

INTC : true


