溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

hadoop rpc客戶端初始化和調(diào)用過程怎么實現(xiàn)

發(fā)布時間:2021-12-10 09:26:33 來源:億速云 閱讀:100 作者:iii 欄目:云計算

這篇文章主要介紹“hadoop rpc客戶端初始化和調(diào)用過程怎么實現(xiàn)”,在日常操作中,相信很多人在hadoop rpc客戶端初始化和調(diào)用過程怎么實現(xiàn)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”hadoop rpc客戶端初始化和調(diào)用過程怎么實現(xiàn)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

DFSClient的初始化

DFSClient的初始化主要看其構(gòu)造函數(shù),其中rpc部分我們主要關(guān)注屬性final ClientProtocol namenode,DFSClient的文件系統(tǒng)操作都是由他代理完成,構(gòu)造函數(shù)中的關(guān)鍵代碼如下:

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats)
    throws IOException {
	proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class);
 	this.dtService = proxyInfo.getDelegationTokenService();
 	this.namenode = proxyInfo.getProxy();
}

顯然,DFSClient中的namenode是一個代理類。

接著NameNodeProxies類的createProxy方法,下面給出了NameNodeProxies中需要用到的一些方法:

public class NameNodeProxies {
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
      	URI nameNodeUri, Class<T> xface) throws IOException {
		return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
          		UserGroupInformation.getCurrentUser(), true);
}

public static <T> ProxyAndInfo<T> createNonHAProxy(
      	Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
      		UserGroupInformation ugi, boolean withRetries) throws IOException {
		proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
		return new ProxyAndInfo<T>(proxy, dtService);
}

/**
	這部分是重點
*/
private static ClientProtocol createNNProxyWithClientProtocol(
      	InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
      		boolean withRetries) throws IOException {
		ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
       		ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        			NetUtils.getDefaultSocketFactory(conf),
        				org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy).getProxy();

		proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
          		ClientNamenodeProtocolPB.class,
          			new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
              			ClientNamenodeProtocolPB.class, proxy),
          		methodNameToPolicyMap,
          		defaultPolicy);

		return new ClientNamenodeProtocolTranslatorPB(proxy);
}
}

該類中前面兩個方法做跳轉(zhuǎn)用,直接看createNNProxyWithClientProtocol方法,這里兩行很關(guān)鍵的代碼,proxy實例的初始化,這里先提示注意前一行中的getProxy() 對于這個方法是需要注意的,這樣也保證了類型的一致。

這時候就不得不調(diào)出RPC這個類來看看他是怎么生成proxy的實例的了,看代碼:ProtobufRpcEngineProtobufRpcEngineProtobufRpcEngineProtobufRpcEngine

public class RPC {
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
                                long clientVersion,
                                InetSocketAddress addr,
                                UserGroupInformation ticket,
                                Configuration conf,
                                SocketFactory factory,
                                int rpcTimeout,
                                RetryPolicy connectionRetryPolicy) throws IOException {    
    	if (UserGroupInformation.isSecurityEnabled()) {
      	SaslRpcServer.init(conf);
    	}
   	return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion,
        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
  }
}


RPC中還是需要進一步的跳轉(zhuǎn),但是這里需要注意,getProtocolEngine這個方法,這里做一個說明,查看
RpcEngine的依賴,看圖: hadoop rpc客戶端初始化和調(diào)用過程怎么實現(xiàn) 在我的2.4.1的hadoop的版本中,hadoop的序列化框架已經(jīng)用了Protobuf,所以getProtocolEngine方法得到的是ProtobufRpcEngine類的一個實例,那好,我們進一步跟蹤ProtobufRpcEngine類的getProxy方法,看代碼:

public class ProtobufRpcEngine implements RpcEngine {
	public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
      	InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
      	SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
      	) throws IOException {
    	final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
        	rpcTimeout, connectionRetryPolicy);
    	return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
        	protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
  	}
}

對java的動態(tài)代理有點了解的人看到Proxy.newProxyInstance這個方法應(yīng)該都很清楚這就是生成一個遠(yuǎn)程代理類實例(特別注意:在NameNodeProxies類的createNNProxyWithClientProtocol方法中g(shù)etProxy方法拿到的對象也就是這個對象),其中的invoker參數(shù),確實我們不能忽略的,因為他暗藏玄機,java的動態(tài)代理中,invoker的類需要實現(xiàn)InvocationHandler接口,該接口只聽過一個方法invoke,共代理類使用,及通過Proxy.newProxyInstance生成的代理類,在使用的時候是通過InvocationHandler的invoke方法來起作用的。好吧,現(xiàn)在我們可以順便看看在ProtobufRpcEngine類的getProxy方法中invoker局部變量的類依賴圖:,顯然有剛才提到的實現(xiàn)關(guān)系,現(xiàn)在再讓我們看看Invoker的內(nèi)部,包括構(gòu)造函數(shù)和invoke方法:

private Invoker(Class<?> protocol, Client.ConnectionId connId,
        Configuration conf, SocketFactory factory) {
      this.remoteId = connId;
      this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
      this.protocolName = RPC.getProtocolName(protocol);
      this.clientProtocolVersion = RPC
          .getProtocolVersion(protocol);
    }

public Object invoke(Object proxy, Method method, Object[] args)
        throws ServiceException {
	   val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
}

在構(gòu)造函數(shù)請注意一個屬性client,他的類型正式 org.apache.hadoop.ipc.Client,而且在invoke方法中發(fā)起遠(yuǎn)程調(diào)用的正是這個client屬性,能夠讀到這里的同學(xué),相信應(yīng)該比較清楚了,在DFSClient中發(fā)起遠(yuǎn)程訪問的就是這個Client類的實例。

關(guān)于DFSClient的初始化階段中關(guān)于rpc的部分,總結(jié)一句,就是創(chuàng)建一個namenode的代理對象,供后續(xù)的文件系統(tǒng)操作調(diào)用。

DFSClient的getFileLinkInfo方法

DFSClient提供了相當(dāng)豐富的API供客戶端操作hadoop的文件系統(tǒng),這里以 getFileLinkInfo為例,講解rpc客戶端的調(diào)用過程。注意:如果是FileSystem類的話,請使用方法getFileLinkStatus,他對DFSClient提供的getFileLinkInfo做了一層包裝,僅此而已。

直接看DFSClient中的代碼:

public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
    checkOpen();
    try {
      return namenode.getFileLinkInfo(src);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     UnresolvedPathException.class);
     }
   }


很簡答的一行代碼,通過namenode屬性的調(diào)用操作完成,看了DFSClient的初始化過程,我們很容易知道namenode的實例化類是ClientNamenodeProtocolTranslatorPB,繼續(xù)看調(diào)用過程,代碼轉(zhuǎn)到了ClientNamenodeProtocolTranslatorPB中:

@Override
  public HdfsFileStatus getFileLinkInfo(String src)
      throws AccessControlException, UnresolvedLinkException, IOException {
    GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder()
        .setSrc(src).build();
    try {
      GetFileLinkInfoResponseProto result = rpcProxy.getFileLinkInfo(null, req);
      return result.hasFs() ?  
          PBHelper.convert(rpcProxy.getFileLinkInfo(null, req).getFs()) : null;
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }


這時候我們會發(fā)現(xiàn)一個屬性rpcProxy,再回過頭看看NameNodeProxies類的createProxy方法,我們就可以很清楚的知道,rpcProxy就是那個能發(fā)起遠(yuǎn)程調(diào)用的代理類,它封裝了Invoker對象,當(dāng)然就也有了使用Client類的能力,很好,這里我們稍微總結(jié)下,在DFSClient類中,調(diào)用getFileLinkInfo方法,最終就是通過Client的call方法,發(fā)起遠(yuǎn)程訪問,獲取數(shù)據(jù)。

這時候,我們可以進一步來探討下Hadoop中RPC的Client類了,下面我把Client類主要的部分抽取出來了,看下面的代碼:

public class Client {
Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
    		return new Call(rpcKind, rpcRequest);
    }

public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      	ConnectionId remoteId, int serviceClass) throws IOException {
		final Call call = createCall(rpcKind, rpcRequest);

    		Connection connection = getConnection(remoteId, call, serviceClass);

		connection.sendRpcRequest(call);                 // send the rpc request

		return call.getRpcResponse();
}

private class Connection extends Thread {
		private void receiveRpcResponse() {
			
		}

		public void sendRpcRequest(final Call call)
        		throws InterruptedException, IOException {

		}
}
}

看了DFSclient的初始化部分,我們就可以知道,DFSClient的遠(yuǎn)程調(diào)用,是通過Client的call方法起作用的。其實Client的call方法已經(jīng)很能夠說明問題了,先封裝一個call,然后獲取連接,再得到結(jié)果。簡單的說Client就是這樣了??梢栽谏晕?fù)雜一點,在Client的call方法中,封裝了call后,getConnection的方法不僅是獲取一個連接,同時會啟動連接代表的線程,這個線程的作用就是等待請求的完成,完成后,將結(jié)果寫到call中(該過程天內(nèi)各國Connection的receiveRpcRespoce方法完成),在call方法中獲取連接后,會發(fā)送請求的參數(shù)到namenode的服務(wù)端,等待namenode處理完畢,Connection的receiveRpcRespoce方法寫返回結(jié)果,最后call方法中返回結(jié)果。大概的過程就是這個樣子了。

到此,關(guān)于“hadoop rpc客戶端初始化和調(diào)用過程怎么實現(xiàn)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI