溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

windows如何安裝storm eclipse調(diào)試TopN實例

發(fā)布時間:2021-11-12 11:48:02 來源:億速云 閱讀:138 作者:小新 欄目:云計算

這篇文章將為大家詳細(xì)講解有關(guān)windows如何安裝storm eclipse調(diào)試TopN實例,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

一:安裝JD

配置Java環(huán)境變量 JAVA_HOME、Path、CLASSPATH三個值分別為(按照自己安裝狀況設(shè)置,此處供參考):

D:\java\jdk1.8

%JAVA_HOME%/bin;%JAVA_HOME%/jre/bin

.;%JAVA_HOME%/lib/dt.jar;%JAVA_HOME%/lib/tools.jar (要加.表示當(dāng)前路徑)

二:安裝 Python

這是為了測試安裝效果,我們將部署 storm-starter project案例中word coun程序,用的是python寫的multi-lang bolt,使用python 2.7.11,安裝路徑在:

C:\Python27\

三:安裝并運行ZooKeeper

Download Apache Zookeeper 3.4.8 ,解壓配置:

> cd zookeeper-3.4.8 
> copy conf\zoo_sample.cfg conf\zoo.cfg 
> .\bin\zkServer.cmd

四:安裝Storm

Storm的windows官方版還沒有釋放,here.下載,源碼here下載。

注意1:

源碼一定要用這個版本,否則啟動會報各種錯誤,而這些錯誤和 jdk、python、zookeeper、eclipse 版本都無關(guān)。

http://dl.dropboxusercontent.com/s/iglqz73chkul1tu/storm-0.9.1-incubating-SNAPSHOT-12182013.zip

配置Storm環(huán)境變量

  • Storm需要STORM_HOME和JAVA_HOME,比如STORM_HOME為:

C:\storm-0.9.1-incubating-SNAPSHOT-12182013\

  • 在PATH中加入:

%STORM_HOME%\bin;C:\Python27\Lib\site-packages\;C:\Python27\Scripts\

此處與參考文章略有不同,下圖是參考文章給出的配置

windows如何安裝storm eclipse調(diào)試TopN實例

 JAVA_HOME已經(jīng)在安裝JDK時手動配置了環(huán)境變量,而Python好像是默認(rèn)自動就會配置好環(huán)境變量的,

我的Python目錄下沒有Scripts文件夾,暫時這樣配置就可以了,不影響下面的使用。

五:啟動Nimbus, Supervisor, and Storm UI Daemons

  • Nimbus

注意2:

一定要在 STORM_HOME 目錄下執(zhí)行后續(xù)命令,否則會報錯。

ERROR backtype.storm.event - Error when processing event
java.lang.RuntimeException: java.io.InvalidClassException: clojure.lang.APersistentMap; local class incompatible: stream classdesc serialVersionUID = 8648225932767613808, local class serialVersionUID = 270281984708184947
        at backtype.storm.utils.Utils.deserialize(Utils.java:86) ~[storm-core-0.9.1-incubating-SNAPSHOT-12182013.jar:na]

> cd %STORM_HOME%      

> storm nimbus

  • Supervisor

> cd %STORM_HOME% 

> storm supervisor

  • Storm UI                          # 可選,也可以用 storm list 查看所有 storm 任務(wù)

> cd %STORM_HOME% 

> storm ui

瀏覽器打開http://localhost:8080/ 可看到Storm運行。

六:部署 Word count

下載download a pre-built jar。

部署這個jar在本地:

> storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology WordCount -c nimbus.host=localhost

如果你刷新 Storm UI頁面,會看到 “WordCount” topology顯示列出,點按鏈接確認(rèn)它處理數(shù)據(jù)。

七:eclipse 調(diào)試 TopN 實例

storm 求 csdn 密碼庫中密碼出現(xiàn)的 topN,并直接在 eclipse 中調(diào)試運行:

package com.bj.test.top10;

/** 
* @Author:tester 
* @DateTime:2016年6月21日 下午7:58:45 
* @Description:  Spout作為數(shù)據(jù)源,它實現(xiàn)了IRichSpout接口,功能是讀取一個文本文件并把它的每一行內(nèi)容發(fā)送給bolt。
* @Version:1.0
*/
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class PasswdSpout extends BaseRichSpout {

	private SpoutOutputCollector collector;
	private FileReader fileReader;
	private boolean completed = false;

	public void ack(Object msgId) {
		System.out.println("==============OK:" + msgId);
	}

	public void close() {
	}

	public void fail(Object msgId) {
		System.out.println("++++++++++++++FAIL:" + msgId);
	}

	/** 
     * 這是Spout最主要的方法,在這里我們讀取文本文件,并把它的每一行發(fā)射出去(給bolt) 
     * 這個方法會不斷被調(diào)用,為了降低它對CPU的消耗,當(dāng)任務(wù)完成時讓它sleep一下 
     * **/  
	public void nextTuple() {
		/**
		 * The nextuple it is called forever, so if we have been readed the file
		 * we will wait and then return
		 */
		if (completed) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// Do nothing
			}
			return;
		}
		String line;
		// Open the reader
		BufferedReader reader = new BufferedReader(fileReader);
		try {
			// Read all lines
			while ((line = reader.readLine()) != null) {
				String[] words = line.split("#");
				String passwd = words[1].trim();
				// Emit the word  
                collector.emit(new Values(passwd));
		        /*for(String word : words){
		            word = word.trim();
		            if(!word.isEmpty()){
		                word = word.toLowerCase();
		                // Emit the word  
		                collector.emit(new Values(word));
		            }
		        }*/
			}
		} catch (Exception e) {
			throw new RuntimeException("Error reading tuple", e);
		} finally {
			completed = true;
		}
	}

	/** 
     * 這是第一個方法,里面接收了三個參數(shù),第一個是創(chuàng)建Topology時的配置, 
     * 第二個是所有的Topology數(shù)據(jù),第三個是用來把Spout的數(shù)據(jù)發(fā)射給bolt 
     * **/  
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		try {
			//獲取創(chuàng)建Topology時指定的要讀取的文件路徑 
			this.fileReader = new FileReader(conf.get("wordsFile").toString());
		} catch (FileNotFoundException e) {
			throw new RuntimeException("Error reading file [" + conf.get("wordFile") + "]");
		}
		//初始化發(fā)射器  
		this.collector = collector;
	}

	/**
	 * Declare the output field "word"
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}


/////////////////////////////////////////////////////////////////////////////////////////////

package com.bj.test.top10;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

import static com.bj.test.top10.SortMapByValue.*;

public class Top10Bolt extends BaseBasicBolt {

	Integer id;
	String name;
	NavigableMap<String, Integer> counters;

	/** 
     * Topology執(zhí)行完畢的清理工作,比如關(guān)閉連接、釋放資源等操作都會寫在這里 
     * 因為這只是個Demo,我們用它來打印我們的計數(shù)器 
     * */  
	@Override
	public void cleanup() {
		System.out.println(">>>>>>>>>>>> Word Counter ["+name+"-"+id+"] <<<<<<<<<<<");
		/*for(Map.Entry<String, Integer> entry : counters.entrySet()){
			System.out.println(entry.getKey()+": "+entry.getValue());
		}*/
		printMap(list2Map(sortMapByValuesTopN(counters, 10)));
	}

	/**
	 * On create 
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		this.counters = new TreeMap<String, Integer>().descendingMap();
		this.name = context.getThisComponentId();
		this.id = context.getThisTaskId();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {}

	//	Bolt中最重要的是execute方法,每當(dāng)一個tuple傳過來時它便會被調(diào)用
	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String word = input.getString(0);
		/**
		 * If the word dosn't exist in the map we will create
		 * this, if not We will add 1 
		 */
		if(!counters.containsKey(word)){
			counters.put(word, 1);
		}else{
			Integer count = counters.get(word) + 1;
			counters.put(word, count);
		}
	}
}


/////////////////////////////////////////////////////////////////////////////////////////////

package com.bj.test.top10;

/** 
* @Author:tester 
* @DateTime:2016年6月21日 下午7:52:32 
* @Description: 
* @Version:1.0
*/
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {
	public static void main(String[] args) throws InterruptedException {

		// 定義一個Topology
		TopologyBuilder builder = new TopologyBuilder();
		// executor的數(shù)目, set parallelism hint to 4
		builder.setSpout("PasswdSpout", new PasswdSpout(), 1);
		// set tasks number to 4
		builder.setBolt("Top10Bolt", new Top10Bolt(), 1).setNumTasks(1).fieldsGrouping("PasswdSpout",
				new Fields("word"));

		// 配置
		Config conf = new Config();
		conf.put("wordsFile", "H:\\mysql\\csdn_database\\www.csdn.net.100.sql");
		//		conf.put("wordsFile", "H:\\mysql\\csdn_database\\www.csdn.net.sql");
		conf.setDebug(false);
		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
		// use two worker processes
		// conf.setNumWorkers(4);

		// 創(chuàng)建一個本地模式cluster
		LocalCluster cluster = new LocalCluster();
		// 提交Topology
		cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
		Thread.sleep(1000);
		cluster.shutdown();
	}
}

windows如何安裝storm eclipse調(diào)試TopN實例

關(guān)于“windows如何安裝storm eclipse調(diào)試TopN實例”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI