A Lightweight Distributed RPC Framework (Part 1)

RPC, short for Remote Procedure Call, means, put simply: calling a service on a remote machine as if it were a local call.

I won't go into a detailed introduction here; a quick search will turn up plenty of material on the topic.

One article I recommend: https://my.oschina.net/huangyong/blog/361751

A few thoughts of my own on the architecture. Rather than talking in the abstract, let me use the human body as an analogy:

First, a body needs a skeleton; likewise, the project should have an overall architecture diagram.

Next, it needs organs, each with its own job; these are the modules.

Then the organs need to communicate, which requires something to carry the traffic between them: the blood vessels. The blood is the information carrier, holding both what each organ needs and what it produces. In code, the blood might take the form of a ConcurrentMap (a small sketch follows after this list).

Finally, focus on implementing each module; the logic and technology for each are worked out case by case.
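
To make that concrete, here is a minimal sketch (my own illustration, not code from the framework) of how a ConcurrentMap could play the role of the blood, carrying each result back to the caller that is waiting on its request id:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.TimeUnit;

    // Illustrative sketch only: results flow back to waiting callers through a shared map,
    // keyed by the request id that tags every request/response pair.
    public class ResultCarrier {
        private final ConcurrentMap<Long, BlockingQueue<Object>> pending = new ConcurrentHashMap<>();

        // caller side: register interest before sending, then block until the result arrives
        public Object await(long requestId, long timeoutSeconds) throws InterruptedException {
            BlockingQueue<Object> slot = new ArrayBlockingQueue<>(1);
            pending.put(requestId, slot);
            try {
                return slot.poll(timeoutSeconds, TimeUnit.SECONDS);
            } finally {
                pending.remove(requestId);
            }
        }

        // receiver side: hand the incoming result to whoever is waiting on that id
        public void complete(long requestId, Object result) {
            BlockingQueue<Object> slot = pending.get(requestId);
            if (slot != null) {
                slot.offer(result);
            }
        }
    }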

First, the design diagram for this framework:

(Figure 1: the framework's design diagram)

This post walks through the concrete process of building a lightweight distributed RPC framework. It runs over TCP, uses NIO, provides efficient serialization, and supports service registration and discovery.

Based on these requirements, the technology choices are:

  1. Spring: the dependency injection framework of choice and a de facto industry standard.
  2. Netty: makes NIO programming much easier by hiding the low-level Java NIO details.
  3. Kryo: a fast, efficient Java serialization framework with an easy-to-use API; it can serialize data for files, databases, or the network.
  4. ZooKeeper: provides service registration and discovery, an essential choice for distributed systems, with clustering support built in.

For Netty itself, please find articles or a book to study on your own; Netty in Action is recommended.

Writing the core modules

A module can itself be built up from a number of smaller sub-module steps.

1. Writing the common module

Since the framework is built on Netty, the Netty side mainly involves two concerns: the codec (encoding/decoding) handlers and the core logic handler. The codec in turn involves serialization and the objects being transferred.

A request carries the method name, its parameters, and the class the method belongs to. By design each request is tagged with a unique id, and the result the server sends back carries the same id, so the response can easily be matched to its request.

This is the concrete form of the information the "blood" carries.

The request and response classes:
package com.nia.rpc.core.protocol;

import lombok.Data;

/**
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
@Data
public class Request {
    private long requestId;
    private Class<?> clazz;
    private String method;
    private Class<?>[] parameterTypes;
    private Object[] params;
    private long requestTime;
}
package com.nia.rpc.core.protocol;

import lombok.Getter;
import lombok.Setter;

/**
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
@Setter
@Getter
public class Response {
    private long requestId;
    private Object response;
    private Throwable throwable;
}

For Lombok, see "通过Lombok来简化你的代码" (Simplify your code with Lombok).
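
For readers who skip that post, here is a rough sketch of what @Data on the Request class above saves you from writing by hand (all of this is generated at compile time):

    // What Lombok's @Data roughly expands to for Request (abridged, illustration only):
    // a getter and a setter for every field, plus equals(), hashCode() and toString().
    public class Request {
        private long requestId;
        // ... the remaining fields from the class above ...

        public long getRequestId() {
            return requestId;
        }

        public void setRequestId(long requestId) {
            this.requestId = requestId;
        }

        // getters/setters for the other fields, plus equals(), hashCode() and toString(),
        // are generated in the same way
    }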

The serialization classes

First define an interface so that other serialization implementations can be plugged in later; here only Kryo is used.

package com.nia.rpc.core.serializer;

/**
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
public interface Serializer {
    byte[] serialize(Object obj);
    <T> T deserialize(byte[] bytes);
}

The implementation is straightforward:

package com.nia.rpc.core.serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
public class KryoSerializer implements Serializer {

    @Override
    public byte[] serialize(Object obj) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        kryo.writeClassAndObject(output, obj);
        output.close();
        return byteArrayOutputStream.toByteArray();
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T deserialize(byte[] bytes) {
        Kryo kryo = new Kryo();
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        Input input = new Input(byteArrayInputStream);
        // read the object before closing the input
        Object result = kryo.readClassAndObject(input);
        input.close();
        return (T) result;
    }
}
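
A quick round-trip sketch (illustration only) showing how the serializer is meant to be used:

    package com.nia.rpc.core.serializer;

    import com.nia.rpc.core.protocol.Request;

    // Usage sketch: serialize a Request and read it back with the same serializer.
    public class SerializerDemo {
        public static void main(String[] args) {
            Serializer serializer = new KryoSerializer();

            Request request = new Request();
            request.setRequestId(42L);
            request.setMethod("hello");

            byte[] bytes = serializer.serialize(request);
            Request restored = serializer.deserialize(bytes);
            System.out.println(restored.getRequestId() + " " + restored.getMethod());  // 42 hello
        }
    }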

The codec handlers

Encoder:
package com.nia.rpc.core.protocol;

import com.nia.rpc.core.serializer.KryoSerializer;
import com.nia.rpc.core.serializer.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    private Serializer serializer = new KryoSerializer();

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf out) throws Exception {
        // frame layout: a 4-byte length prefix followed by the serialized payload
        byte[] bytes = serializer.serialize(msg);
        int length = bytes.length;
        out.writeInt(length);
        out.writeBytes(bytes);
    }
}
Decoder:

See the comments for the points to watch; it follows the usual pattern. For a fuller write-up on this class, see the note "netty 数据分包、组包、粘包处理机制" (on Netty's packet splitting, reassembly and sticky-packet handling).

package com.nia.rpc.core.protocol;

import com.nia.rpc.core.serializer.KryoSerializer;
import com.nia.rpc.core.serializer.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
// LengthFieldBasedFrameDecoder is the usual way to handle splitting and reassembly of large frames
public class RpcDecoder extends LengthFieldBasedFrameDecoder {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcDecoder.class);

    private Serializer serializer = new KryoSerializer();

    public RpcDecoder(int maxFrameLength) {
        // the length field sits at offset 0, is 4 bytes wide, and is stripped from the frame
        super(maxFrameLength, 0, 4, 0, 4);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame != null) {
            try {
                int byteLength = frame.readableBytes();
                byte[] byteHolder = new byte[byteLength];
                frame.readBytes(byteHolder);
                return serializer.deserialize(byteHolder);
            } finally {
                // the frame returned by super.decode() is retained, so release it here
                frame.release();
            }
        }
        LOGGER.debug("Decoder Result is null");
        return null;
    }
}
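
To see the framing in action, here is a small sanity-check sketch (my own illustration, not part of the framework) that pushes a Request through the encoder/decoder pair using Netty's EmbeddedChannel:

    package com.nia.rpc.core.protocol;

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.embedded.EmbeddedChannel;

    // Sketch: encode a Request to [4-byte length][Kryo payload] and decode it back.
    public class CodecSmokeTest {
        public static void main(String[] args) {
            EmbeddedChannel channel = new EmbeddedChannel(
                    new RpcDecoder(10 * 1024 * 1024), new RpcEncoder());

            Request request = new Request();
            request.setRequestId(1L);
            request.setMethod("hello");

            // outbound: the encoder writes the length prefix and the serialized payload
            channel.writeOutbound(request);
            ByteBuf encoded = channel.readOutbound();

            // inbound: the decoder strips the length prefix and deserializes the payload
            channel.writeInbound(encoded);
            Request decoded = channel.readInbound();
            System.out.println(decoded.getRequestId());  // 1
        }
    }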

2. Writing the server module

First, getting hold of the IP and port.
package com.nia.rpc.core.utils;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * IP and Port Helper for RPC
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
public class NetUtils {

    /**
     * This implementation is not quite adequate; it is left like this for now.
     * Getting the local IP address properly in Java means handling:
     * 1. multiple network interfaces;
     * 2. excluding loopback devices and virtual interfaces.
     * The code looks simple, but it needs to be written with some care.
     * @return the local IP address, or null if it cannot be determined
     */
    public static String getLocalIp() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return null;
    }
}
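
As the comment says, the simple InetAddress.getLocalHost() lookup falls short on multi-homed machines. A rough sketch of the more careful approach it describes (my own illustration, not the framework's code): walk the network interfaces, skip loopback/virtual/down ones, and return the first site-local IPv4 address found.

    package com.nia.rpc.core.utils;

    import java.net.Inet4Address;
    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;
    import java.util.Enumeration;

    // Sketch only: pick the first site-local IPv4 address on a usable interface.
    public class NetUtilsSketch {

        public static String getLocalIp() {
            try {
                Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
                while (interfaces.hasMoreElements()) {
                    NetworkInterface nic = interfaces.nextElement();
                    // skip loopback devices, virtual interfaces and interfaces that are down
                    if (nic.isLoopback() || nic.isVirtual() || !nic.isUp()) {
                        continue;
                    }
                    Enumeration<InetAddress> addresses = nic.getInetAddresses();
                    while (addresses.hasMoreElements()) {
                        InetAddress address = addresses.nextElement();
                        if (address instanceof Inet4Address && address.isSiteLocalAddress()) {
                            return address.getHostAddress();
                        }
                    }
                }
            } catch (SocketException e) {
                e.printStackTrace();
            }
            return null;
        }
    }
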
The server-side handling of a method call coming in from a client:

The `service` here is explained in the code below: it is the implementation object of the interface the server has registered (strictly speaking it does not have to be an interface; externally it could just as well be exposed as a RESTful address).

package com.nia.rpc.core.server;

import com.nia.rpc.core.protocol.Request;
import com.nia.rpc.core.protocol.Response;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;

/**
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
public class RpcServerHandler extends SimpleChannelInboundHandler<Request> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServerHandler.class);

    private Object service;

    // the service implementation object is passed in here
    public RpcServerHandler(Object service) {
        this.service = service;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Request msg) throws Exception {
        String methodName = msg.getMethod();
        Object[] params = msg.getParams();
        Class<?>[] parameterTypes = msg.getParameterTypes();
        long requestId = msg.getRequestId();

        // look up the method the client wants to call via reflection and invoke it
        Method method = service.getClass().getDeclaredMethod(methodName, parameterTypes);
        method.setAccessible(true);
        Object invoke = method.invoke(service, params);

        // send the result back, carrying the same requestId so the client can match it up
        Response response = new Response();
        response.setRequestId(requestId);
        response.setResponse(invoke);
        channelHandlerContext.pipeline().writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("Exception caught on {}, ", ctx.channel(), cause);
        ctx.channel().close();
    }
}
Defining the Server interface:
package com.nia.rpc.core.server;

/**
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
public interface Server {
    void start();
    void shutdown();
}
The main server-side logic:

  1. ZooKeeper nodes are operated on through Curator; for how to use it, see "跟着实例学习ZooKeeper的用法" (Learning ZooKeeper by example).

  2. Bring the server up: see the start() code below. It is the standard Netty boilerplate, wired up with the pieces prepared above.

  3. Then call the registerService() method to perform the registration logic:

     get the ZooKeeper connection string;

     get the server's local IP;

     create a ZooKeeper client via Curator;

     create the base service registration node.

  4. The logic for shutting the service down; see the source below:
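
One note before the source: ServerImpl imports ZK_DATA_PATH from a Constant class that is not listed in this part; its real definition is in the repository linked at the end. As a placeholder, assume something like a fixed base path:

    package com.nia.rpc.core.utils;

    // Placeholder sketch only; the actual path value lives in the linked repository.
    public class Constant {
        public static final String ZK_DATA_PATH = "/rpc/";
    }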

package com.nia.rpc.core.server;

import com.nia.rpc.core.protocol.RpcDecoder;
import com.nia.rpc.core.protocol.RpcEncoder;
import com.nia.rpc.core.utils.NetUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.nia.rpc.core.utils.Constant.ZK_DATA_PATH;

/**
 * Author 知秋
 * Created by Auser on 2017/2/17.
 */
public class ServerImpl implements Server {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServerImpl.class);

    private String localIp;
    private int port;
    private boolean started = false;
    private Channel channel;
    private Object serviceImpl;
    private String serviceName;
    private String zkConn;
    private String serviceRegisterPath;

    private EventLoopGroup bossGroup = new NioEventLoopGroup();
    private EventLoopGroup workerGroup = new NioEventLoopGroup();

    private CuratorFramework curatorFramework;

    public ServerImpl(int port, Object serviceImpl, String serviceName) {
        this.port = port;
        this.serviceImpl = serviceImpl;
        this.serviceName = serviceName;
    }

    public ServerImpl(int port, Object serviceImpl, String serviceName, String zkConn) {
        this.port = port;
        this.serviceImpl = serviceImpl;
        this.serviceName = serviceName;
        this.zkConn = zkConn;
    }

    @Override
    public void start() {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new LoggingHandler(LogLevel.INFO))
                                .addLast(new RpcDecoder(10 * 1024 * 1024))
                                .addLast(new RpcEncoder())
                                .addLast(new RpcServerHandler(serviceImpl));
                    }
                });
        try {
            // bind to the port and wait for clients to connect
            ChannelFuture future = serverBootstrap.bind(port).sync();
            // then register the service
            registerService();

            LOGGER.info("Server Started At {}", port);
            started = true;
            this.channel = future.channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void registerService() {
        zkConn = getZkConn();
        localIp = NetUtils.getLocalIp();
        String serviceIp = localIp + ":" + port;
        // keep the Curator client in the field so shutdown() and unRegister() can use it
        curatorFramework = CuratorFrameworkFactory.newClient(zkConn,
                new ExponentialBackoffRetry(1000, 3));
        curatorFramework.start();

        // once connected to ZooKeeper, start registering the service nodes
        String serviceBasePath = ZK_DATA_PATH + serviceName;
        // create the base node for this service
        try {
            curatorFramework.create()
                    .creatingParentContainersIfNeeded()
                    .forPath(serviceBasePath);
        } catch (Exception e) {
            if (e.getMessage().contains("NodeExist")) {
                LOGGER.info("This Path Service has already Exist");
            } else {
                LOGGER.error("Create Path Error ", e);
                throw new RuntimeException("Register error");
            }
        }

        boolean registerSuccess = false;

        // once the base node is in place, add the ephemeral node that identifies this service address
        while (!registerSuccess) {
            try {
                curatorFramework.create()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(serviceBasePath + "/" + serviceIp);
                // set the flag here; during testing this looped and re-registered forever
                // because the flag update had been forgotten
                registerSuccess = true;
            } catch (Exception e) {
                // on error, retry the registration (delete the node first, then register again)
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                LOGGER.info("Retry Register ZK, {}", e.getMessage());
                try {
                    curatorFramework.delete().forPath(serviceBasePath + "/" + serviceIp);
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    @Override
    public void shutdown() {
        // the logic for shutting this service down
        LOGGER.info("Shutting down server {}", serviceName);
        unRegister();
        if (curatorFramework != null) {
            curatorFramework.close();
        }
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    private void unRegister() {
        LOGGER.info("unRegister zookeeper");
        try {
            curatorFramework.delete().forPath(ZK_DATA_PATH + serviceName + "/" + localIp + ":" + port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String getZkConn() {
        return zkConn;
    }

    public void setZkConn(String zkConn) {
        this.zkConn = zkConn;
    }

    public String getLocalIp() {
        return localIp;
    }

    public void setLocalIp(String localIp) {
        this.localIp = localIp;
    }

    public Channel getChannel() {
        return channel;
    }

    public void setChannel(Channel channel) {
        this.channel = channel;
    }
}
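
To see how the pieces fit together, here is a usage sketch (HelloService, HelloServiceImpl, the port and the ZooKeeper address are all made-up examples, not part of the framework):

    // Hypothetical service interface and implementation to expose over the framework.
    interface HelloService {
        String hello(String name);
    }

    class HelloServiceImpl implements HelloService {
        @Override
        public String hello(String name) {
            return "hello, " + name;
        }
    }

    public class ServerDemo {
        public static void main(String[] args) {
            // start on port 9090 and register under "HelloService" in a local ZooKeeper
            Server server = new ServerImpl(9090, new HelloServiceImpl(), "HelloService", "127.0.0.1:2181");
            server.start();
        }
    }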

That's all for now; the rest will follow in the next part.

Source code: https://github.com/muyinchen/migo-RPC

Your support encourages me to keep writing!