溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Storm中進(jìn)行簡單的壓力測試代碼怎么寫

發(fā)布時間:2021-12-01 14:29:47 來源:億速云 閱讀:114 作者:柒染 欄目:云計(jì)算

本篇文章給大家分享的是有關(guān)   Storm中進(jìn)行簡單的壓力測試代碼怎么寫,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

代碼比較簡單,看圖說話:

package storm.benchmark;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;

public class ThroughputTest {
    public static class GenSpout extends BaseRichSpout {
        private static final Character[] CHARS = new Character[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'};
        
        SpoutOutputCollector _collector;
        int _size;
        Random _rand;
        String _id;
        String _val;
        
        public GenSpout(int size) {
            _size = size;
        }
        
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
            _id = randString(5);
            _val = randString(_size);
        }

        @Override
        public void nextTuple() {
            _collector.emit(new Values(_id, _val));
            
        }

        private String randString(int size) {
            StringBuffer buf = new StringBuffer();
            for(int i=0; i<size; i++) {
                buf.append(CHARS[_rand.nextInt(CHARS.length)]);
            }
            return buf.toString();
        }
        
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "item"));
        }        
    }
    
    public static class IdentityBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "item"));
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            collector.emit(tuple.getValues());
        }        
    }

    public static class CountBolt extends BaseBasicBolt {
        int _count;
        
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("count"));
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            _count+=1;
            collector.emit(new Values(_count));
        }        
    }
    
    public static class AckBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
        }       
    }
    
    
    //storm jar storm-benchmark-0.0.1-SNAPSHOT-standalone.jar storm.benchmark.ThroughputTest demo 100 8 8 8 10000
    public static void main(String[] args) throws Exception {
        int size = Integer.parseInt(args[1]);
        int workers = Integer.parseInt(args[2]);
        int spout = Integer.parseInt(args[3]);
        int bolt = Integer.parseInt(args[4]);        
        int maxPending = Integer.parseInt(args[5]);
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new GenSpout(size), spout);
//        builder.setBolt("count", new CountBolt(), bolt)
//                .fieldsGrouping("bolt", new Fields("id"));
//        builder.setBolt("bolt", new IdentityBolt(), bolt)
//                .shuffleGrouping("spout");
        builder.setBolt("bolt2", new AckBolt(), bolt)
               .shuffleGrouping("spout");
//        builder.setBolt("count2", new CountBolt(), bolt)
//                .fieldsGrouping("bolt2", new Fields("id"));
        
        Config conf = new Config();
        conf.setNumWorkers(workers);
        //conf.setMaxSpoutPending(maxPending);
        conf.setNumAckers(0);
        conf.setStatsSampleRate(0.0001);
        //topology.executor.receive.buffer.size: 8192 #batched
        //topology.executor.send.buffer.size: 8192 #individual messages
        //topology.transfer.buffer.size: 1024 # batched
        
        //conf.put("topology.executor.send.buffer.size", 1024);
        //conf.put("topology.transfer.buffer.size", 8);
        //conf.put("topology.receiver.buffer.size", 8);
        //conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xdebug -Xrunjdwp:transport=dt_socket,address=1%ID%,server=y,suspend=n");
        
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}

以上就是   Storm中進(jìn)行簡單的壓力測試代碼怎么寫,小編相信有部分知識點(diǎn)可能是我們?nèi)粘9ぷ鲿姷交蛴玫降摹OM隳芡ㄟ^這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI