溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

hadoop rpc服務端初始化和調用過程舉例分析

發(fā)布時間:2021-12-10 09:25:30 來源:億速云 閱讀:142 作者:iii 欄目:云計算

本篇內容介紹了“hadoop rpc服務端初始化和調用過程舉例分析”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

rpc服務端的初始化

上面已經提到我們這里主要借用了namenode的遠程服務,先來看看相關代碼:

public class NameNode implements NameNodeStatusMXBean {
public static void main(String argv[]) throws Exception {
		NameNode namenode = createNameNode(argv, null);
}	

protected NameNode(Configuration conf, NamenodeRole role)throws IOException { 
		initialize(conf);
}

protected void initialize(Configuration conf) throws IOException {
		rpcServer = createRpcServer(conf);

		startCommonServices(conf); //相當重要
}

protected NameNodeRpcServer createRpcServer(Configuration conf)throws IOException {
		return new NameNodeRpcServer(conf, this);
    }
}

我們的linux的終端執(zhí)行hadoop的啟動命令的時候,最終的命令是調用NameNode的main方法,所以我們追蹤代碼的切入點是NameNode的main方法,方法比較簡單,就是調用NameNode的構造函數創(chuàng)建一個NameNode,然后執(zhí)行初始化方法initialize,這個方法相對來說,是我們關注的重點,包括rpc服務在內的初始化操作都放在這個方法里面。特定于rpc,他執(zhí)行了兩個相關的方法createRpcServer和startCommonServices,第一個方法見名思意,不多說,先簡單介紹下后面的方法,該方法的作用就是啟動namenode的rpc服務,稍后我給出代碼。好的,從上面的代碼可以看到,我們的rpcServer功能都放在了類NameNodeRpcServer里面,現(xiàn)在讓我們來看看這個類里面相關的代碼:

class NameNodeRpcServer implements NamenodeProtocols {
public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {    
    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
        ProtobufRpcEngine.class);

    ClientNamenodeProtocolServerSideTranslatorPB 
       clientProtocolServerTranslator = 
         new ClientNamenodeProtocolServerSideTranslatorPB(this);
     BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

    InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf); // fs.defaultFS
    String bindHost = nn.getRpcServerBindHost(conf);
    if (bindHost == null) {
      bindHost = rpcAddr.getHostName();
    }
    LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());

    this.clientRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService).setBindAddress(bindHost)
        .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();

    // Add all the RPC protocols that the namenode implements
    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        clientRpcServer);
 }
}

在NameNodeRpcServer的構造函數里面最重要的一件事情是實例化clientRpcServer,這里面我最想說明的是,NameNode宣稱自己實現(xiàn)了三個協(xié)議:ClientProtocol、DatanodeProtocol和NamenodeProtocol,在服務端的實現(xiàn)基本上就靠ClientNamenodeProtocolServerSideTranslatorPB之類的類型了,特別在實例化ClientNamenodeProtocolServerSideTranslatorPB的時候有傳入一個形參,這個形參就是NameNodeRpcServer實例,看代碼:

public ClientNamenodeProtocolServerSideTranslatorPB(ClientProtocol server)
      throws IOException {
    this.server = server;
  }

  @Override
  public GetBlockLocationsResponseProto getBlockLocations(
      RpcController controller, GetBlockLocationsRequestProto req)
      throws ServiceException {
    try {
      LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),
          req.getLength());
      Builder builder = GetBlockLocationsResponseProto
          .newBuilder();
      if (b != null) {
        builder.setLocations(PBHelper.convert(b)).build();
      }
      return builder.build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

上面代碼中的getBlockLocations也一定程度上說明了剛才的觀點。

現(xiàn)在讓我們回過頭看看NameNode中initialize方法中執(zhí)行的startCommonServices方法,這個方法用來啟動clientRpcServer下面的線程,包括listener,handler、response,具體看代碼: 

public class NameNode implements NameNodeStatusMXBean {
private void startCommonServices(Configuration conf) throws IOException {
	rpcServer.start();
}
}

class NameNodeRpcServer implements NamenodeProtocols {
 void start() {
    clientRpcServer.start();
    if (serviceRpcServer != null) {
      serviceRpcServer.start();      
    }
  }
}

public abstract class Server {
  public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }
}

代碼看到這里,啟動過程中rpc相關的代碼就結束了。

rpc服務端的調用過程

現(xiàn)在讓我們來看看rpc被調用的過程,先來認識下Server的關鍵結構:

public abstract class Server {
  private Listener listener = null;
  private Responder responder = null;
  private Handler[] handlers = null;

  private class Responder extends Thread {

  }

  private class Listener extends Thread {

  }

  private class Handler extends Thread {

  }
}

在初始化的時候,就啟動listener、responder和handlers下面的所有線程。

其中l(wèi)istener線程里面啟動了一個socker服務,專門用來接受客戶端的請求,handler下面的線程用來處理具體的請求,responder寫請求結果,具體過程可以看下下面的代碼:

public abstract class Server {
  private Listener listener = null;
  private Responder responder = null;
  private Handler[] handlers = null;

  private class Listener extends Thread {
public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

public void run() {
		while (running) {
			doAccept(key);
		}
}

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
        Reader reader = getReader();
        Connection c = connectionManager.register(channel);
        key.attach(c);  // so closeCurrentConnection can get the object
        reader.addConnection(c);
    }

private class Reader extends Thread {
	public void run() {
		doRunLoop();
	}

	private synchronized void doRunLoop() {
		while (running) {
			Connection conn = pendingConnections.take();
              	conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
		}
		readSelector.select();
		doRead(key);
	}

	void doRead(SelectionKey key) throws InterruptedException {
		Connection c = (Connection)key.attachment();
		count = c.readAndProcess();
	}
}
  }

  public class Connection {
public int readAndProcess(){
	processOneRpc(data.array());
}

private void processOneRpc(byte[] buf){
	processRpcRequest(header, dis);
}

private void processRpcRequest(RpcRequestHeaderProto header,
        DataInputStream dis) throws WrappedRpcServerException,
        InterruptedException {
	 Call call = new Call(header.getCallId(), header.getRetryCount(),
          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
              .getClientId().toByteArray());
      callQueue.put(call);
}
  }

  private class Handler extends Thread {
public void run() {
	final Call call = callQueue.take();
	value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                           call.timestamp);

	setupResponse(buf, call, returnStatus, detailedErr, 
                value, errorClass, error);

	responder.doRespond(call);
}
  }

  private class Responder extends Thread {
void doRespond(Call call) throws IOException {
	processResponse(call.connection.responseQueue, true);
}

private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {
	int numBytes = channelWrite(channel, call.rpcResponse);

	done = true;
}
  }
}

這里給出了一個比較完整版Server的rpc調用過程,從listener都構造函數開始,在他的構造函數中起了幾個reader線程,當監(jiān)聽器收到訪問請求的時候,由reader請請求中讀取數據,reader中實際上調用的是connection的readAndProcess方法,在這個方法中,會往RPC server中的callQueue添加call對象,之后,handler這個家伙從隊列中取出當前call,具體的處理過程,用到了Server類的call方法,這地方有些玄機,仔細跟過代碼的人才知道,因為server的實例類不再是org.apache.hadoop.ipc.Server,而是Protobuf的一個實現(xiàn)類,org.apache.hadoop.ipc.RPC.Server,而且call方法是被重寫過的,代碼如下:

@Override
    public Writable call(RPC.RpcKind rpcKind, String protocol,
        Writable rpcRequest, long receiveTime) throws Exception {
      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
          receiveTime);
    }

繼續(xù)追蹤下,差不多就可以到底了:

public class ProtobufRpcEngine implements RpcEngine {
public static class Server extends RPC.Server {
	static class ProtoBufRpcInvoker implements RpcInvoker {
		public Writable call(RPC.Server server, String protocol,
          		Writable writableRequest, long receiveTime) throws Exception {
        	ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,clientVersion);
        	BlockingService service = (BlockingService) protocolImpl.protocolImpl;

          	result = service.callBlockingMethod(methodDescriptor, null, param);
          
        	return new RpcResponseWrapper(result);
	}
}
}

這部分的代碼也正是hadoop rpc與protobuf結合的地方,這地方在補充一點,protbufImpl就是NameNodeRpcServer初始化的時候,已經準備了,而且看懂ProtoBufRpcInvoker下的call方法,確實也是需要結合NameNodeRpcServer初始化過程來理解的。我朦朦朧朧的懂了。而且這地方的深入會讓你看到一些本質的東西,舉例的話,你會跟蹤到ClientNamenodeProtocolServerSideTranslatorPB,然后是NameNodeRpcServer,再然后是FSNamesystem,最后你發(fā)現(xiàn),服務端對文件系統(tǒng)的操作出自FSNamesystem。

繼續(xù)回到handler中的run方法,call方法調用完了,就輪到Responder處理返回結果了。

“hadoop rpc服務端初始化和調用過程舉例分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI