RPC实现——thrift框架


发布于 2017-07-02 / 40 阅读 / 0 评论 /
Thrift是一种RPC框架实现,在大数据领域使用广泛。

Thrift是Facebook在2007年开发的跨语言RPC服务框架,提供多语言的编译能力,并提供多种服务器工作模式。用户通过Thrift的IDL(Interfact Definition Layer)来描述接口函数以及数据类型,然后通过Thrift的编译环境生成各种语言类型的接口文件,用户可以根据自己的需要采用不同的语言开发客户端代码和服务端代码。

1.Thrift客户端和服务端架构

thrift架构如下图所示:

可以看出,Thrift框架完整的契合RPC通信的原理。Thrift架构中主要有以下5层:

(1)“Input Code”部分是用户实现的业务逻辑。

(2)“Service Client”是Apache Thrift根据IDL自动生成的客户端和服务端代码。与“write()/read()”一起,在RPC的原理图中对应“client stub”和“server stub”。

(3)“write()/read()”也是Apache Thrift根据IDL自动生成的客户端和服务端代码,用来实现对数据的读写操作。

(4)TProtocol层是用来对数据进行序列化和反序列化,具体方法包括二进制、JSON、以及其他Apache Thrift定义的格式。

(5)TTransport层提供数据传输功能,使用Apache Thrift可以方便地定义一个服务并选择不同的传输协议。

2.Thrift网络栈

Thrift网络栈如下图所示:

网络栈有四层。

2.1.Transport层

代表thrift的数据传输方式,thrift定义了以下几种常见的数据传输方式:

(1)TSocket:阻塞式socket。

(2)TFramedTransport:以frame为单位进行传输,非阻塞式服务中使用。

(3)TFileTransport:以文件形式进行传输。

2.2.Protocol层

代表thrift客户端和服务端之间的传输数据的协议,通俗来讲就是客户端和服务端之间传输数据的格式,如json等。thrift定义了如下几种常见的格式:

(1)TBinaryProtocol:二进制格式。

(2)TCompactProtocol:压缩格式。

(3)TJSONProtocol:JSON格式。

(4)TSimpleJSONProtocol:提供只写的JSON协议。

2.3.Processor层

Processor封装了从输入数据流中读取数据和向数据流中写数据的操作。读写数据流用Protocol对象表示,与服务相关的processor实现由编译器产生。

Processor主要工作流程有:从连接中读取数据(使用input protocol),将处理授权给handler(由用户实现),最后将结果写到连接上(使用output protocol)。

2.4.Server层

Server将以上所有特性集成在一起,Server实现的步骤有:

(1)创建一个transport对象。

(2)为transport对象创建输入输出protocol。

(3)基于输入输出protocol创建processor。

(4)等待连接请求并将之交给processor处理。

3.Thrift支持的服务模型

从分类上来说,Thrift提供的网络服务模型可分为:单线程、多线程、事件驱动三种。也可划分为:阻塞服务模型、非阻塞服务模型两种。

3.1.TSimpleServer——单线程阻塞IO

TSimpleServer的工作模式采用最简单的阻塞IO模型,实现方法简洁明了,便于理解。但是,一次只能接收和处理一个socket连接,效率比较低。

流程图如下所示:

启动一个服务监听socket,由于线程处理是阻塞IO,要等到业务处理完成后,才能重新accept等待一个新的连接。

3.2.TThreadPoolServer——多线程阻塞IO

TThreadPoolServer采用阻塞socket方式工作,祝现场负责阻塞式监听是否有新的socket到来,具体的业务处理交由一个线程池来处理。。

流程图如下所示:

TThreadPoolServer解决了TSimpleServer不支持并发和多连接的问题,引入了线程池。在accept一个业务socket后,立马把业务socket封装成一个任务并提交到线程池进行执行。

3.2.1.TThreadPoolServer模式优点

TThreadPoolServer模式有以下优点:

(1)拆分了监听线程(Accept Thread)和处理客户端连接的工作现场(Worker Thread),数据读取和业务处理都交给线程池处理。因此能够提升并发度。

(2)线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池处理,性能也非常高。

3.2.2.TThreadPoolServer模式缺点

TThreadPoolServer模式有以下缺点:

(1)该模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。

(2)默认线程池允许创建的最大线程数量为Interger.MAX_VALUE,可能创建出大量线程,导致OOM(内存溢出)。

3.3.TNonblockingServer——单线程非阻塞IO

TNonblockingServer也是单线程工作,但采用NIO的模式。流程图如下所示:

TNonblockingServer利用IO多路复用模型(select、epoll)处理socket就绪事件,对于有数据到来的socket进行数据读取操作,对于有数据发送的socket则进行数据发送操作,对于监听socket则处理连接并产生一个新业务socket并将其注册到selector上。selector当没有就绪事件时是阻塞的,有就绪事件就是非阻塞的,会往下执行。

3.3.1.TNonblockingServer模式优点

TNonblockingServer模式的优点有:

(1)相比于TSimpleServer效率提升主要体现在IO多路复用上,TNonblockingServer采用非阻塞IO,对accept/read/write等IO事件进行监控和处理,同时监控多个socket的状态变化。

3.3.2.TNonblockingServer模式缺点

TNonblockingServer模式的缺点有:

(1)TNonblockingServer模式在业务处理上还是采用单线程顺序来完成。在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,会导致整个服务被阻塞住,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。

3.4.THsHaServer——多线程非阻塞IO

鉴于TNonblockingServer的缺点,THsHaServer继承于TNonblockingServer,引入了线程池提高了任务处理的并发能力。针对读操作,单独引入线程池处理,也是Reactor的实现。

流程图如下所示:

THsHaServer与TNonblockingServer模式的差异就在图中红色部分。

3.4.1.THsHaServer优点

THsHaServer的优点有:

(1)THsHaServer与TNonblockingServer模式相比,THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升。

3.4.2.THsHaServer缺点

THsHaServer的缺点有:

(1)主线程仍然需要完成所有socket的监听接收、数据读取和数据写入操作。当并发请求数较大时,且发送数据量较多时,监听socket上新连接请求不能被及时接受。

3.5.TThreadedSelectorServer——多Reactor模型

TThreadedSelectorServer是对THsHaServer的一种扩充,它将selector中读写IO事件(read/write)从主线程中分离出来。同时引入worker工作线程池。

流程图如下所示:

TThreadedSelectorServer模式是目前Thrift提供的最高级的线程服务模型,它的内部由以下4个部分组成:

(1)一个AcceptThread:相当于多Reactor的mainReactor,专门用于监听socket上的新连接。

(2)若干个SelectorThread:相当于多Reactor的subReactor,专门用于处理业务socket的网络I/O读写操作,所有网络数据的读写均是由这些线程来完成。

(3)一个负载均衡器SelectorThreadLoadBalancer对象:主要用于AcceptThread线程接收到一个socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。

(4)一个ExecutorService类型的工作线程池:在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求数据读取之后,交给ExecutorService线程池中的线程完成此次调用的具体执行。主要用于处理每个rpc请求的handler回调处理。即具体业务处理线程。

5.Thrift demo

Apache Thrift官方文档中也提供了对应的demo,可查看https://thrift.apache.org/tutorial/java.html。

下面以TSimpleServer为例,展示thrift框架的使用。使用过程需要用到以下依赖:

<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.9.3</version>
</dependency>

Demo中少了一些类的自动生成(通过thrift文件生成对应的java类),以及接口的实现。

5.1.thrift server demo

具体代码如下:

package com.thrift.rpc.demo;

import com.thrift.rpc.service.UserService;
import com.thrift.rpc.service.impl.UserServiceImpl;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;

public class Server1 {

    public static void main(String[] args) {
        try {
            // 1. 创建Transport
            TServerSocket serverTransport = new TServerSocket(9123);
            TServer.Args tArgs = new TServer.Args(serverTransport);

            // 2. 为Transport创建Protocol
            tArgs.protocolFactory(new TBinaryProtocol.Factory());
            // tArgs.protocolFactory(new TCompactProtocol.Factory());
            // tArgs.protocolFactory(new TJSONProtocol.Factory());

            // 3. 为Protocol创建Processor
            TProcessor tprocessor = new UserService.Processor<UserService.Iface>(new UserServiceImpl());
            tArgs.processor(tprocessor);

            // 4. 创建Server并启动 org.apache.thrift.server.TSimpleServer - 简单的单线程服务模型,一般用于测试
            TServer server = new TSimpleServer(tArgs);
            System.out.println("UserService TSimpleServer start ....");
            server.serve();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

其他的服务端模型也是按照这四个步骤来实现的,也符合《Thrift网络栈——Server层》章节中描述的操作方法。

5.2.thrift client demo

具体代码如下:

package com.thrift.rpc.demo;

import com.thrift.rpc.entity.User;
import com.thrift.rpc.exception.UserNotFoundException;
import com.thrift.rpc.service.UserService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

import java.util.List;

public class Client1 {

    public static void main(String[] args) {
        try {
            TTransport transport = new TSocket("127.0.0.1", 9123);
            TProtocol protocol = new TBinaryProtocol(transport);
            UserService.Client client = new UserService.Client(protocol);
            transport.open();

            // 查询User列表
            List<User> users = client.findUsersByName("wang");
            System.out.println("client.findUsersByName()方法結果 == >" + users);

            // 保存User
            boolean isUserSaved = client.save(new User(101, "WMJ"));
            System.out.println("user saved result == > " + isUserSaved);

            // 删除用户
            client.deleteByUserId(1002);

            transport.close();

        } catch (TTransportException e) {
            System.out.println("TTransportException==>" + e.getLocalizedMessage());
        } catch (UserNotFoundException e) {
            System.out.println("UserNotFoundException==>" + e.getLocalizedMessage());
        } catch (TException e) {
            System.out.println("TException==>" + e.getLocalizedMessage());
        }
    }
}

远程调用了findUsersByName、save、deleteByUserId三个方法。