溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm流方式的統(tǒng)計系統(tǒng)怎么實現(xiàn)

發(fā)布時間:2021-12-23 14:13:42 來源:億速云 閱讀:129 作者:iii 欄目:云計算

本篇內(nèi)容主要講解“Storm流方式的統(tǒng)計系統(tǒng)怎么實現(xiàn)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Storm流方式的統(tǒng)計系統(tǒng)怎么實現(xiàn)”吧!

1: 初期硬件準備:

                1 如果條件具備:請保證您安裝好了 redis集群

                2 配置好您的Storm開發(fā)環(huán)境

                3 保證好您的開發(fā)環(huán)境的暢通: 主機與主機之間,Storm與redis之間

2:業(yè)務背景的介紹:

                1  在這里我們將模擬一個   流方式的數(shù)據(jù)處理過程

                 2 數(shù)據(jù)的源頭保存在我們的redis 集群之中

                 3  發(fā)射的數(shù)據(jù)格式為: ip,url,client_key

數(shù)據(jù)發(fā)射器

package storm.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Values;
import backtype.storm.tuple.Fields;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;
import org.apache.log4j.Logger;

/**
 * click Spout 從redis中間讀取所需要的數(shù)據(jù)
 */
public class ClickSpout extends BaseRichSpout {

	private static final long serialVersionUID = -6200450568987812474L;

	public static Logger LOG = Logger.getLogger(ClickSpout.class);

	// 對于redis,我們使用的是jedis客戶端
	private Jedis jedis;

	// 主機
	private String host;

	// 端口
	private int port;

	// Spout 收集器
	private SpoutOutputCollector collector;

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

	
	        // 這里,我們發(fā)射的格式為
	        // IP,URL,CLIENT_KEY
		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
				storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY));
	}

	@Override
	public void open(Map conf, TopologyContext topologyContext,
			SpoutOutputCollector spoutOutputCollector) {

		host = conf.get(Conf.REDIS_HOST_KEY).toString();
		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
		this.collector = spoutOutputCollector;
		connectToRedis();
	}

	private void connectToRedis() {
		jedis = new Jedis(host, port);
	}

	@Override
	public void nextTuple() {
		String content = jedis.rpop("count");
		if (content == null || "nil".equals(content)) {
			try {
				Thread.sleep(300);
			} catch (InterruptedException e) {
			}
		} else {

			// 將jedis對象 rpop出來的字符串解析為 json對象
			JSONObject obj = (JSONObject) JSONValue.parse(content);

			String ip = obj.get(storm.cookbook.Fields.IP).toString();
			String url = obj.get(storm.cookbook.Fields.URL).toString();
			String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY)
					.toString();

			System.out.println("this is a clientKey");

			// List<Object> tuple對象
			collector.emit(new Values(ip, url, clientKey));
		}
	}
}

在這個過程之中,請注意:

1  我們在 OPEN 方法之中初始化   host,port,collector,以及Redis的連接,調(diào)用Connect方法并連接到redis數(shù)據(jù)庫

2 我們在nextTupe 取出數(shù)據(jù),并且將他轉(zhuǎn)換為一個JSON對象,并且拿到 ip,url,clientKey,同時將他們包裝成為一個

Values對象

讓我們來看看數(shù)據(jù)的流向圖:

Storm流方式的統(tǒng)計系統(tǒng)怎么實現(xiàn)

在我們的數(shù)據(jù)從clickSpout 讀取以后,接下來,我們將采用2個bolt

                                    1  : repeatVisitBolt 

                                    2   :  geographyBolt 

共同來讀取同一個數(shù)據(jù)源的數(shù)據(jù):clickSpout

3 細細察看 repeatVisitBolt

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;

public class RepeatVisitBolt extends BaseRichBolt {

	private OutputCollector collector;

	private Jedis jedis;
	private String host;
	private int port;

	@Override
	public void prepare(Map conf, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
		host = conf.get(Conf.REDIS_HOST_KEY).toString();
		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
		connectToRedis();
	}

	private void connectToRedis() {
		jedis = new Jedis(host, port);
		jedis.connect();
	}

	public boolean isConnected() {
		if (jedis == null)
			return false;
		return jedis.isConnected();
	}

	@Override
	public void execute(Tuple tuple) {

		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
		String clientKey = tuple
				.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
		String url = tuple.getStringByField(storm.cookbook.Fields.URL);
		String key = url + ":" + clientKey;

		String value = jedis.get(key);
		
		// redis中取,如果redis中沒有,就插入新的一條訪問記錄。
		if (value == null) {
			jedis.set(key, "visited");
			collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
		} else {
			collector
					.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
				storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,
				storm.cookbook.Fields.UNIQUE));
	}
}

  在這里,我們把url 和 clientKey 組合成為 【url:clientKey】的格式組合,并依據(jù)這個對象,在redis中去查找,如果沒有,那那Set到redis中間去,并且判定它為【unique】

4:

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class VisitStatsBolt extends BaseRichBolt {

    private OutputCollector collector;

    private int total = 0;
    private int uniqueCount = 0;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
    	
    	//在這里,我們在上游來判斷這個Fields 是否是獨特和唯一的
        boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
        
        total++;
        if(unique)uniqueCount++;
        collector.emit(new Values(total,uniqueCount));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,
        		storm.cookbook.Fields.TOTAL_UNIQUE));
    }
}

第一次出現(xiàn),uv ++ 

5  接下來,看看流水線2 :

package storm.bolt;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.json.simple.JSONObject;

import storm.cookbook.IPResolver;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use
 * File | Settings | File Templates.
 */
public class GeographyBolt extends BaseRichBolt {

	// ip解析器
	private IPResolver resolver;

	private OutputCollector collector;

	public GeographyBolt(IPResolver resolver) {
		this.resolver = resolver;
	}

	@Override
	public void prepare(Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
	}

	@Override
	public void execute(Tuple tuple) {

		// 1 從上級的目錄之中拿到我們所要使用的ip
		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);

		// 將ip 轉(zhuǎn)換為json
		JSONObject json = resolver.resolveIP(ip);

		// 將 city和country 組織成為一個新的元祖,在這里也就是我們的Values對象
		String city = (String) json.get(storm.cookbook.Fields.CITY);
		String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);

		collector.emit(new Values(country, city));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

		// 確定了我們這次輸出元祖的格式
		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,
				storm.cookbook.Fields.CITY));
	}
}

以上Bolt,完成了一個Ip到 CITY,COUNTRY 的轉(zhuǎn)換

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class GeoStatsBolt extends BaseRichBolt {

	private class CountryStats {

		//
		private int countryTotal = 0;

		private static final int COUNT_INDEX = 0;
		private static final int PERCENTAGE_INDEX = 1;
		private String countryName;

		public CountryStats(String countryName) {
			this.countryName = countryName;
		}

		private Map<String, List<Integer>> cityStats = new HashMap<String, List<Integer>>();

		/**
		 * @param cityName
		 */
		public void cityFound(String cityName) {
			countryTotal++;

			// 已經(jīng)有了值,一個加1的操作
			if (cityStats.containsKey(cityName)) {
				cityStats.get(cityName)
						.set(COUNT_INDEX,
								cityStats.get(cityName).get(COUNT_INDEX)
										.intValue() + 1);
				// 沒有值的時候
			} else {
				List<Integer> list = new LinkedList<Integer>();
				list.add(1);
				list.add(0);
				cityStats.put(cityName, list);
			}

			double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
					/ (double) countryTotal;

			cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent);

		}

		/**
		 * @return 拿到的國家總數(shù)
		 */
		public int getCountryTotal() {
			return countryTotal;
		}

		/**
		 * @param cityName  依據(jù)傳入的城市名稱,拿到城市總數(shù)
		 * @return
		 */

		public int getCityTotal(String cityName) {
			return cityStats.get(cityName).get(COUNT_INDEX).intValue();
		}

		
		public String toString() {
			return "Total Count for " + countryName + " is "
					+ Integer.toString(countryTotal) + "\n" + "Cities:  "
					+ cityStats.toString();
		}
	}

	private OutputCollector collector;

	// CountryStats 是一個內(nèi)部類的對象
	private Map<String, CountryStats> stats = new HashMap<String, CountryStats>();

	@Override
	public void prepare(Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
	}

	@Override
	public void execute(Tuple tuple) {
		String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
		String city = tuple.getStringByField(storm.cookbook.Fields.CITY);

		// 如果國家不存在的時候,新增加一個國家,國家的統(tǒng)計
		if (!stats.containsKey(country)) {
			stats.put(country, new CountryStats(country));
		}

		// 這里拿到新的統(tǒng)計,cityFound 是拿到某個城市的值
		stats.get(country).cityFound(city);

		collector.emit(new Values(country,
				stats.get(country).getCountryTotal(), city, stats.get(country)
						.getCityTotal(city)));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
				storm.cookbook.Fields.COUNTRY,
				storm.cookbook.Fields.COUNTRY_TOTAL,
				storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL));
	}
}

有關(guān)地理位置的統(tǒng)計,附帶上程序其他的使用類

package storm.cookbook;

/**
 */
public class Fields {

	public static final String IP = "ip";
	
	public static final String URL = "url";
	
	public static final String CLIENT_KEY = "clientKey";
	
	public static final String COUNTRY = "country";
	
	public static final String COUNTRY_NAME = "country_name";
	
	public static final String CITY = "city";
	
	//唯一的,獨一無二的
	public static final String UNIQUE = "unique";
	
	//城鎮(zhèn)整數(shù)
	public static final String COUNTRY_TOTAL = "countryTotal";
	
	//城市整數(shù)
	public static final String CITY_TOTAL = "cityTotal";
	
	//總共計數(shù)
	public static final String TOTAL_COUNT = "totalCount";
	
	//總共獨一無二的
	public static final String TOTAL_UNIQUE = "totalUnique";




}
package storm.cookbook;

import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;

public class HttpIPResolver implements IPResolver, Serializable {

	static String url = "http://api.hostip.info/get_json.php";

	@Override
	public JSONObject resolveIP(String ip) {
		URL geoUrl = null;
		BufferedReader in = null;
		try {
			geoUrl = new URL(url + "?ip=" + ip);
			URLConnection connection = geoUrl.openConnection();
			in = new BufferedReader(new InputStreamReader(
					connection.getInputStream()));
			String inputLine;

			JSONObject json = (JSONObject) JSONValue.parse(in);

			in.close();

			return json;
		} catch (IOException e) {
			e.printStackTrace();
		} finally {

			// 每當in為空的時候我們不進行如下的close操作,只有在in不為空的時候進行close操作
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
				}
			}
		}
		return null;
	}
}
package storm.cookbook;

import org.json.simple.JSONObject;

/**
 * Created with IntelliJ IDEA.
 * User: admin
 * Date: 2012/12/07
 * Time: 5:29 PM
 * To change this template use File | Settings | File Templates.
 */
public interface IPResolver {

	public JSONObject resolveIP(String ip);
}

到此,相信大家對“Storm流方式的統(tǒng)計系統(tǒng)怎么實現(xiàn)”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI