How Thrift Is Used in Storm

Published: 2021-08-05 17:29:19 | Source: 億速云 | Author: Leah | Category: Cloud Computing

This article looks at how Thrift is used in Storm: the IDL and the Java code generated from it, the client side, and the server side.

1 IDL

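The starting point is storm.thrift, the IDL file that defines the data structures and services Storm uses.
The package backtype.storm.generated then holds the Java code that Thrift generates automatically from that IDL.

Take the Nimbus service as an example.
Its definition in the IDL is: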

service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);

  string beginFileDownload(1: string file);
  // can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();

  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);

  // returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

The corresponding generated Java code in Nimbus.java looks like this:

public class Nimbus {

  public interface Iface {
    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    public void killTopology(String name) throws NotAliveException, org.apache.thrift7.TException;
    public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException, org.apache.thrift7.TException;
    public void activate(String name) throws NotAliveException, org.apache.thrift7.TException;
    public void deactivate(String name) throws NotAliveException, org.apache.thrift7.TException;
    public void rebalance(String name, RebalanceOptions options) throws NotAliveException, InvalidTopologyException, org.apache.thrift7.TException;
    public String beginFileUpload() throws org.apache.thrift7.TException;
    public void uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift7.TException;
    public void finishFileUpload(String location) throws org.apache.thrift7.TException;
    public String beginFileDownload(String file) throws org.apache.thrift7.TException;
    public ByteBuffer downloadChunk(String id) throws org.apache.thrift7.TException;
    public String getNimbusConf() throws org.apache.thrift7.TException;
    public ClusterSummary getClusterInfo() throws org.apache.thrift7.TException;
    public TopologyInfo getTopologyInfo(String id) throws NotAliveException, org.apache.thrift7.TException;
    public String getTopologyConf(String id) throws NotAliveException, org.apache.thrift7.TException;
    public StormTopology getTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
    public StormTopology getUserTopology(String id) throws NotAliveException, org.apache.thrift7.TException;
  }

  // ... rest of the generated class omitted
}
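The IDL comment on downloadChunk ("can stop downloading chunks when receive 0-length byte array back") implies a simple client-side loop. The following is a minimal sketch of that loop in plain Java, not Storm code: `FakeFileService` is a hypothetical in-memory stand-in for Nimbus, and its chunk size and download-id format are arbitrary assumptions. Only the two method names and the "empty chunk means done" rule come from the IDL.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class ChunkedDownloadSketch {

    // Hypothetical in-memory stand-in for the download half of the Nimbus service.
    static final class FakeFileService {
        private static final int CHUNK_SIZE = 4; // arbitrary, small for the demo
        private final Map<String, byte[]> files = new HashMap<>();
        private final Map<String, ByteBuffer> sessions = new HashMap<>();
        private int nextId = 0;

        void put(String file, byte[] content) {
            files.put(file, content);
        }

        // Opens a download session and returns its id (id format is made up).
        String beginFileDownload(String file) {
            byte[] content = files.get(file);
            if (content == null) {
                throw new IllegalArgumentException("no such file: " + file);
            }
            String id = "download-" + (nextId++);
            sessions.put(id, ByteBuffer.wrap(content));
            return id;
        }

        // Returns the next chunk; a 0-length buffer once everything was sent.
        ByteBuffer downloadChunk(String id) {
            ByteBuffer remaining = sessions.get(id);
            if (remaining == null) {
                throw new IllegalArgumentException("unknown download id: " + id);
            }
            byte[] chunk = new byte[Math.min(CHUNK_SIZE, remaining.remaining())];
            remaining.get(chunk);
            return ByteBuffer.wrap(chunk);
        }
    }

    // Client loop: keep asking for chunks until an empty one comes back.
    static byte[] download(FakeFileService service, String file) {
        String id = service.beginFileDownload(file);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (true) {
            ByteBuffer chunk = service.downloadChunk(id);
            if (!chunk.hasRemaining()) {
                break; // 0-length chunk: nothing left to download
            }
            byte[] bytes = new byte[chunk.remaining()];
            chunk.get(bytes);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        FakeFileService service = new FakeFileService();
        service.put("topology.jar", "hello thrift".getBytes(StandardCharsets.UTF_8));
        byte[] result = download(service, "topology.jar");
        System.out.println(new String(result, StandardCharsets.UTF_8));
    }
}
```

Reading the chunk through `remaining()` and `get(...)` rather than `array()` respects the buffer's position and limit, which matters because the IDL type `binary` surfaces as `ByteBuffer` in the generated interface.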

2 Client

1. First, get a client:

NimbusClient client = NimbusClient.getConfiguredClient(conf);

NimbusClient.getConfiguredClient, in backtype.storm.utils, simply reads the Nimbus host and port from the configuration and constructs a new NimbusClient:

    public static NimbusClient getConfiguredClient(Map conf) {
        try {
            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
            return new NimbusClient(conf, nimbusHost, nimbusPort);
        } catch (TTransportException ex) {
            throw new RuntimeException(ex);
        }
    }
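To make the lookup concrete, here is a small self-contained sketch of the same idea without any Storm dependency. The key strings `nimbus.host` and `nimbus.thrift.port` are assumed to be the values behind Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT, `toInt` is a hypothetical stand-in for Utils.getInt (the real helper may accept a different set of types), and 6627 is only an illustrative port value.

```java
import java.util.HashMap;
import java.util.Map;

final class NimbusAddressSketch {
    // Assumed to match Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT.
    static final String NIMBUS_HOST = "nimbus.host";
    static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";

    // Hypothetical counterpart of Utils.getInt: a parsed config may hold the
    // port as an Integer, a Long, or a String.
    static int toInt(Object value) {
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            return Integer.parseInt(((String) value).trim());
        }
        throw new IllegalArgumentException("not an integer: " + value);
    }

    // Pulls host and port out of the config and validates them, returning "host:port".
    static String resolve(Map<String, Object> conf) {
        String host = (String) conf.get(NIMBUS_HOST);
        if (host == null) {
            throw new IllegalArgumentException("host is not set");
        }
        int port = toInt(conf.get(NIMBUS_THRIFT_PORT));
        if (port <= 0) {
            throw new IllegalArgumentException("invalid port: " + port);
        }
        return host + ":" + port;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(NIMBUS_HOST, "localhost");
        conf.put(NIMBUS_THRIFT_PORT, 6627L); // illustrative value
        System.out.println(resolve(conf));
    }
}
```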

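NimbusClient extends ThriftClient (public class NimbusClient extends ThriftClient).
So what does ThriftClient do? The key questions are how data is serialized and how it is sent to the remote side.
This is where Thrift's Transport and Protocol abstractions come in.
The transport is essentially a wrapper around a socket, created with TSocket(host, port).
For the protocol, TBinaryProtocol is used by default when nothing else is specified.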

    public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
        try {
            // locate login configuration
            Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);

            // construct a transport plugin
            ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);

            // create a socket with server
            if (host == null) {
                throw new IllegalArgumentException("host is not set");
            }
            if (port <= 0) {
                throw new IllegalArgumentException("invalid port: " + port);
            }
            TSocket socket = new TSocket(host, port);
            if (timeout != null) {
                socket.setTimeout(timeout);
            }
            final TTransport underlyingTransport = socket;

            // establish client-server transport via plugin
            _transport = transportPlugin.connect(underlyingTransport, host);
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
        _protocol = null;
        if (_transport != null)
            _protocol = new TBinaryProtocol(_transport);
    }
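The split between transport (moving bytes) and protocol (turning typed values into bytes) can be illustrated with nothing but the JDK. The sketch below is an analogy, not Thrift's implementation: `BinaryWriter` and `BinaryReader` are hypothetical names, the "transport" is any `OutputStream`/`InputStream` pair (in memory here, where TSocket would supply socket streams), and the encoding (big-endian 32-bit integers, strings as a length prefix plus UTF-8 bytes) is assumed to resemble what a simple binary protocol does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class LayeringSketch {

    // "Protocol" layer, write side: knows how to encode typed values,
    // but not where the bytes go.
    static final class BinaryWriter {
        private final DataOutputStream out;

        BinaryWriter(OutputStream transport) {
            this.out = new DataOutputStream(transport);
        }

        void writeI32(int value) throws IOException {
            out.writeInt(value); // 4 bytes, big-endian
        }

        void writeString(String value) throws IOException {
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length); // length prefix
            out.write(utf8);
        }

        void flush() throws IOException {
            out.flush();
        }
    }

    // "Protocol" layer, read side: decodes the same layout.
    static final class BinaryReader {
        private final DataInputStream in;

        BinaryReader(InputStream transport) {
            this.in = new DataInputStream(transport);
        }

        int readI32() throws IOException {
            return in.readInt();
        }

        String readString() throws IOException {
            byte[] utf8 = new byte[in.readInt()];
            in.readFully(utf8);
            return new String(utf8, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        // In-memory "transport"; a socket's streams could be swapped in
        // without touching the writer or reader.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        BinaryWriter writer = new BinaryWriter(wire);
        writer.writeString("submitTopology");
        writer.writeI32(42);
        writer.flush();

        BinaryReader reader = new BinaryReader(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println(reader.readString() + " " + reader.readI32());
    }
}
```

Because the writer and reader only see streams, the same encoding code works over any byte channel, which is the point of keeping the two layers separate.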

2. Call any RPC.
Take submitTopologyWithOpts as an example:

client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);

This method is defined in the Nimbus interface shown above. Thrift generates not only the Java interface but also the complete client-side RPC implementation:

    public void submitTopologyWithOpts(String name, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, org.apache.thrift7.TException
    {
      send_submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
      recv_submitTopologyWithOpts();
    }

The call has two steps.
First, send_submitTopologyWithOpts calls sendBase.
Then recv_submitTopologyWithOpts calls receiveBase.

  protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));
    args.write(oprot_);
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
  }

  protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();
    if (msg.type == TMessageType.EXCEPTION) {
      TApplicationException x = TApplicationException.read(iprot_);
      iprot_.readMessageEnd();
      throw x;
    }
    if (msg.seqid != seqid_) {
      throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
    }
    result.read(iprot_);
    iprot_.readMessageEnd();
  }

This shows how Thrift encapsulates the protocol: callers never handle serialization themselves, they just call the protocol's interface.
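The send/receive pairing and the sequence-id check in sendBase and receiveBase can be mimicked in a few lines of plain Java. This is a simplified sketch under stated assumptions: the envelope layout (method name, type byte, sequence id, one string payload) is invented for illustration and is not Thrift's wire format, and `fakeServer` is a hypothetical stand-in for the remote side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class SeqIdSketch {
    static final byte CALL = 1, REPLY = 2, EXCEPTION = 3;

    private int seqid = 0; // incremented for every outgoing call

    // Counterpart of sendBase: envelope first, then the arguments.
    byte[] send(String methodName, String arg) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF(methodName);
        out.writeByte(CALL);
        out.writeInt(++seqid);
        out.writeUTF(arg);
        out.flush();
        return buf.toByteArray();
    }

    // Counterpart of receiveBase: check type, then sequence id, then read the result.
    String receive(byte[] response, String methodName) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(response));
        in.readUTF(); // method name echoed by the server, unused here
        byte type = in.readByte();
        int responseSeqid = in.readInt();
        if (type == EXCEPTION) {
            throw new IOException(methodName + " failed: " + in.readUTF());
        }
        if (responseSeqid != seqid) {
            throw new IOException(methodName + " failed: out of sequence response");
        }
        return in.readUTF();
    }

    // Hypothetical server: echoes the envelope back as a REPLY.
    // seqidDelta lets the demo simulate a mismatched response.
    static byte[] fakeServer(byte[] request, int seqidDelta) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(request));
        String method = in.readUTF();
        in.readByte(); // message type, always CALL in this sketch
        int requestSeqid = in.readInt();
        String arg = in.readUTF();

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF(method);
        out.writeByte(REPLY);
        out.writeInt(requestSeqid + seqidDelta);
        out.writeUTF("ok:" + arg);
        out.flush();
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        SeqIdSketch client = new SeqIdSketch();
        byte[] request = client.send("activate", "wordcount");
        System.out.println(client.receive(fakeServer(request, 0), "activate"));
        try {
            client.receive(fakeServer(request, 1), "activate");
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```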

3 Server

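Thrift's strength is that it implements the whole protocol stack rather than only translating the IDL, and it also ships several server implementations.
Below is the Nimbus server side, which is written in Clojure.
It uses Thrift's TNonblockingServerSocket, THsHaServer, TBinaryProtocol, and Processor, and the result is very simple.
The processor hands received data to service-handler, so as a user you only need to implement Nimbus$Iface in service-handler; everything else on the server side is already wrapped by Thrift. The generated classes used here are the same ones in backtype.storm.generated: because Clojure runs on the JVM, the IDL only needs to be compiled to Java.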

(defn launch-server! [conf nimbus]
  (validate-distributed-mode! conf)
  (let [service-handler (service-handler conf nimbus)
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory.))
                    (.processor (Nimbus$Processor. service-handler)))
        ;; binding restored: `server` is used below but was missing from the listing
        server (THsHaServer. options)]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")
    (.serve server)))
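The division of labor between the processor and the handler can be sketched in plain Java. This is an illustration of the idea only, not the code Thrift generates: `Iface` here is a hypothetical two-method slice of a service (and, unlike Nimbus.Iface, its methods return a status string so the demo has something to print), `Handler` is a made-up implementation, and `Processor` simply maps an incoming method name to the matching handler call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class ProcessorSketch {

    // Hypothetical service interface, in the spirit of Nimbus.Iface.
    interface Iface {
        String activate(String name);
        String deactivate(String name);
    }

    // The only part a service author writes: the business logic.
    static final class Handler implements Iface {
        @Override
        public String activate(String name) {
            return name + " is active";
        }

        @Override
        public String deactivate(String name) {
            return name + " is inactive";
        }
    }

    // Plays the role of a generated processor: looks up the method by name
    // and forwards the decoded argument to the handler.
    static final class Processor {
        private final Map<String, BiFunction<Iface, String, String>> methods = new HashMap<>();
        private final Iface handler;

        Processor(Iface handler) {
            this.handler = handler;
            methods.put("activate", Iface::activate);
            methods.put("deactivate", Iface::deactivate);
        }

        String process(String methodName, String arg) {
            BiFunction<Iface, String, String> method = methods.get(methodName);
            if (method == null) {
                return "error: unknown method " + methodName;
            }
            return method.apply(handler, arg);
        }
    }

    public static void main(String[] args) {
        Processor processor = new Processor(new Handler());
        System.out.println(processor.process("activate", "wordcount"));
        System.out.println(processor.process("restart", "wordcount"));
    }
}
```

In this arrangement the server loop, decoding, and dispatch never change when the business logic does, which is why implementing the interface in the handler is all the Nimbus code has to do.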

That is how Thrift is used in Storm. If you have run into similar questions, the walkthrough above should help in working through them.
