HDFS RPC简介
HDFS RPC简介
背景
- HDFS 是除了数据交互之外的内外部服务相互沟通的渠道,了解RPC的实现能能更清楚的了解组件之前的交互模式
- 同时HDFS RPC也是一个通用的知识,不管哪个大数据组件基本都会涉及
接口
hdfs的rpc实现的接口交互流程如上图
hdfs的client和namenode之间是通过ClientProtocol的接口进行远程过程调用(client与datanode为ClientDatanodeProtocol,namenode与datanode为DatanodeProtocol等)
由于在组件之间的传输需要通过socket进行传输就需要实现参数的序列化和反序列化,目前hdfs都是protobuf进行序列化,所以添加了一层ClientNameNameNodeProtocolPB适配器接口
客户端
- ClientNamenodeProtocolPB接口和ClientProtocol接口一一对应,不过ClientProtocol接口参数是抽象的,例如rename的参数src和dest都是String类型,而ClientNamenodeProtocolPB则参数为RenameRequestProto的序列化类型
- 在ClientNamenodeProtocolTranslatorPB则是实现了ClientProtocol接口,客户端调用ClientNamenodeProtocolTranslatorPB的rename(src,dest),ClientNamenodeProtocolTranslatorPB则将src和dest打包成RenameRequestProto然后调用ClientNamenodeProtocolPB的代理对象执行rename(ClientNamenodeProtocolPB)的函数
- 对ClientNamenodeProtocolPB代理对象的rename函数调用则转化为调用内部的ProtobufRpcEngine.Invoker对象的invoke方法(java的动态代理机制),invoke方法首先构造描述RPC调用信息的对象RequestHeaderProto,用于记录客户端在什么协议上调用了什么方法(在ClientProtocol协议上调用rename方法),然后将RequestHeaderProto对象以及调用参数RenameRequestProto一起打包成一个RpcRequestWrapper对象
- 然后invoker方法调用RPC.Client的call方法将请求通过socket发送到远程服务端,然后等待返回结果
服务端
- 服务端是通过RPC.Server对象监听socket请求,然后构造一个ProtoBufRpcInvoker解析到是一个ClientProtocol接口调用
- 从而查询到实现了ClientProtocol接口的BlockingService对象,然后ProtoBufRpcInvoker将调用BlockingService的callBlockingMethod方法
- callBlockingMethod判断出是一个ClientNamenodeProtocolPB的rename调用,则会在持有的ClientNamenodeProtocolServerSideTranslatorPB对象上调用rename(RenameRequestProto)方法
- ClientNamenodeProtocolServerSideTranslatorPB对象是实现ClientNameNameNodeProtocolPB接口,该对象会将RenameRequestProto请求反序列化,然后调用NameNodeRpcServer.rename(src,dest)方法。
- NameNodeRpcServer对象则是实现了ClientProtocol接口,NameNodeRpcServer 则是真正执行rename操作的业务逻辑
客户端 & 服务端
- 客户端将先从全局CLIENTS的缓存客户端中获取一个客户端,没有就构造一个client,client和DFSClient是一对多的关系。client保存了一个sendParamsExecutor线程池处理数据的发送
- ClientNamenodeProtocolPB代理对象的函数调用都是内部Invoker对象的invoke()转发,在调用rename函数的时候,就执行Invoker.invoke(),invoker()函数就调用client的call,在call发送数据之前,需要获取一个Connection对象,该对象封装了客户端和服务端通信的socket,该对象也在client中缓存,如果第一次获取,则通过setupIOstreams连接服务端,socket连接后还会writeConnectionHeader同步给服务端一些版本,认证信息
- 服务端通过NameNodeRpcServer内的Listener对象接受客户端的socket连接,并从readers线程池中选择一个reader,并构造一个Connection对象放入reader的等待处理队列。这里的readers线程默认就一个线程
- reader的工作线程将在等待队列pendingConnections中的Connection取出并注册OP_READ事件等待客户端发起数据。其中pendingConnections默认初始化100个元素,是一个LinkedBlockingQueue队列理论上最大为Int.MAX
- 回到客户端,客户端创建Connection后构造一个call对象,call对象是一个请求的抽象,包含了请求的参数,rpc类型,最重要的是有callid,这个callid是客户端可服务端索引一个请求的依据。通过Connection将call对象交给sendParamsExecutor线程池发送到服务端,并且call对象也会加到Connection对象的calls队列中等待服务端的返回结果
- 服务端的reader接收到了客户端发来的数据,将解析出RpcRequestHeaderProto,该对象中保存了callid,retry次数,客户端id等信息。再将后面RpcRequest请求反序列化出来,构造call对象,然后将call对象放到callQueue队列中等待处理, callQueue默认为一个LinkedBlockingQueue最大长度为handler线程个数 * 100
- handler 线程池消费callQueue的call,通过rpckind找到ProtoBufRpcInvoker,调用该对象的call()将请求进一步反序列化,拿到调用的接口以及函数,通过接口信息获取先前注册的BlockingService,再调用BlockingService的callBlockingMethod区分调用的那个函数,然后调用
NameNodeRpcServer对象执行业务逻辑。默认handler为10个线程,一般线上集群需要调整到100以上 - 执行完业务逻辑后handler会将结果序列化后直接发送会客户端,如果没能完全发送回客户端,会在channel上注册OP_WRITE事件,将call对象放到SelectionKey里
- Responder线程会等待socket可以写入后,将SelectionKey中的call取出进行异步返回结果给客户端,如果还是无法写成功,connection会被关闭,结果也会被丢弃
- 客户端读取in的输入流反序列化结果数据,将结果保存到rpcResponse并notify唤醒client在call上等待的线程处理返回结果,这个call上等待的线程就是客户端调用DFSClient执行rename的线程
数据包格式
- length: 每个protobuf类型的数据都包含length字段为writeDelimitedTo()方法会先写入数据的length 在写入数据
- RpcRequestHeaderProto:PPC调用头域,保存了callId,clientId,rpcKind等重要信息。服务器发回的响应消息中会带回callId,clientId等信息,用于提取call,鉴权等
- RpcRequest:这是在ProtobufRpcEngine.Invoker.invoke()方法中构造RpcRequestWrapper类。包括两部分类容
- requestHeader:请求元信息,在什么结构上调用什么方法
- requestParam:请求参数,使用protobuf封装的,例如RenameRequestProto参数