HDFS RPC通信原理


发布于 2024-06-14 / 40 阅读 / 0 评论 /
本文基于hadoop3.3.4

Hadoop RPC协议基于IPC(Inter-Process Communcations),进程间通信实现了一套高效的轻量级RPC,这套框架使用了NIO、动态代理、Protobuf等技术。

1.HDFS RPC Server

Hadoop定义了RpcEngine接口抽象使用不同的序列化框架的RPC引擎,RPCEngine接口包括两个重要的方法。

(1)getProxy():采用java动态代理机制,客户端在代理对象上的调用会由RpcInvocationHandler对象处理。RpcInvocationHandler会将请求序列化,并调用Client.call()方法将请求发送到远程服务器。当服务器返回响应信息后,RpcInvocationHandler会将响应信息反序列化并返回给调用程序。

(2)getServer():用于产生一个RPC Server对象,服务器会启动这个Server对象监听从客户端发来的请求.成功从网络接收数据后,Server对象会调用RpcInvoker对象处理这个请求。

1.1.Server端调用链

HDFS RPC Server类结构链路如下图所示:

org.apache.hadoop.ipc.Server是一个抽象类,根据不同的RpcKind实例化不同的Server。

1.2.RPCEngine

RPCEngine定义在org.apache.hadoop.ipc.RpcEngine,表示整个远程调用的引擎,主要提供getProxy和getServer方法。原生实现有三种:

当前默认使用ProtobufRpcEngine2。

每个RPCEngine中都定义了一个org.apache.hadoop.ipc.Server子类,具体类结构如下图所示:

当前WritableRpcEngine已不推荐使用。

1.3.Server.Call

Server端接收到请求后,将请求报文封装成Server.Call对象。

Server.Call中有两个比较重要的参数:

(1)callId:int类型,表示Call的唯一键值

(2)clientId:byte[]类型,表示客户端id。

1.4.Server.RpcCall

当从socket中读到一个RPC请求时,将这个请求初始化为一个Server.RpcCall实例。

Server.RpcCall是Server.Call的子类。

Server.RpcCall中有四个比较重要的参数:

(1)connection:每个请求都必须包含Connection对象,表示与客户端的连接。

(3)rpcRequest:Writable类型,表示rpc请求信息读取的序列化器。

(3)rpcResponse:ByteBuffer类型,表示rpc响应信息。

(4)rv:Writable类型,表示rpc响应的输出序列化器。

(5)responseParams:ResponseParams类型,表示响应参数。

1.5.Server.Listener

Server.Listener是一个Thread子类,是在启动RPC Server时创建的一个监听器,是一个用于接收请求的线程。Listener类中主要有以下几个参数:

(1)InetSocketAddress address:服务端socket地址。

(2)Reader[] readers:reader线程列表。数组数量由ipc.server.read.threadpool.size决定,默认值为1。

(3)ServerSocketChannel acceptChannel:服务端socket通道。

(4)Selector selector:acceptChannel注册的选择器。

Server.Listener的初始化简化过程如下所示:

Listener(int port) throws IOException {
  address = new InetSocketAddress(bindAddress, port);

  // Create a new server socket and set to non blocking mode
  acceptChannel = ServerSocketChannel.open();
  acceptChannel.configureBlocking(false);
  acceptChannel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddr);

  // Bind the server socket to the local host and port
  bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
  
  // create a selector;
  selector= Selector.open();

  // 此处还会初始化readers,并启动每个Reader

  // Register accepts on the server socket with the selector.
  acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
}

Listener线程启动后,使用selector去读取acceptChannel,在acceptChannel中读出每个RPC请求,将读取到的请求注册到reader线程的选择器中,同时构建一个Connection实例,构建Connection实例的时候会先读取RPC连接头部,将客户端请求的协议从字符串加载转为Class对象,然后将Connection实例附加到Reader中,供Reader线程去读取,并将Connection实例加入到全局的一个连接对象List中。如果发生异常,则将当前正在进行读取连接对象关闭,如果是OutOfMemoryError还会清理connection对象,将无意义的连接清除。比如一些超时的连接,清除的时候会关闭每个connection的acceptChannel。如果停止Listener线程则会清除connectionList中所有的连接对象。

1.6.Server.Listener.Reader

Reader也是一个Thread子类,用于读取Listener接收到的RPC请求,并将这些请求加入到callQueue中。

Reader中有两个重要的属性:

(1)BlockingQueue<Connection> pendingConnections:等待处理的连接队列。

(2)Selector readSelector:读选择器,用来解析与这个selector绑定的socketChannel的请求数据。Listener在接收新的connection的时候,会将每一个connection与这个读选择器绑定。

Reader的处理过程是这样的:

第一步:循环将pendingConnections中的所有连接的channel都注册到readSelector
Connection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);

第二步:选择有IO需求的channel的key值。Reader从readSelector中读取出来的每一个key都是附加了一个connection对象的,这里有SelectionKey的attach和attachment实现的。
readSelector.select();

第三步:遍历第二步得到的所有key值,读取每个channel中的请求数据。Reader读取出connection对象后,读取这个connection对象的数据,从响应的connection维护的Socket通道读取数据,这里会做一次版本匹配以及安全信息的验证,版本匹配失败则关闭这个连接。数据读取完成后,需要对数据进行处理,即创建Call对象,并且将Call对象加入到callQueue中,等待Handler去处理。
Connection.readAndProcess

以上三个步骤在同一个无限循环中,由Server.running参数控制是否跳出循环。

1.7.Server.Handler

Handler是用来处理远程调用请求的,Handler是在创建完RPC Server后使用server.start创建并启动的。

Server中Handler的数量由dfs.namenode.service.handler.count参数决定,默认值为10。

2.HDFS RPC Client

HDFS RPC Client定义在org.apache.hadoop.ipc.Client,作为IPC服务的客户端。

2.1.Client端调用链

Client端调用链如下图所示:

下面,我们通过一个案例来说明HDFS client建立rpc通信需要经历的过程

首先,根据配置文件创建DistributedFileSystem对象
Configuration conf = new Configuration();
conf.addResource("etc/hadoop/hdfs-site.xml");
conf.addResource("/etc/hadoop/core-site.xml");
UserGroupInformation.setConfiguration(conf);
FileSystem fs = FileSystem.get(conf);
FileSystem中定义了文件系统的操作接口,我们只需要调用FileSystem中的方法即可操作文件系统。如果是HDFS,则这里对应的是DistributedFileSystem对象。
DistributedFileSystem类中有一个DFSClient属性,这个就是HDFS RPC Client,用于与HDFS NameNode和DataNode进行RPC通信。
DFSClient创建过程如下所示
FileSystem.get(conf)
    defaultUrl = getDefaultUri(conf) // 获取fs.defaultFS配置值,默认为“file:///”
    FileSystem.get(defaultUrl, conf)
        FileSystem.createFileSystem(defaultUrl, conf)
            clazz = getFileSystemClass(defaultUrl.getScheme(), conf) // HDFS对应DistributedFileSystem类
            fs = ReflectionUtils.newInstance(clazz, conf); // 反射创建DistributedFileSystem对象
            fs.initialize(defaultUrl, conf)
                DistributedFileSystem.initDFSClient(defaultUrl, conf)
                    new DFSClient(defaultUrl, conf, statistics)
                        ugi = UserGroupInformation.getCurrentUser()
                        ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf, defaultUrl);
                        namenode = proxyInfo.getProxy();

Client的调用链如下图所示:

2.2.InvocationHandler

InvocationHandler用于反射获取对应的处理接口,比如ClientProtocol,类结构如下图所示:

这里也是根据不同的rpc类型,初始化不同的handler。

2.3.SocketFactory

SocketFactory用于创建对应的Socket连接,当前支持一下几种不同的SocketFactory。