Learning Hadoop: Using RPC and How It Is Implemented

Published: 2020-06-28 23:01:17  Source: Internet  Views: 3948  Author: sbp810050504  Category: Website servers

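Hadoop is a distributed system, so communication between the machines in a cluster is its most basic and most common requirement.
At bottom this requirement is IPC, that is, inter-process communication. In the traditional UNIX programming model, IPC comes down to the following mechanisms:
pipes, FIFOs, message queues, semaphores, shared memory, and sockets. Of these, only sockets support network communication across machines, so only sockets can meet Hadoop's needs.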

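Network programs usually use explicit network programming, that is, they work directly with the java.net package. Web browsers and web servers are examples.
Other programs use implicit network programming, for instance through a toolkit such as Hadoop RPC that wraps the low-level communication details.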

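This makes the underlying network communication transparent to the programmer. It lightens the programmer's load, and it also abstracts the communication into its own module, so that responsibilities between modules are clearer and the code is easier to maintain.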
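For contrast, here is a minimal sketch of what explicit network programming with java.net involves. It is not part of the original demo: the class name `ExplicitSocketDemo` and its helper methods are made up for illustration, and it assumes a single-line text message sent over the loopback interface.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not taken from Hadoop or the article.
final class ExplicitSocketDemo {

    // Wraps the socket's input stream as a line-oriented UTF-8 reader.
    static BufferedReader reader(Socket s) throws IOException {
        return new BufferedReader(
                new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    // autoFlush=true makes println() flush the line to the socket immediately.
    static PrintWriter writer(Socket s) throws IOException {
        return new PrintWriter(
                new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    // Starts a one-shot echo server on a free loopback port, sends one line,
    // and returns the line the server sends back.
    static String roundTrip(String line) throws IOException, InterruptedException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the operating system for any free port.
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread echo = new Thread(() -> {
                try (Socket peer = server.accept()) {
                    writer(peer).println(reader(peer).readLine());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            echo.start();
            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                writer(client).println(line);
                String reply = reader(client).readLine();
                echo.join();
                return reply;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello"));
    }
}
```

Even for a one-line echo, the caller has to manage ports, threads, stream wrapping, and cleanup by hand. An RPC layer exists to hide this kind of work behind an ordinary method call.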

We start with a small Hadoop RPC demo to see how Hadoop RPC is used.

step 1: Add the dependency to pom.xml

<dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-common</artifactId>
     <version>2.6.0</version>
 </dependency>

step 2: Add log4j.properties under src/main/resources

log4j.rootLogger=DEBUG,console
log4j.additivity.org.apache=true
# (console)
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

step 3: Define the RPC protocol

package hadooprpc.demo;

import java.io.IOException;

interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
   // Version number; by default an RPC client and server with different version numbers cannot communicate
   public static final long versionID = 1L;
   String echo(String value) throws IOException;
   int add(int v1, int v2) throws IOException;
}

step 4: Implement the RPC protocol

package hadooprpc.demo;

import java.io.IOException;

import org.apache.hadoop.ipc.ProtocolSignature;

public  class ClientProtocolImpl implements ClientProtocol {
   // Implements VersionedProtocol: returns the version number of this custom protocol
   public long getProtocolVersion(String protocol, long clientVersion) {
      return ClientProtocol.versionID;
   }

   // Implements VersionedProtocol: returns the protocol signature
   public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
   int hashcode) {
      return new ProtocolSignature(ClientProtocol.versionID, null);
   }

   public String echo(String value) throws IOException {
      return value;
   }

   public int add(int v1, int v2) throws IOException {
      return v1 + v2;
   }
}

step 5: Build and start the RPC server

package hadooprpc.demo;

import java.io.IOException;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class RPCServer {
   public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
      Configuration conf = new Configuration();
      Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class)
            .setInstance(new ClientProtocolImpl()).setBindAddress("localhost").setPort(8097)
            .setNumHandlers(5).build();
            server.start();
   }
}

step 6: Build the RPC client and send RPC requests

package hadooprpc.demo;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class RPCClient {
   public static void main(String[] args) throws IOException {
      // Connect to the same host and port the server in step 5 binds to
      ClientProtocol client = (ClientProtocol)RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID,
                                                           new InetSocketAddress("localhost", 8097), new Configuration());
      int result = client.add(5, 6);
      System.out.println(result);
      String echoResult = client.echo("result");
      System.out.println(echoResult);
      RPC.stopProxy(client); // close the connection
   }
}

step 7: Start the RPC server, then run the RPC client.

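As the example shows, with this style of programming you do not have to think about network-layer details; you only write an interface and its implementation.
Where does this convenience come from? Reading the Hadoop RPC source code gives a glimpse of the answer.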

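The principle behind Hadoop RPC is simple.
Client:
1. Use a dynamic proxy to capture the interface method being called and its parameter types.
2. Encode the call information and send it to the server.
3. Receive the server's return value and decode it.
4. Return it as the return value of the called method.
Server:
1. Start the server and listen for clients.
2. Read the method name and arguments sent by the client.
3. Execute the corresponding method of the implementation class.
4. Send the return value back to the client.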
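The two halves of this flow can be tried out in a single process before any networking is added. The sketch below is only an illustration under that assumption: `ProxyDispatchDemo`, `Calculator`, `CalculatorImpl`, `proxyFor`, and `dispatch` are hypothetical names, and the "send to server" step is replaced by a direct method call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical illustration class; the network hop is deliberately left out.
final class ProxyDispatchDemo {

    // Hypothetical protocol interface shared by "client" and "server".
    interface Calculator {
        int add(int a, int b);
        String echo(String msg);
    }

    // Hypothetical server-side implementation of the protocol.
    static final class CalculatorImpl implements Calculator {
        @Override public int add(int a, int b) { return a + b; }
        @Override public String echo(String msg) { return msg; }
    }

    // Server side: find the method by name and parameter types, then invoke
    // it reflectively on the implementation object.
    static Object dispatch(Object impl, Class<?> protocol, String methodName,
                           Class<?>[] paramTypes, Object[] args) throws Exception {
        Method method = protocol.getMethod(methodName, paramTypes);
        return method.invoke(impl, args);
    }

    // Client side: the proxy intercepts every interface call and exposes the
    // method name, parameter types, and arguments. A real RPC client would
    // encode these and send them over a socket instead of calling dispatch().
    static <T> T proxyFor(Class<T> protocol, Object impl) {
        InvocationHandler handler = (proxy, method, args) -> {
            System.out.println("intercepted call: " + method.getName());
            return dispatch(impl, protocol, method.getName(),
                    method.getParameterTypes(), args);
        };
        Object proxy = Proxy.newProxyInstance(
                protocol.getClassLoader(), new Class<?>[] {protocol}, handler);
        return protocol.cast(proxy);
    }

    public static void main(String[] args) {
        Calculator calc = proxyFor(Calculator.class, new CalculatorImpl());
        System.out.println(calc.add(1, 4));
        System.out.println(calc.echo("hello world"));
    }
}
```

The caller only ever sees the `Calculator` interface, which is why the encoding and transport can later be changed without touching calling code.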

With the principle understood, I wrote a very rough RPC framework of my own that is only good enough for demonstration.

package srpc;

import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by shgy on 17-5-7.
 * A simple RPC framework; parameters and return values support only basic types (primitives and String)
 */

// Protocol interface
interface MyProtocol{

    String echo(String msg);
    int add(int a,int b);
}

// Implementation
class MyProtocolImp implements MyProtocol{

    @Override
    public String echo(String msg) {
        return msg;
    }

    @Override
    public int add(int a,int b){
        return a+b;
    }
}

public class ImitateRPC {
    private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
    static {
        PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
        PRIMITIVE_NAMES.put("byte", Byte.TYPE);
        PRIMITIVE_NAMES.put("char", Character.TYPE);
        PRIMITIVE_NAMES.put("short", Short.TYPE);
        PRIMITIVE_NAMES.put("int", Integer.TYPE);
        PRIMITIVE_NAMES.put("long", Long.TYPE);
        PRIMITIVE_NAMES.put("float", Float.TYPE);
        PRIMITIVE_NAMES.put("double", Double.TYPE);
        PRIMITIVE_NAMES.put("void", Void.TYPE);
    }
    public static class Server{


        private Class<?> protocolClass;

        private Object protocolImpl;

        private ServerSocket server;

        public  Server(Class<?> protocolClass, Object protocolImpl) throws IOException {

            if(protocolImpl == null){
                throw new IllegalArgumentException("protocolImpl is not set");
            }
            if (protocolClass == null) {
                throw new IllegalArgumentException("protocolClass is not set");

            } else {
                if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) {
                    throw new IOException("protocolClass "+ protocolClass +
                            " is not implemented by protocolImpl which is of class " +
                            protocolImpl.getClass());
                }
            }

            this.protocolClass = protocolClass;
            this.protocolImpl = protocolImpl;


        }

        public void start(){
            System.out.println("start server");
            try{
                this.server = new ServerSocket(8189);
                listen();
            }catch(Exception e){
                e.printStackTrace();
            }

        }


        public void close(){
            System.out.println("close server");
            try {
                this.server.close();
            } catch (IOException e) {}
        }

        private void listen(){
            new Thread(){
                @Override
                public void run(){
                    while (!server.isClosed()){

                        Socket incoming = null;
                        try {
                            incoming = server.accept();
                            DataInputStream inStream = new DataInputStream(incoming.getInputStream());

                            // Read the method-call information sent by the client
                            int dataLen = inStream.readInt();
                            byte[] data = new byte[dataLen];
                            inStream.readFully(data, 0, dataLen); // read() may return fewer bytes than requested

                            DataInputStream contentStream = new DataInputStream(new ByteArrayInputStream(data));
                            String methodName = contentStream.readUTF();
                            int paramCount = contentStream.readInt();
                            Class<?>[] paramTypes = new Class<?>[paramCount];
                            Object[] args = new Object[paramCount];
                            for(int i=0;i<paramCount;i++){
                                String className = contentStream.readUTF();
                                Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
                                if(declaredClass == null){
                                    declaredClass = String.class;
                                }
                                paramTypes[i] = declaredClass;
                                if(declaredClass == String.class){
                                    args[i] = contentStream.readUTF();
                                }else if (declaredClass.isPrimitive()) {            // primitive types

                                    if (declaredClass == Boolean.TYPE) {             // boolean
                                        args[i] = Boolean.valueOf(contentStream.readBoolean());
                                    } else if (declaredClass == Character.TYPE) {    // char
                                        args[i] = Character.valueOf(contentStream.readChar());
                                    } else if (declaredClass == Byte.TYPE) {         // byte
                                        args[i] = Byte.valueOf(contentStream.readByte());
                                    } else if (declaredClass == Short.TYPE) {        // short
                                        args[i] = Short.valueOf(contentStream.readShort());
                                    } else if (declaredClass == Integer.TYPE) {      // int
                                        args[i] = Integer.valueOf(contentStream.readInt());
                                    } else if (declaredClass == Long.TYPE) {         // long
                                        args[i] = Long.valueOf(contentStream.readLong());
                                    } else if (declaredClass == Float.TYPE) {        // float
                                        args[i] = Float.valueOf(contentStream.readFloat());
                                    } else if (declaredClass == Double.TYPE) {       // double
                                        args[i] = Double.valueOf(contentStream.readDouble());
                                    } else if (declaredClass == Void.TYPE) {         // void
                                        args[i] = null;
                                    } else {
                                        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
                                    }
                                }
                            }

                            Method method = protocolClass.getMethod(methodName, paramTypes);
                            Object obj = method.invoke(protocolImpl, args);
                            Class<?> retType = method.getReturnType();

                            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                            DataOutput out = new DataOutputStream(buffer);
                            out.writeUTF(retType.getCanonicalName());
                            if(retType == String.class){
                                out.writeUTF((String)obj);
                            }else if (retType.isPrimitive()) {            // primitive types

                                if (retType == Boolean.TYPE) {             // boolean
                                    out.writeBoolean((Boolean)obj);
                                } else if (retType == Character.TYPE) {    // char
                                    out.writeChar((Character) obj);
                                } else if (retType == Byte.TYPE) {         // byte
                                    out.writeByte((Byte) obj);
                                } else if (retType == Short.TYPE) {        // short
                                    out.writeShort((Short) obj);
                                } else if (retType == Integer.TYPE) {      // int
                                    out.writeInt((Integer) obj);
                                } else if (retType == Long.TYPE) {         // long
                                    out.writeLong((Long) obj);
                                } else if (retType == Float.TYPE) {        // float
                                    out.writeFloat((Float) obj);
                                } else if (retType == Double.TYPE) {       // double
                                    out.writeDouble((Double) obj);
                                } else if (retType == Void.TYPE) {         // void
                                } else {
                                    throw new IllegalArgumentException("Not a primitive: "+retType);
                                }
                            }
                            byte[] array = buffer.toByteArray();
                            // Write the result back to the client
                            DataOutputStream outStream = new DataOutputStream(incoming.getOutputStream());
                            outStream.writeInt(array.length);
                            outStream.write(array);
                            outStream.flush();
                        }catch (SocketException e){
                            // Typically raised when close() shuts the ServerSocket while accept() is blocked; ignored
                        }catch (Exception e){
                            e.printStackTrace();
                        }finally {
                            try {
                                if(incoming!=null){ incoming.close();  }
                            }catch (Exception e){}
                        }

                    }
                }
            }.start();
        }
    }



    public static <T> T getProxy(Class<T> protocol){
        T proxy = (T)  Proxy.newProxyInstance(protocol.getClassLoader(), new Class[]{protocol},
                new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                Class<?>[] paramTypes = method.getParameterTypes();
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                DataOutput out = new DataOutputStream(buffer);
                out.writeUTF(method.getName());  // write the method name
                out.writeInt(paramTypes.length); // write the number of parameters

                // write the parameters
                for (int i = 0; i < paramTypes.length; i++) {
                    Class<?> declaredClass = paramTypes[i];
                    Object instance = args[i];
                    String paramTypeName = declaredClass.getCanonicalName();
                    out.writeUTF(paramTypeName);
                    if (declaredClass == String.class) {
                        out.writeUTF((String)instance);
                    }else if (declaredClass.isPrimitive()) {     // primitive type

                        if (declaredClass == Boolean.TYPE) {        // boolean
                            out.writeBoolean(((Boolean) instance).booleanValue());
                        } else if (declaredClass == Character.TYPE) { // char
                            out.writeChar(((Character) instance).charValue());
                        } else if (declaredClass == Byte.TYPE) {    // byte
                            out.writeByte(((Byte) instance).byteValue());
                        } else if (declaredClass == Short.TYPE) {   // short
                            out.writeShort(((Short) instance).shortValue());
                        } else if (declaredClass == Integer.TYPE) { // int
                            out.writeInt(((Integer) instance).intValue());
                        } else if (declaredClass == Long.TYPE) {    // long
                            out.writeLong(((Long) instance).longValue());
                        } else if (declaredClass == Float.TYPE) {   // float
                            out.writeFloat(((Float) instance).floatValue());
                        } else if (declaredClass == Double.TYPE) {  // double
                            out.writeDouble(((Double) instance).doubleValue());
                        } else if (declaredClass == Void.TYPE) {    // void
                        } else {
                            throw new IllegalArgumentException("Not a primitive: " + declaredClass);
                        }
                    }else{
                        throw new IOException("Can't write: "+instance+" as "+declaredClass);
                    }
                }

                // Send to the server
                byte[] array = buffer.toByteArray();

                Socket client = new Socket("127.0.0.1", 8189);
                try{
                   DataOutputStream outStream = new DataOutputStream(client.getOutputStream());
                    outStream.writeInt(array.length);
                    outStream.write(array);
                    outStream.flush();

                    // Read the server's reply
                    DataInputStream inStream = new DataInputStream(client.getInputStream());
                    int bufSize = inStream.readInt();
                    byte[] retArray = new byte[bufSize];
                    inStream.readFully(retArray, 0, bufSize); // read() may return fewer bytes than requested

                    DataInputStream contentStream = new DataInputStream(new ByteArrayInputStream(retArray));
                    String retType = contentStream.readUTF();
                    Class<?> retTypeClass = PRIMITIVE_NAMES.get(retType);
                    if(retTypeClass == null){
                        retTypeClass = String.class;
                    }
                    if(retTypeClass == String.class){
                        return contentStream.readUTF();
                    }else if (retTypeClass.isPrimitive()) {            // primitive types

                        if (retTypeClass == Boolean.TYPE) {             // boolean
                            return Boolean.valueOf(contentStream.readBoolean());
                        } else if (retTypeClass == Character.TYPE) {    // char
                            return Character.valueOf(contentStream.readChar());
                        } else if (retTypeClass == Byte.TYPE) {         // byte
                            return Byte.valueOf(contentStream.readByte());
                        } else if (retTypeClass == Short.TYPE) {        // short
                            return Short.valueOf(contentStream.readShort());
                        } else if (retTypeClass == Integer.TYPE) {      // int
                            return Integer.valueOf(contentStream.readInt());
                        } else if (retTypeClass == Long.TYPE) {         // long
                            return Long.valueOf(contentStream.readLong());
                        } else if (retTypeClass == Float.TYPE) {        // float
                            return Float.valueOf(contentStream.readFloat());
                        } else if (retTypeClass == Double.TYPE) {       // double
                            return Double.valueOf(contentStream.readDouble());
                        } else if (retTypeClass == Void.TYPE) {         // void
                            return null;
                        } else {
                            throw new IllegalArgumentException("Not a primitive: "+retTypeClass);
                        }
                    }
                }finally {
                    try {
                        client.close();
                    }catch (Exception e){}
                }


                return null;
            }
        });
        return proxy;
    }

    public static void main(String[] args) throws IOException {
        // start server
        Server server = new Server(MyProtocol.class, new MyProtocolImp());
        server.start();

        // call rpc
        MyProtocol client = ImitateRPC.getProxy(MyProtocol.class);
        System.out.println(client.echo("hello world"));
        System.out.println(client.add(1, 4));


        server.close();
    }
}
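Both directions of the demo above use the same wire format: a 4-byte length followed by that many payload bytes. The sketch below isolates this framing so it can be exercised against in-memory streams. `FrameCodec`, `writeFrame`, and `readFrame` are hypothetical names, and the negative-length check is an added assumption rather than something the demo does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that isolates the length-prefixed framing of the demo.
final class FrameCodec {

    // Writes a 4-byte big-endian length followed by the payload bytes.
    static void writeFrame(OutputStream rawOut, byte[] payload) throws IOException {
        DataOutputStream out = new DataOutputStream(rawOut);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    // Reads one frame. readFully() blocks until the whole payload has arrived
    // or throws EOFException, whereas a plain read() may return after only
    // part of the payload is available.
    static byte[] readFrame(InputStream rawIn) throws IOException {
        DataInputStream in = new DataInputStream(rawIn);
        int length = in.readInt();
        if (length < 0) {
            throw new IOException("negative frame length: " + length);
        }
        byte[] payload = new byte[length];
        in.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeFrame(wire, "first".getBytes(StandardCharsets.UTF_8));
        writeFrame(wire, "second".getBytes(StandardCharsets.UTF_8));

        // Two frames written back to back can be read back one at a time.
        InputStream in = new ByteArrayInputStream(wire.toByteArray());
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
    }
}
```

The length prefix is what lets the receiver know where one message ends, since a TCP stream has no message boundaries of its own.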

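The code above is only a little over 300 lines. Its encoding and decoding parts contain duplicated code that could be refactored and shortened, and its functionality is incomplete. Still, it is modeled on Hadoop RPC and demonstrates how RPC works. Writing your own implementation while reading the Hadoop RPC source gives a much clearer understanding of what each class in that source does.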
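As one possible way to remove that duplication, the type-by-type branches could be collected into a single pair of helpers used for parameters and return values alike. This is a sketch under assumptions, not the author's refactoring: `ValueCodec`, `writeValue`, and `readValue` are hypothetical names, and it supports only the same types as the demo (primitives, void, and String).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper that merges the duplicated encode/decode branches.
final class ValueCodec {

    // Writes the declared type name, then the value in that type's encoding.
    static void writeValue(DataOutput out, Class<?> type, Object value) throws IOException {
        out.writeUTF(type.getName());
        if (type == String.class)        out.writeUTF((String) value);
        else if (type == Boolean.TYPE)   out.writeBoolean((Boolean) value);
        else if (type == Character.TYPE) out.writeChar((Character) value);
        else if (type == Byte.TYPE)      out.writeByte((Byte) value);
        else if (type == Short.TYPE)     out.writeShort((Short) value);
        else if (type == Integer.TYPE)   out.writeInt((Integer) value);
        else if (type == Long.TYPE)      out.writeLong((Long) value);
        else if (type == Float.TYPE)     out.writeFloat((Float) value);
        else if (type == Double.TYPE)    out.writeDouble((Double) value);
        else if (type == Void.TYPE)      { /* a void result carries no value */ }
        else throw new IOException("Can't write: " + value + " as " + type);
    }

    // Reads the type name written by writeValue, then decodes the value.
    // Primitive results are returned boxed; a void result is returned as null.
    static Object readValue(DataInput in) throws IOException {
        String typeName = in.readUTF();
        switch (typeName) {
            case "java.lang.String": return in.readUTF();
            case "boolean":          return in.readBoolean();
            case "char":             return in.readChar();
            case "byte":             return in.readByte();
            case "short":            return in.readShort();
            case "int":              return in.readInt();
            case "long":             return in.readLong();
            case "float":            return in.readFloat();
            case "double":           return in.readDouble();
            case "void":             return null;
            default: throw new IOException("Can't read type: " + typeName);
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        writeValue(out, int.class, 5);
        writeValue(out, String.class, "hello world");

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println(readValue(in));
        System.out.println(readValue(in));
    }
}
```

A server built on this would still need the decoded parameter types in order to call `getMethod`, so a fuller version would probably return the `Class<?>` together with the value.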
