RPC,这个名词在我上学及找工作的的几年前是比较少能谈及到的,我也是工作之后才第一次接触到RPC框架Dubbo(其实也不能这么说,Http也可以算作RPC)。
匆匆几年,不知什么时候RPC这个名词已经是招聘JD里面的常客了?
RPC是什么?
RPC是什么,第一次用Dubbo的时候我也是很懵逼的。
受当时的知识面所局限,我在最开始的接触的Dubbo时一直很不理解为什么我只是用定义的接口调用了一个方法,远端的服务就能接收到我的请求并处理然后响应结果。
后来使用了Debug大法才恍然大悟,原来还能这么玩。
下面引用一段网上经典的描述及图片来对RPC做一个简单的理解。
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC事件顺序
- 客户端(client function)调用客户端存根(client stub,可以理解为一个代理)。该调用是本地过程调用。
- 客户端存根(client stub)将参数打包到消息中,并进行系统调用以发送消息。
- 客户端的操作系统将消息从客户端计算机发送到服务器计算机。
- 服务器的操作系统将传入的数据包传递到服务器存根(server stub,可以理解为一个代理)。
- 服务器存根(server stub)解包消息得到调用参数。
- 服务器存根(server stub)调用真实的服务(server function)获得结果并以相同的方式反向返回结果。
RPC框架的基本实现
根据上面的理论我们大致知道一个RPC框架该怎么去实现。
对于一个0.0.1版本的RPC框架,我们的需求不多,只要能够调用成功服务端并能成功返回结果就OK。
RPC服务端的实现
首先我们需要写一个服务端,暂时使用BIO来处理网络通信,并且支持多线程处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| public class RpcServer {
private String host;
private int port;
private ServerSocket server;
private final ConcurrentMap<String, Object> serviceMappings = new ConcurrentHashMap<>();
private volatile boolean running = false;
public RpcServer(String host, int port) { this.host = host; this.port = port; }
public void startServer() throws IOException { Runtime.getRuntime().addShutdownHook(new Thread(RpcServer.this::stop)); server = new ServerSocket(); server.bind(new InetSocketAddress(this.host, this.port)); System.out.println("服务器启动成功,绑定地址:" + this.server.getLocalSocketAddress()); running = true; while (running) { try { Socket socket = this.server.accept(); System.out.println("接收到客户端连接:" + socket); new Thread(new RequestHandler(socket), "RequestHandler[" + socket.getRemoteSocketAddress().toString() + "] Thread").start(); } catch (IOException e) { System.out.println("接收连接异常:" + e); } } }
public void stop() { System.out.println("服务器关闭"); running = false; try { this.server.close(); } catch (Exception e) { System.out.println("服务器关闭异常:" + e); } }
public void addServiceMapping(Class<?> interfaceClass, Object serviceObject) { if (null != serviceMappings.get(interfaceClass.getName())) { throw new RpcException("接口[" + interfaceClass.getName() + "]已经存在一个实现"); } System.out.println("注册RPC服务[" + interfaceClass.getName() + " => " + serviceObject + "]"); this.serviceMappings.put(interfaceClass.getName(), serviceObject); }
private final class RequestHandler implements Runnable { } }
|
上面的代码可能有人会疑惑RpcServer#addServiceMapping()
方法是干什么用的,我们先暂时放下RequestHandler
这个类的实现,看看这个RpcServer
我们应该怎么来启动。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
public interface HelloService {
String sayHello(String name);
}
public class HelloServiceImpl implements HelloService {
@Override public String sayHello(String name) { return "Hello, " + name; } }
public class Startup {
public static void main(String[] args) throws IOException { RpcServer rpcServer = new RpcServer("localhost", 10210); rpcServer.addServiceMapping(HelloService.class, new HelloServiceImpl()); rpcServer.startServer(); }
}
|
通过Startup#main()
中的三个步骤就完成了RpcServer的启动,很显然RpcServer#addServiceMapping()
就是用来将所有需要提供服务的接口添加一个mapping。
在RpcServer
的代码中我们用RequestHander
来处理服务端接收到Socket连接,RequestHandler
主要职责有三:读取请求数据,调用服务目标方法,写出响应数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
private final class RequestHandler implements Runnable {
private Socket socket;
private ObjectInputStream in;
private ObjectOutputStream out;
RequestHandler(Socket socket) { this.socket = socket; }
@Override public void run() { RpcResponse rpcResponse = new RpcResponse(); try { RpcRequest rpcRequest = readData(); Object result = invokeService(rpcRequest); rpcResponse.setResponse(result); writeData(rpcResponse); } catch (IOException e) { System.out.println("从Socket中读取或写入数据异常:" + e); } catch (Exception e) { rpcResponse.setThrowable(e); try { writeData(rpcResponse); } catch (IOException ie) { System.out.println("服务端响应数据异常:" + ie); } } finally { IOUtils.close(this.socket, this.in, this.out); } }
private void writeData(RpcResponse response) throws IOException { out = new ObjectOutputStream(this.socket.getOutputStream()); out.writeObject(response); out.flush(); }
private RpcRequest readData() throws IOException, ClassNotFoundException { this.in = new ObjectInputStream(this.socket.getInputStream()); return (RpcRequest) in.readObject(); }
private Object invokeService(RpcRequest rpcRequest) { Object serviceObject = serviceMapping.get(rpcRequest.getInterfaceClassName()); if (null == serviceObject) { throw new NotFoundServiceException("服务类[" + rpcRequest.getInterfaceClassName() + "]没有找到可用的服务"); } try { Method method = serviceObject.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes()); return method.invoke(serviceObject, rpcRequest.getArguments()); } catch (NoSuchMethodException | SecurityException e) { throw new NotFoundServiceException("服务类[" + rpcRequest.getInterfaceClassName() + "]没有找到目标方法", e); } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { throw new NotFoundServiceException("服务类[" + rpcRequest.getInterfaceClassName() + "]调用目标方法异常", e); } } }
|
RPC客户端的实现
如何实现客户端,让调用方在无感知的情况下就能实现RPC远程调用呢?我们可以使用JDK提供的动态代理来实现调用方的无感知的远程调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
public class RpcInvocationHandler implements InvocationHandler {
private RpcInvoker<?> invoker;
public RpcInvocationHandler(RpcInvoker<?> invoker) { this.invoker = invoker; }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(methodName, parameterTypes, args); }
}
|
RpcInvocationHandler
有个细节要处理,排除掉所有定义在Object.class中的方法,比如Object#wait()
、Object#notify()
等,还要排除invoker
中重写的toString
,hashCode
,equals
方法。
在上面的RpcInvocationHandler
构造方法中我们传入了一个叫做RpcInvoker
(RPC调用者?)的对象,其实它封装了一系列的RPC调用的内部处理细节(当然现在里面其实没做太多的事情。。。)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
public class RpcInvoker<T> {
private RpcClient rpcClient;
private Class<T> interfaceClass;
public RpcInvoker(String host, int port, Class<T> interfaceClass) { this.rpcClient = new RpcClient(host, port); this.interfaceClass = interfaceClass; }
public Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args) throws Throwable { RpcResponse rpcResponse = null; try { RpcRequest rpcRequest = new RpcRequest().newRpcRequest(this.interfaceClass.getName(), methodName, parameterTypes, args); this.rpcClient.writeData(rpcRequest); rpcResponse = (RpcResponse) this.rpcClient.readData(); } finally { this.rpcClient.close(); } if (null == rpcResponse) { throw new RpcException("RPC调用为获取到结果"); } if (null != rpcResponse.getThrowable()) { throw rpcResponse.getThrowable(); } return rpcResponse.getResponse(); }
}
public class RpcClient {
private Socket socket;
private ObjectOutputStream out;
private ObjectInputStream in;
public RpcClient(String host, int port) { this.socket = new Socket(); try { this.socket.connect(new InetSocketAddress(host, port), 3000); } catch (Exception e) { throw new RpcNetworkException("连接服务器[" + host + ":" + port + "]异常", e); } }
public void writeData(Object object) throws IOException { this.out = new ObjectOutputStream(this.socket.getOutputStream()); this.out.writeObject(object); this.out.flush(); }
public Object readData() throws IOException, ClassNotFoundException { this.in = new ObjectInputStream(this.socket.getInputStream()); return this.in.readObject(); }
public void close() { IOUtils.close(this.in, this.out, this.socket); }
}
|
我们以HelloService这个RPC接口服务为例来使用一下这个简单的RPC实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class Startup {
public static void main(String[] args) { RpcInvoker<HelloService> invoker = new RpcInvoker<>("localhost", 10210, HelloService.class); HelloService helloService = (HelloService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{HelloService.class}, new RpcInvocationHandler(invoker)); final String result = helloService.sayHello("Simple RPC"); System.out.println("结果:" + result); } }
|
数据是怎样交互的
从上面RequestHandler
中可以看出,读取数据的时用的ObjectInputStream读取,直接将对象装换成一个RpcRequest,写出数据时也是将结果包装成一个RpcResponse用ObjectOutputStream写出,那么这两个类具体有什么呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
public class RpcRequest implements Serializable { private static final long serialVersionUID = -9213158787762981233L;
private String interfaceClassName;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] arguments; }
public class RpcResponse implements Serializable {
private static final long serialVersionUID = -5573289211420999714L;
private Object response;
private Throwable throwable;
public Object getResponse() { return response; }
public void setResponse(Object response) { this.response = response; }
public Throwable getThrowable() { return throwable; }
public void setThrowable(Throwable throwable) { this.throwable = throwable; }
}
|
RpcRequest
中的interfaceClassName
、methodName
、parameterTypes
三个成员属性就能确定一个服务类的方法,这三者可被称作“RPC服务三元组”。而arguments
则承载着这个方法的调用参数。
RpcResponse
则有repsonse
、throwable
两个成员属性,前者是RPC服务真实的调用结果,后者则包括了一系列的异常,如客户端异常、服务端异常、RPC服务抛出的异常。
客户端将RpcRequest
序列化后通过网络传输到服务端,服务端接收到RpcRequest
,通过“RPC服务三元组”找到目标服务方法,执行此方法得到结果,并将调用结果包装成RpcResponse
序列化后写回给客户端,客户端接收数据后获取调用结果,就能得到RPC调用的结果。
据我有限的知道的几款RPC框架如Dubbo、Motan、sofa-rpc等都是基于此种最基本的形式来完成远程调用。
总结
通过上面的代码我们基本上完成了一个脆弱的RPC框架
当然这是一个v0.0.1版本的RPC框架,我们还有很多事情需要做,比如NIO、服务发现、异常处理、资源控制、优雅关机等等一系列问题。
这些问题容我来慢慢的各个击破。
下次再见。。。