Storm流方式的统计系统怎么实现

本篇内容主要讲解“Storm流方式的统计系统怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Storm流方式的统计系统怎么实现”吧!

公司主营业务:网站制作、网站设计、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。成都创新互联公司是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。成都创新互联公司推出邵阳免费做网站回馈大家。

1: 初期硬件准备:

                1 如果条件具备:请保证您安装好了 redis集群

                2 配置好您的Storm开发环境

                3 保证好您的开发环境的畅通: 主机与主机之间,Storm与redis之间

2:业务背景的介绍:

                1  在这里我们将模拟一个   流方式的数据处理过程

                 2 数据的源头保存在我们的redis 集群之中

                 3  发射的数据格式为: ip,url,client_key

数据发射器

package storm.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Values;
import backtype.storm.tuple.Fields;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;
import org.apache.log4j.Logger;

/**
 * click Spout 从redis中间读取所需要的数据
 */
public class ClickSpout extends BaseRichSpout {

	private static final long serialVersionUID = -6200450568987812474L;

	public static Logger LOG = Logger.getLogger(ClickSpout.class);

	// 对于redis,我们使用的是jedis客户端
	private Jedis jedis;

	// 主机
	private String host;

	// 端口
	private int port;

	// Spout 收集器
	private SpoutOutputCollector collector;

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

	
	        // 这里,我们发射的格式为
	        // IP,URL,CLIENT_KEY
		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
				storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY));
	}

	@Override
	public void open(Map conf, TopologyContext topologyContext,
			SpoutOutputCollector spoutOutputCollector) {

		host = conf.get(Conf.REDIS_HOST_KEY).toString();
		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
		this.collector = spoutOutputCollector;
		connectToRedis();
	}

	private void connectToRedis() {
		jedis = new Jedis(host, port);
	}

	@Override
	public void nextTuple() {
		String content = jedis.rpop("count");
		if (content == null || "nil".equals(content)) {
			try {
				Thread.sleep(300);
			} catch (InterruptedException e) {
			}
		} else {

			// 将jedis对象 rpop出来的字符串解析为 json对象
			JSONObject obj = (JSONObject) JSONValue.parse(content);

			String ip = obj.get(storm.cookbook.Fields.IP).toString();
			String url = obj.get(storm.cookbook.Fields.URL).toString();
			String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY)
					.toString();

			System.out.println("this is a clientKey");

			// List tuple对象
			collector.emit(new Values(ip, url, clientKey));
		}
	}
}

在这个过程之中,请注意:

1  我们在 OPEN 方法之中初始化   host,port,collector,以及Redis的连接,调用Connect方法并连接到redis数据库

2 我们在nextTupe 取出数据,并且将他转换为一个JSON对象,并且拿到 ip,url,clientKey,同时将他们包装成为一个

Values对象

让我们来看看数据的流向图:

Storm流方式的统计系统怎么实现

在我们的数据从clickSpout 读取以后,接下来,我们将采用2个bolt

                                    1  : repeatVisitBolt 

                                    2   :  geographyBolt 

共同来读取同一个数据源的数据:clickSpout

3 细细察看 repeatVisitBolt

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;

public class RepeatVisitBolt extends BaseRichBolt {

	private OutputCollector collector;

	private Jedis jedis;
	private String host;
	private int port;

	@Override
	public void prepare(Map conf, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
		host = conf.get(Conf.REDIS_HOST_KEY).toString();
		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
		connectToRedis();
	}

	private void connectToRedis() {
		jedis = new Jedis(host, port);
		jedis.connect();
	}

	public boolean isConnected() {
		if (jedis == null)
			return false;
		return jedis.isConnected();
	}

	@Override
	public void execute(Tuple tuple) {

		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
		String clientKey = tuple
				.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
		String url = tuple.getStringByField(storm.cookbook.Fields.URL);
		String key = url + ":" + clientKey;

		String value = jedis.get(key);
		
		// redis中取,如果redis中没有,就插入新的一条访问记录。
		if (value == null) {
			jedis.set(key, "visited");
			collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
		} else {
			collector
					.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
				storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,
				storm.cookbook.Fields.UNIQUE));
	}
}

  在这里,我们把url 和 clientKey 组合成为 【url:clientKey】的格式组合,并依据这个对象,在redis中去查找,如果没有,那那Set到redis中间去,并且判定它为【unique】

4:

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class VisitStatsBolt extends BaseRichBolt {

    private OutputCollector collector;

    private int total = 0;
    private int uniqueCount = 0;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
    	
    	//在这里,我们在上游来判断这个Fields 是否是独特和唯一的
        boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
        
        total++;
        if(unique)uniqueCount++;
        collector.emit(new Values(total,uniqueCount));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,
        		storm.cookbook.Fields.TOTAL_UNIQUE));
    }
}

第一次出现,uv ++ 

5  接下来,看看流水线2 :

package storm.bolt;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.json.simple.JSONObject;

import storm.cookbook.IPResolver;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use
 * File | Settings | File Templates.
 */
public class GeographyBolt extends BaseRichBolt {

	// ip解析器
	private IPResolver resolver;

	private OutputCollector collector;

	public GeographyBolt(IPResolver resolver) {
		this.resolver = resolver;
	}

	@Override
	public void prepare(Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
	}

	@Override
	public void execute(Tuple tuple) {

		// 1 从上级的目录之中拿到我们所要使用的ip
		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);

		// 将ip 转换为json
		JSONObject json = resolver.resolveIP(ip);

		// 将 city和country 组织成为一个新的元祖,在这里也就是我们的Values对象
		String city = (String) json.get(storm.cookbook.Fields.CITY);
		String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);

		collector.emit(new Values(country, city));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

		// 确定了我们这次输出元祖的格式
		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,
				storm.cookbook.Fields.CITY));
	}
}

以上Bolt,完成了一个Ip到 CITY,COUNTRY 的转换

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class GeoStatsBolt extends BaseRichBolt {

	private class CountryStats {

		//
		private int countryTotal = 0;

		private static final int COUNT_INDEX = 0;
		private static final int PERCENTAGE_INDEX = 1;
		private String countryName;

		public CountryStats(String countryName) {
			this.countryName = countryName;
		}

		private Map> cityStats = new HashMap>();

		/**
		 * @param cityName
		 */
		public void cityFound(String cityName) {
			countryTotal++;

			// 已经有了值,一个加1的操作
			if (cityStats.containsKey(cityName)) {
				cityStats.get(cityName)
						.set(COUNT_INDEX,
								cityStats.get(cityName).get(COUNT_INDEX)
										.intValue() + 1);
				// 没有值的时候
			} else {
				List list = new LinkedList();
				list.add(1);
				list.add(0);
				cityStats.put(cityName, list);
			}

			double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
					/ (double) countryTotal;

			cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent);

		}

		/**
		 * @return 拿到的国家总数
		 */
		public int getCountryTotal() {
			return countryTotal;
		}

		/**
		 * @param cityName  依据传入的城市名称,拿到城市总数
		 * @return
		 */

		public int getCityTotal(String cityName) {
			return cityStats.get(cityName).get(COUNT_INDEX).intValue();
		}

		
		public String toString() {
			return "Total Count for " + countryName + " is "
					+ Integer.toString(countryTotal) + "\n" + "Cities:  "
					+ cityStats.toString();
		}
	}

	private OutputCollector collector;

	// CountryStats 是一个内部类的对象
	private Map stats = new HashMap();

	@Override
	public void prepare(Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
	}

	@Override
	public void execute(Tuple tuple) {
		String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
		String city = tuple.getStringByField(storm.cookbook.Fields.CITY);

		// 如果国家不存在的时候,新增加一个国家,国家的统计
		if (!stats.containsKey(country)) {
			stats.put(country, new CountryStats(country));
		}

		// 这里拿到新的统计,cityFound 是拿到某个城市的值
		stats.get(country).cityFound(city);

		collector.emit(new Values(country,
				stats.get(country).getCountryTotal(), city, stats.get(country)
						.getCityTotal(city)));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
				storm.cookbook.Fields.COUNTRY,
				storm.cookbook.Fields.COUNTRY_TOTAL,
				storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL));
	}
}

有关地理位置的统计,附带上程序其他的使用类

package storm.cookbook;

/**
 */
public class Fields {

	public static final String IP = "ip";
	
	public static final String URL = "url";
	
	public static final String CLIENT_KEY = "clientKey";
	
	public static final String COUNTRY = "country";
	
	public static final String COUNTRY_NAME = "country_name";
	
	public static final String CITY = "city";
	
	//唯一的,独一无二的
	public static final String UNIQUE = "unique";
	
	//城镇整数
	public static final String COUNTRY_TOTAL = "countryTotal";
	
	//城市整数
	public static final String CITY_TOTAL = "cityTotal";
	
	//总共计数
	public static final String TOTAL_COUNT = "totalCount";
	
	//总共独一无二的
	public static final String TOTAL_UNIQUE = "totalUnique";




}
package storm.cookbook;

import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;

public class HttpIPResolver implements IPResolver, Serializable {

	static String url = "http://api.hostip.info/get_json.php";

	@Override
	public JSONObject resolveIP(String ip) {
		URL geoUrl = null;
		BufferedReader in = null;
		try {
			geoUrl = new URL(url + "?ip=" + ip);
			URLConnection connection = geoUrl.openConnection();
			in = new BufferedReader(new InputStreamReader(
					connection.getInputStream()));
			String inputLine;

			JSONObject json = (JSONObject) JSONValue.parse(in);

			in.close();

			return json;
		} catch (IOException e) {
			e.printStackTrace();
		} finally {

			// 每当in为空的时候我们不进行如下的close操作,只有在in不为空的时候进行close操作
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
				}
			}
		}
		return null;
	}
}
package storm.cookbook;

import org.json.simple.JSONObject;

/**
 * Created with IntelliJ IDEA.
 * User: admin
 * Date: 2012/12/07
 * Time: 5:29 PM
 * To change this template use File | Settings | File Templates.
 */
public interface IPResolver {

	public JSONObject resolveIP(String ip);
}

到此,相信大家对“Storm流方式的统计系统怎么实现”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


本文题目:Storm流方式的统计系统怎么实现
文章源于:http://ybzwz.com/article/pphhgs.html