您好,登錄后才能下訂單哦!
hadoop作為分布式的系統(tǒng), 集群機(jī)器之間的通信是最基本,最常見的需求。
這種需求本質(zhì)上是IPC, 即進(jìn)程間通信。 按照傳統(tǒng)的UINX編程模型,進(jìn)程間通信無非是如下的幾種方式:
管道, FIFO, 消息隊(duì)列, 信號(hào)量, 共享存儲(chǔ), 套接字。只有套接字是可以跨機(jī)器的網(wǎng)絡(luò)通信, 能滿足hadoop的需求。
通常情況下, 網(wǎng)絡(luò)通信的程序使用顯式網(wǎng)絡(luò)編程(即直接使用java.net包)。比如Web瀏覽器, Web服務(wù)器等。
但也有另一部分程序使用隱式網(wǎng)絡(luò)編程, 比如利用hadoop RPC這種封裝了底層通信細(xì)節(jié)的工具包。
這樣做使得底層的網(wǎng)絡(luò)通信對(duì)于程序員透明。一則減輕了程序員的負(fù)擔(dān), 二則抽象了功能模塊, 使得模塊之間職責(zé)更清晰, 便于維護(hù)。
首先展示一個(gè)hadoop RPC功能demo, 了解hadoop RPC的用法。
step 1: 在pom.xml文件中添加依賴項(xiàng)
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency>
step 2: 在src/main/resources中添加log4j.properties
log4j.rootLogger=DEBUG,console log4j.additivity.org.apache=true # (console) log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.Threshold=INFO log4j.appender.console.ImmediateFlush=true log4j.appender.console.Target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n
step 3: 定義RPC協(xié)議
package hadooprpc.demo; import java.io.IOException; interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol { // 版本號(hào),默認(rèn)情況下,不同版本號(hào)的 RPC Client 和 Server 之間不能相互通信 public static final long versionID = 1L; String echo(String value) throws IOException; int add(int v1, int v2) throws IOException; }
step 4: 實(shí)現(xiàn)RPC協(xié)議
package hadooprpc.demo; import java.io.IOException; import org.apache.hadoop.ipc.ProtocolSignature; public class ClientProtocolImpl implements ClientProtocol { // 重載的方法,用于獲取自定義的協(xié)議版本號(hào), public long getProtocolVersion(String protocol, long clientVersion) { return ClientProtocol.versionID; } // 重載的方法,用于獲取協(xié)議簽名 public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int hashcode) { return new ProtocolSignature(ClientProtocol.versionID, null); } public String echo(String value) throws IOException { return value; } public int add(int v1, int v2) throws IOException { return v1 + v2; } }
step 5; 構(gòu)造并啟動(dòng) RPC Server
package hadooprpc.demo; import java.io.IOException; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; public class RPCServer { public static void main(String[] args) throws HadoopIllegalArgumentException, IOException { Configuration conf = new Configuration(); Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class) .setInstance(new ClientProtocolImpl()).setBindAddress("localhost").setPort(8097) .setNumHandlers(5).build(); server.start(); } }
step 6: 構(gòu)造 RPC Client 并發(fā)送 RPC 請(qǐng)求
package hadooprpc.demo; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; public class RPCClient { public static void main(String[] args) throws IOException { ClientProtocol client = (ClientProtocol)RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, new InetSocketAddress(8097), new Configuration()); int result = client.add(5, 6); System.out.println(result); String echoResult = client.echo("result"); System.out.println(echoResult); RPC.stopProxy(client); -- 關(guān)閉連接 } }
step 7: 啟動(dòng)RPC Server, 然后執(zhí)行 RPC Client.
通過上面的例子可以發(fā)現(xiàn), 通過這種編程方式,不用考慮網(wǎng)絡(luò)層的細(xì)節(jié),只需要編寫接口和接口實(shí)現(xiàn)即可。
問渠那得清如許? 通過hadoop RPC的源碼, 或許可以管中窺豹。
hadoop RPC的實(shí)現(xiàn)原理很簡(jiǎn)單。
Client:
1. 通過動(dòng)態(tài)代理,獲取到調(diào)用接口的方法,參數(shù)類型。
2. 將調(diào)用信息編碼,發(fā)送到服務(wù)器
3. 獲取服務(wù)器的返回值, 并解碼。
4. 返回調(diào)用方法的返回值。
Server:
1. 啟動(dòng)服務(wù)器, 并監(jiān)聽客戶端。
2. 獲取客戶端發(fā)送過來的調(diào)用方法, 參數(shù)。
3. 執(zhí)行實(shí)現(xiàn)類中相關(guān)的方法。
4. 將返回值發(fā)送到客戶端。
理解原理后, 自己動(dòng)手實(shí)現(xiàn)了一個(gè)非常粗糙, 只具備演示功能的RPC框架。
package srpc; import java.io.*; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; /** * Created by shgy on 17-5-7. * 創(chuàng)建一個(gè)簡(jiǎn)單的PRC框架, 參數(shù)和返回值只支持基礎(chǔ)類型 */ // 接口 interface MyProtocol{ String echo(String msg); int add(int a,int b); } // 實(shí)現(xiàn) class MyProtocolImp implements MyProtocol{ @Override public String echo(String msg) { return msg; } @Override public int add(int a,int b){ return a+b; } } public class ImitateRPC { private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>(); static { PRIMITIVE_NAMES.put("boolean", Boolean.TYPE); PRIMITIVE_NAMES.put("byte", Byte.TYPE); PRIMITIVE_NAMES.put("char", Character.TYPE); PRIMITIVE_NAMES.put("short", Short.TYPE); PRIMITIVE_NAMES.put("int", Integer.TYPE); PRIMITIVE_NAMES.put("long", Long.TYPE); PRIMITIVE_NAMES.put("float", Float.TYPE); PRIMITIVE_NAMES.put("double", Double.TYPE); PRIMITIVE_NAMES.put("void", Void.TYPE); } public static class Server{ private Class<?> protocolClass; private Object protocolImpl; private ServerSocket server; public Server(Class<?> protocolClass, Object protocolImpl) throws IOException { if(protocolImpl == null){ throw new IllegalArgumentException("protocolImpl is not set"); } if (protocolClass == null) { throw new IllegalArgumentException("protocolClass is not set"); } else { if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) { throw new IOException("protocolClass "+ protocolClass + " is not implemented by protocolImpl which is of class " + protocolImpl.getClass()); } } this.protocolClass = protocolClass; this.protocolImpl = protocolImpl; } public void start(){ System.out.println("start server"); try{ this.server = new ServerSocket(8189); listen(); }catch(Exception e){ e.printStackTrace(); } } public void close(){ System.out.println("close server"); try { this.server.close(); } catch (IOException e) {} } private void listen(){ new Thread(){ @Override public void run(){ while (!server.isClosed()){ Socket incoming = null; try { incoming = server.accept(); DataInputStream inStream = new DataInputStream(incoming.getInputStream()); // 從客戶端讀取出 調(diào)用方法的信息 int dataLen = inStream.readInt(); byte[] data = new byte[dataLen]; inStream.read(data,0, dataLen); DataInputStream contentStream = new DataInputStream(new ByteArrayInputStream(data)); String methodName = contentStream.readUTF(); int paramCount = contentStream.readInt(); Class<?>[] paramTypes = new Class<?>[paramCount]; Object[] args = new Object[paramCount]; for(int i=0;i<paramCount;i++){ String className = contentStream.readUTF(); Class<?> declaredClass = PRIMITIVE_NAMES.get(className); if(declaredClass == null){ declaredClass = String.class; } paramTypes[i] = declaredClass; if(declaredClass == String.class){ args[i] = contentStream.readUTF(); }else if (declaredClass.isPrimitive()) { // primitive types if (declaredClass == Boolean.TYPE) { // boolean args[i] = Boolean.valueOf(contentStream.readBoolean()); } else if (declaredClass == Character.TYPE) { // char args[i] = Character.valueOf(contentStream.readChar()); } else if (declaredClass == Byte.TYPE) { // byte args[i] = Byte.valueOf(contentStream.readByte()); } else if (declaredClass == Short.TYPE) { // short args[i] = Short.valueOf(contentStream.readShort()); } else if (declaredClass == Integer.TYPE) { // int args[i] = Integer.valueOf(contentStream.readInt()); } else if (declaredClass == Long.TYPE) { // long args[i] = Long.valueOf(contentStream.readLong()); } else if (declaredClass == Float.TYPE) { // float args[i] = Float.valueOf(contentStream.readFloat()); } else if (declaredClass == Double.TYPE) { // double args[i] = Double.valueOf(contentStream.readDouble()); } else if (declaredClass == Void.TYPE) { // void args[i] = null; } else { throw new IllegalArgumentException("Not a primitive: "+declaredClass); } } } Method method = protocolClass.getMethod(methodName, paramTypes); Object obj = method.invoke(protocolImpl, args); Class retType = method.getReturnType(); ByteArrayOutputStream buffer = new ByteArrayOutputStream(); DataOutput out = new DataOutputStream(buffer); out.writeUTF(retType.getCanonicalName()); if(retType == String.class){ out.writeUTF((String)obj); }else if (retType.isPrimitive()) { // primitive types if (retType == Boolean.TYPE) { // boolean out.writeBoolean((Boolean)obj); } else if (retType == Character.TYPE) { // char out.writeChar((Character) obj); } else if (retType == Byte.TYPE) { // byte out.writeBoolean((Boolean)obj); } else if (retType == Short.TYPE) { // short out.writeShort((Short) obj); } else if (retType == Integer.TYPE) { // int out.writeInt((Integer) obj); } else if (retType == Long.TYPE) { // long out.writeLong((Long) obj); } else if (retType == Float.TYPE) { // float out.writeFloat((Float) obj); } else if (retType == Double.TYPE) { // double out.writeDouble((Double) obj); } else if (retType == Void.TYPE) { // void } else { throw new IllegalArgumentException("Not a primitive: "+retType); } } byte[] array = buffer.toByteArray(); //將返回結(jié)果寫回到客戶端 DataOutputStream outStream = new DataOutputStream(incoming.getOutputStream()); outStream.writeInt(array.length); outStream.write(array); outStream.flush(); }catch (SocketException e){ }catch (Exception e){ e.printStackTrace(); }finally { try { if(incoming!=null){ incoming.close(); } }catch (Exception e){} } } } }.start(); } } public static <T> T getProxy(Class<T> protocol){ T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[]{protocol}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class<?>[] paramTypes = method.getParameterTypes(); ByteArrayOutputStream buffer = new ByteArrayOutputStream(); DataOutput out = new DataOutputStream(buffer); out.writeUTF(method.getName()); // 寫入方法名 out.writeInt(paramTypes.length); // 寫入?yún)?shù)個(gè)數(shù) // 寫入?yún)?shù) for (int i = 0; i < paramTypes.length; i++) { Class<?> declaredClass = paramTypes[i]; Object instance = args[i]; String paramTypeName = declaredClass.getCanonicalName(); out.writeUTF(paramTypeName); if (declaredClass == String.class) { out.writeUTF((String)instance); }else if (declaredClass.isPrimitive()) { // primitive type if (declaredClass == Boolean.TYPE) { // boolean out.writeBoolean(((Boolean) instance).booleanValue()); } else if (declaredClass == Character.TYPE) { // char out.writeChar(((Character) instance).charValue()); } else if (declaredClass == Byte.TYPE) { // byte out.writeByte(((Byte) instance).byteValue()); } else if (declaredClass == Short.TYPE) { // short out.writeShort(((Short) instance).shortValue()); } else if (declaredClass == Integer.TYPE) { // int out.writeInt(((Integer) instance).intValue()); } else if (declaredClass == Long.TYPE) { // long out.writeLong(((Long) instance).longValue()); } else if (declaredClass == Float.TYPE) { // float out.writeFloat(((Float) instance).floatValue()); } else if (declaredClass == Double.TYPE) { // double out.writeDouble(((Double) instance).doubleValue()); } else if (declaredClass == Void.TYPE) { // void } else { throw new IllegalArgumentException("Not a primitive: " + declaredClass); } }else{ throw new IOException("Can't write: "+instance+" as "+declaredClass); } } // 發(fā)送到服務(wù)器端 byte[] array = buffer.toByteArray(); Socket client = new Socket("127.0.0.1", 8189); try{ DataOutputStream outStream = new DataOutputStream(client.getOutputStream()); outStream.writeInt(array.length); outStream.write(array); outStream.flush(); // 讀取服務(wù)器的應(yīng)答 DataInputStream inStream = new DataInputStream(client.getInputStream()); int bufSize = inStream.readInt(); byte[] retArray = new byte[bufSize]; inStream.read(retArray,0, bufSize); DataInputStream contentStream = new DataInputStream(new ByteArrayInputStream(retArray)); String retType = contentStream.readUTF(); Class retTypeClass = PRIMITIVE_NAMES.get(retType); if(retTypeClass == null){ retTypeClass = String.class; } if(retTypeClass == String.class){ return contentStream.readUTF(); }else if (retTypeClass.isPrimitive()) { // primitive types if (retTypeClass == Boolean.TYPE) { // boolean return Boolean.valueOf(contentStream.readBoolean()); } else if (retTypeClass == Character.TYPE) { // char return Character.valueOf(contentStream.readChar()); } else if (retTypeClass == Byte.TYPE) { // byte return Byte.valueOf(contentStream.readByte()); } else if (retTypeClass == Short.TYPE) { // short return Short.valueOf(contentStream.readShort()); } else if (retTypeClass == Integer.TYPE) { // int return Integer.valueOf(contentStream.readInt()); } else if (retTypeClass == Long.TYPE) { // long return Long.valueOf(contentStream.readLong()); } else if (retTypeClass == Float.TYPE) { // float return Float.valueOf(contentStream.readFloat()); } else if (retTypeClass == Double.TYPE) { // double return Double.valueOf(contentStream.readDouble()); } else if (retTypeClass == Void.TYPE) { // void return null; } else { throw new IllegalArgumentException("Not a primitive: "+retTypeClass); } } }finally { try { client.close(); }catch (Exception e){} } return null; } }); return proxy; } public static void main(String[] args) throws IOException { // start server Server server = new Server(MyProtocol.class, new MyProtocolImp()); server.start(); // call rpc MyProtocol client = ImitateRPC.getProxy(MyProtocol.class); System.out.println(client.echo("hello world")); System.out.println(client.add(1, 4)); server.close(); } }
上面的代碼只有300多行, 而且在編碼/解碼部分有重疊的代碼, 可以優(yōu)化縮減, 功能也不完善。 但是仿照Hadoop RPC, 演示了RPC的原理。 對(duì)照著Hadoop RPC的源碼, 自己編碼實(shí)現(xiàn), 對(duì)于源碼中各個(gè)類的功能, 會(huì)有更清晰的理解。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。