这里我完成了最初始的Config层,Proxy层,Codec层,以及Transport层。
具体来说,我使用Netty作为Transport层,并进行了半包和粘包的处理;我自己定义了自己的通信协议:susu协议,协议头待会会介绍;使用Java原生的动态代理作为Proxy;Config层只写了最基础的代码。
GitHub项目地址:susu
这篇博客干货比较多,基本都是代码,前方高能:
一、测试程序 Rpc服务接口:
1 2 3 public interface IService { String say (String name) ; }
Client代码(也叫consumer):
1 2 3 4 5 6 7 8 9 10 11 public class ClientTest { public static void main (String[] args) throws Exception { Reference<IService> reference = new Reference<>(); reference.setInterfaceClass(IService.class); IService service = reference.getRefer(); String result = service.say("zrj" ); System.out.println(result); } }
Server代码(也叫Provider):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class ServerTest { public static void main (String[] args) { Exporter<IService> exporter = new Exporter<>(); exporter.setInterfaceClazz(IService.class); exporter.setRef(new IServiceImpl()); exporter.export(); } private static class IServiceImpl implements IService { @Override public String say (String name) { return "from rpc " + name; } } }
这部分代码上篇博客最后已经介绍过了,这篇博客将底层的实现全部完成了,运行结果如下:
二、代码细节 我先介绍两个类:Request和Response,这两个类封装了通信payload中的所有信息,我们的Rpc框架使用这两个类进行网络的信息交换,之后也会基于这两个类做序列化等工作。
Request: 各个字段的说明写在注释里面了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Request { private long requestId; private String interfaceName; private String methodName; private String argsType; private Object[] argsValue; }
Response: 1 2 3 4 5 6 7 8 9 10 11 public class Response { private long requestId; private Object returnValue; private Exception exception; }
1、服务端: Exporter: Exporter代码超简单,调用export时直接打开了一个NettyServer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 */ public class Exporter <T > { private T ref; private Class<T> interfaceClazz; public void setInterfaceClazz (Class<T> interfaceClazz) { this .interfaceClazz = interfaceClazz; } public void setRef (T ref) { this .ref = ref; } public void export () { NettyServer nettyServer = new NettyServer(new Provider<>(ref, interfaceClazz)); nettyServer.open(); } }
这里面引入了两个个新的类:NettyServer、Provider,我们先看NettyServer。
NettyServer: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class NettyServer extends ChannelDuplexHandler { private Handler handler; public NettyServer (Handler handler) { this .handler = handler; } public void open () { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1 ); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder" , new NettyDecoder()); pipeline.addLast("encoder" , new NettyEncoder()); NettyChannelHandler channelHandler = new NettyChannelHandler(handler); pipeline.addLast("handler" , channelHandler); } }); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true ); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true ); try { ChannelFuture f = serverBootstrap.bind(20880 ); f.syncUninterruptibly(); } catch (Exception e) { e.printStackTrace(); } } }
一个最基本的Netty服务,里面接受一个参数:handler,这是一个通用接口,用来处理具体逻辑,当NettyServer接受到请求后,最终的处理逻辑会委托给handler。
Handler: 1 2 3 4 5 6 7 8 public interface Handler { Object handle (Object message) ; }
NettyServer代码中出现了几个新类:NettyDecoder、NettyEncoder、NettyChannelHandler,这里先简单介绍一下:NettyDecoder用来解决半包粘包问题,NettyEncoder没什么用,NettyChannelHandler是一个标准的Netty的ChannelHandler,用来处理各种网络事件,待会会仔细的看这几个类,现在先简单介绍一下有个印象。
当我们创建完NettyServer后,会启动20880接口去监听服务并阻塞等待。
NettyChannelHandler: 这个类是一个标准的ChannelHandler类,用来处理网络事件,Client端和Server端共用这个类,之后如果Client端和Server端的代码差异增加,这个类可以拆开成两个ChannelHandler,分别用于Client端和Server端,这里我偷懒就直接写成一个了,继承了ChannelDuplexHandler,这里我们先看怎么处理Request的部分逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class NettyChannelHandler extends ChannelDuplexHandler { private Codec codec; private Handler handler; public NettyChannelHandler (Handler handler) { this .codec = new SusuCodec(); this .handler = handler; } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { Object object = codec.decode((byte []) msg); if (!(object instanceof Request) && !(object instanceof Response)) { throw new SusuException("NettyChannelHandler: unsupported message type when encode: " + object.getClass()); } if (object instanceof Request) { processRequest(ctx, (Request) object); } else { processResponse(ctx, (Response) object); } } private void processRequest (ChannelHandlerContext ctx, Request msg) { Object result = handler.handle(msg); Response response = new Response(); response.setRequestId(msg.getRequestId()); if (result instanceof Exception) { response.setException((Exception) result); } else { response.setReturnValue(result); } sendResponse(ctx, response); } private void processResponse (ChannelHandlerContext ctx, Response msg) { handler.handle(msg); } private ChannelFuture sendResponse (ChannelHandlerContext ctx, Response response) { byte [] msg = codec.encode(response); if (ctx.channel().isActive()) { return ctx.channel().writeAndFlush(msg); } return null ; } }
可以看到,当我们收到msg后,直接调用了Codec进行解码,这里msg一定是byte[],这一点是由NettyDecoder保证的,之后在讲半包粘包问题时会介绍到这部分代码。
解码之后,如果是Request,则委托给handler去执行具体的代码,这里的Handle是我们创建NettyServer的时候传进来的,是一个Provider实例,下面会介绍Provider。Provider返回给我们一个结果,我们先判断结果是否是一个异常,并创建一个Response填充具体的字段,最后我们使用sendResponse方法将这个Response返回给客户端,当然返回之前还是要先编码为byte[]。
到这里,一个完整的过程已经很清晰了,我们剩下需要关心的是两件事情,一个是如何编解码,一个是Provider具体干了什么,编解码对于这部分逻辑是透明的,你只用知道byte[]被转换成了Request或者Response就可以了,所以编解码部分最后再讲。
下面我们看看Provider。
Provider: 可以看到,在我们创建NettyServer的时候传入了一个Provider实例,这个实例实现了Handler接口,看看里面干了什么:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class Provider <T > implements Handler { protected Map<String, Method> methodMap = new ConcurrentHashMap<>(); private T ref; private Class<T> interfaceClazz; public Provider (T ref, Class<T> interfaceClazz) { if (!interfaceClazz.isInterface()) { throw new SusuException("Provider: interfaceClazz is not a interface!" ); } this .ref = ref; this .interfaceClazz = interfaceClazz; List<Method> methods = ReflectUtils.parseMethod(interfaceClazz); for (Method method : methods) { String methodDesc = ReflectUtils.getMethodDesc(method); methodMap.putIfAbsent(methodDesc, method); } } @Override public Object handle (Object message) { if (!(message instanceof Request)) { throw new SusuException("Provider: handle unsupported message type: " + message.getClass()); } Request request = (Request) message; String methodName = ReflectUtils.getMethodDesc(request.getMethodName(), request.getArgsType()); Method method = methodMap.get(methodName); if (method == null ) { return new SusuException("Provider: can't find method: " + methodName); } try { return method.invoke(ref, request.getArgsValue()); } catch (Exception e) { return new SusuException("Provider: exception when invoke method: " + methodName, e); } catch (Error e) { return new SusuException("Provider: error when invoke method: " + methodName, e); } } }
可以看到,构造函数中,首先找到了interfaceClazz的所有可以被调用的Method对象,这里只保留了限定符为public的方法,并将这些方法全部缓存到本地,Key用的是方法签名,方法签名包括参数列表,防止有重载的方法。
当调用handler的时候,将message转成Request类型(这个在调用方保证),然后拿到Request中的调用的方法和参数信息组装成Key,通过Key拿到对应的Methed,然后通过反射调用具体的方法,最后返回调用的结果,如果调用出错则抛出异常。
好了,到这里看看我们完成了什么功能,首先我们使用Netty开了一个端口监听请求,请求到来之后经过最后扔进了Provider中调用具体的方法,并返回调用结果。
当然,Netty传过来的数据是二进制数据,需要反序列化。
其实服务端的代码到这里就介绍完了,是不是很简单!
回顾一下主要是三个类,
NettyServer,用来启动一个Netty服务监听网络。
NettyChannelHandler,用来接收网络请求,并将二进制请求反序列化为Request对象,然后调用Provider获取结果,包装成Response返回给客户端。
Provider,持有服务接口的实现的引用,并使用反射解析服务接口的各种方法,当Request对象到来时,根据Request中的信息得到具体需要调用的方法,使用反射调用后获取结果,返回给NettyChannelHandler。
好了服务端的代码先到这,接下来看看客户端的代码,客户端也就是Rpc的调用方。
2、客户端 Reference: Reference也很简单,使用默认的ProxyFactory创建一个代理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Reference <T > { private ProxyFactory<T> proxyFactory; private Class<T> interfaceClass; public Reference () { this .proxyFactory = new ProxyFactory<>(); } public T getRefer () { return proxyFactory.getProxy(interfaceClass); } public Class<T> getInterfaceClass () { return interfaceClass; } public void setInterfaceClass (Class<T> interfaceClass) { this .interfaceClass = interfaceClass; } }
ProxyFactory: 1 2 3 4 5 6 public class ProxyFactory <T > { @SuppressWarnings("unchecked") public T getProxy (Class<T> clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new ProxyHandler()); } }
创建了一个代理,重点在于ProxyHandler,看看ProxyHandler干了什么。
ProxyHandler: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class ProxyHandler implements InvocationHandler { private NettyClient client; public ProxyHandler () { this .client = new NettyClient(); client.open(); } public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { Request request = new Request(); request.setInterfaceName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setArgsType(getArgsTypeString(args)); request.setArgsValue(args); Response response = client.invoke(request); if (response.getException() != null ) { throw response.getException(); } return response.getReturnValue(); } private String getArgsTypeString (Object[] args) { if (args.length <= 0 ) { return "" ; } StringBuilder sb = new StringBuilder(); for (Object object : args) { sb.append(object.getClass().getName()).append("," ); } if (sb.length() > 0 ) { sb.setLength(sb.length() - "," .length()); } return sb.toString(); } }
可见这个类是整个客户端的重点,首先,ProxyHandler构造时,首先创建了一个NettyClient并持有。当代理类的方法被调用的时候,首先根据调用的运行时信息创建Request,然后调用Response response = client.invoke(request)
获取Response,然后返回具体的结果,抛出异常或者正常返回。
具体的调用工作交给了NettyClient。
NettyClient: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public class NettyClient { private io.netty.channel.Channel clientChannel; private Codec codec = new SusuCodec(); private Map<Long, ResponseFuture> currentTask = new ConcurrentHashMap<>(); public Response invoke (Request request) { byte [] msg = codec.encode(request); ResponseFuture response = new DefaultResponseFuture(); currentTask.put(request.getRequestId(), response); clientChannel.writeAndFlush(msg); try { return (Response) response.getValue(); } catch (Exception e) { Response response1 = new Response(); response1.setRequestId(request.getRequestId()); response1.setException(new TransportException("NettyClient: response.getValue interrupted!" )); return response1; } } public void open () { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 ); bootstrap.option(ChannelOption.TCP_NODELAY, true ); bootstrap.option(ChannelOption.SO_KEEPALIVE, true ); bootstrap.group(nioEventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder" , new NettyDecoder()); pipeline.addLast("encoder" , new NettyEncoder()); pipeline.addLast("handler" , new NettyChannelHandler( message -> { Response response = (Response) message; ResponseFuture future = currentTask.remove(response.getRequestId()); future.onSuccess(response); return null ; } )); } } ); new Thread(() -> { try { ChannelFuture future = bootstrap.connect("127.0.0.1" , 20880 ).sync(); clientChannel = future.channel(); clientChannel.closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } }).start(); } }
当调用open之后,同样很简单的创建了一个Netty客户端,不过这里 bootstrap.connect("127.0.0.1", 20880).sync();
方法在另外一个线程中调用,不然会阻塞我们的Main方法,之后的代码无法运行。
在里面创建客户端时,同样加入了NettyDecoder,NettyEncoder,NettyChannelHandler以及一个匿名的Handler,这个Handler用来通知ResponseFuture,服务端已经返回结果了。
ResponseFuture是一个Future,用来异步获取Netty服务返回的结果。
ResponseFuture: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class DefaultResponseFuture implements ResponseFuture { private static final int NEW = 0 ; private static final int SUCCESS = 1 ; private static final int CANCEL = 2 ; private static final int FAILED = 3 ; private final Object lock = new Object(); private volatile int status; private Response value; public DefaultResponseFuture () { this .status = NEW; } @Override public void onSuccess (Response response) { synchronized (lock) { value = response; status = SUCCESS; lock.notifyAll(); } } @Override public void onFailure (Response response) { synchronized (lock) { value = response; status = FAILED; lock.notifyAll(); } } @Override public Object getValue () throws InterruptedException { if (status > 0 ) { return value; } synchronized (lock) { if (status > 0 ) { return value; } lock.wait(); } return value; } }
可以看到,当我们调用getValue()方法时,如果结果还没有准备好,会挂起当前调用的线程,直到onSuccess方法被调用,设置进来结果后,才会唤醒所有等待的线程。onSuccess方法刚刚已经看到了,这个方法被注册进了NettyChannelHandler,当Netty服务端返回时,onSuccess就会被回调。
最后看看NettyClient中的invoke方法,这个方法超级简单,首先序列化Request请求,然后创建一个DefaultResponseFuture用来异步获取结果, 并将当前的请求放入本地的缓存中,方便异步返回时配对,然后调用clientChannel.writeAndFlush(msg);
,将请求发给服务端,最后调用DefaultResponseFuture.getValue阻塞等待结果。
3、编解码,susu协议 编解码器的接口:
1 2 3 4 5 6 7 8 9 public interface Codec { byte [] encode(Object message) throws CodecException; Object decode (byte [] data) throws CodecException ; }
susu协议编解码器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class SusuCodec implements Codec {
不用多解释了,协议头写在注释里面啦!
至于具体的编解码过程没什么好说的,就是硬编码,感兴趣的朋友们看Github上的源码,GitHub项目地址:susu 。
除了协议头之外,还有对Request和Response对象的序列化,我使用的是FastJson序列化框架:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class FastJsonSerialization implements Serialization { @Override public byte [] serialize(Object object) throws IOException { SerializeWriter out = new SerializeWriter(); JSONSerializer serializer = new JSONSerializer(out); serializer.config(SerializerFeature.WriteEnumUsingToString, true ); serializer.config(SerializerFeature.WriteClassName, true ); serializer.write(object); return out.toBytes("UTF-8" ); } @Override public <T> T deserialize (byte [] bytes, Class<T> clazz) throws IOException { return JSON.parseObject(new String(bytes), clazz); } }
4、半包与粘包 半包与粘包发生在TCP传输中,由于TCP是面向流的协议,TCP本身不知道如何去截断有完整语义的数据包,所以在客户端看来是分离的单独语义的数据包经过TCP传输后可能有的数据包被截断了,有的数据包被连在一起了,这是需要我们自己设计协议去解析数据流,将我们自己定义的数据包从TCP流中识别出来。
最常用的方法就是在我们自定义的协议头中加入content length字段,标识这段数据包有多少数据。
在netty中,netty提供了方便的用于处理半包粘包问题的入口。
我们可以继承ByteToMessageDecoder,每次轮询到TCP中有未读数据后,会调用decode方法,decode方法会让你有机会先”检视“一次数据,如果数据不完整(发生了半包)的话,就直接return,等待TCP的下次轮询,当有足够的数据之后,我们可以根据自己的规则,将数据写入到List<Object> out参数中,告诉netty我们有足够的数据了,可以继续进行下面的步骤了。
NettyDecoder: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class NettyDecoder extends ByteToMessageDecoder { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() <= CodecConstants.HEADER_SIZE) { return ; } in.markReaderIndex(); short magic = in.readShort(); if (magic != CodecConstants.MAGIC_HEAD) { in.resetReaderIndex(); throw new TransportException("NettyDecoder: magic number error: " + magic); } in.skipBytes(2 ); int contentLength = in.readInt(); if (in.readableBytes() < contentLength + 8 ) { in.resetReaderIndex(); return ; } in.resetReaderIndex(); byte [] data = new byte [CodecConstants.HEADER_SIZE + contentLength]; in.readBytes(data); out.add(data); } }
首先检查数据大小是否大于协议头,大于协议头我们才继续。
由于大于协议头,所以后面的16个字节的数据我们可以放心的读出,而不用担心越界异常。我们先检查了一下Magic头,看看是不是我们协议的数据,然后去读contentLength字段,我们就知道了这个数据头所携带的数据包有多大,然后看看ByteBuf中是否有足够的数据,如果没有直接返回,等待下次轮询,如果有了足够数据,则直接取出这个数据包中的所有数据,加入到out中。
千万要注意,这里只能读出本数据包中的数据,由于可能发生粘包,如果将ByteBuf中的数据全部读出来,可能会读到下个数据包的部分数据,导致真正要处理下个数据包的时候,读不出完整的数据,从而导致报错或者一些意想不到的错误,甚至死循环。
三、总结 我们进行了最最简单的编码,直奔最终的结果,去完成一次最简单的RPC调用,这些代码写完的时间非常短,所以里面可能有大量大量的隐藏问题。这段代码暂时仅用于学习和理解RPC框架流程,随着之后的完善,不排除用于生产环境的可能。
代码在这。
GitHub项目地址:susu
之后会引入Zookeeper作为注册中心,支持Cluster和负载均衡部分,并完善代码逻辑,加入各种设置(本文中都没有设置入口,端口和ip全都写死的)。
如果要运行,请将Client中的sleep代码放出来,因为有可能调用的时候,Client和Server的TCP还没链接,之后也会修复这个问题。