RPC实例Apache Thrift 下篇(2)

清疚 2022-05-30 02:49 299阅读 0赞

3、正式开始编码

我已经在CSDN的资源区上传了这个示例工程的所有代码(http://download.csdn.net/download/wanbf123/10266929)。读者可以直接到资源下载站进行下载(不收积分哦~~^\_^)。这篇文章将紧接上文,主要介绍这个工程几个主要的类代码。

3-1、编写服务端主程序

服务端主程序的类名:processor.MainProcessor,它负责在服务端启动Apache Thrift并且在服务监听启动成功后,连接到zookeeper,注册这个服务的基本信息。

这里要注意一下,Apache Thrift的服务监听是阻塞式的,所以processor.MainProcessor的Apache Thrift操作应该另起线程进行(processor.MainProcessor.StartServerThread),并且通过线程间的锁定操作,保证zookeeper的连接一定是在Apache Thrift成功启动后才进行。

  1. package processor;
  2. import java.io.IOException;
  3. import java.util.Set;
  4. import java.util.concurrent.Executors;
  5. import net.sf.json.JSONObject;
  6. import org.apache.commons.logging.Log;
  7. import org.apache.commons.logging.LogFactory;
  8. import org.apache.log4j.BasicConfigurator;
  9. import org.apache.thrift.TProcessor;
  10. import org.apache.thrift.protocol.TBinaryProtocol;
  11. import org.apache.thrift.protocol.TProtocol;
  12. import org.apache.thrift.server.ServerContext;
  13. import org.apache.thrift.server.TServerEventHandler;
  14. import org.apache.thrift.server.TThreadPoolServer;
  15. import org.apache.thrift.server.TThreadPoolServer.Args;
  16. import org.apache.thrift.transport.TServerSocket;
  17. import org.apache.thrift.transport.TTransport;
  18. import org.apache.thrift.transport.TTransportException;
  19. import org.apache.zookeeper.CreateMode;
  20. import org.apache.zookeeper.KeeperException;
  21. import org.apache.zookeeper.WatchedEvent;
  22. import org.apache.zookeeper.Watcher;
  23. import org.apache.zookeeper.ZooKeeper;
  24. import org.apache.zookeeper.ZooDefs.Ids;
  25. import org.apache.zookeeper.data.Stat;
  26. import business.BusinessServicesMapping;
  27. import thrift.iface.DIYFrameworkService;
  28. import thrift.iface.DIYFrameworkService.Iface;
  29. public class MainProcessor {
  30. static {
  31. BasicConfigurator.configure();
  32. }
  33. /** * 日志 */
  34. private static final Log LOGGER = LogFactory.getLog(MainProcessor.class);
  35. private static final Integer SERVER_PORT = 8090;
  36. /** * 专门用于锁定以保证这个主线程不退出的一个object对象 */
  37. private static final Object WAIT_OBJECT = new Object();
  38. /** * 标记apache thrift是否启动成功了 * 只有apache thrift启动成功了,才需要连接到zk */
  39. private boolean isthriftStart = false;
  40. public static void main(String[] args) {
  41. /* * 主程序要做的事情: * * 1、启动thrift服务。并且服务调用者的请求 * 2、连接到zk,并向zk注册自己提供的服务名称,告知zk真实的访问地址、访问端口 * (向zk注册的服务,存储在BusinessServicesMapping这个类的K-V常量中) * */
  42. //1、========启动thrift服务
  43. MainProcessor mainProcessor = new MainProcessor();
  44. mainProcessor.startServer();
  45. // 一直等待,apache thrift启动完成
  46. synchronized (mainProcessor) {
  47. try {
  48. while(!mainProcessor.isthriftStart) {
  49. mainProcessor.wait();
  50. }
  51. } catch (InterruptedException e) {
  52. MainProcessor.LOGGER.error(e);
  53. System.exit(-1);
  54. }
  55. }
  56. //2、========连接到zk
  57. try {
  58. mainProcessor.connectZk();
  59. } catch (IOException | KeeperException | InterruptedException e) {
  60. MainProcessor.LOGGER.error(e);
  61. System.exit(-1);
  62. }
  63. // 这个wait在业务层面,没有任何意义。只是为了保证这个守护线程不会退出
  64. synchronized (MainProcessor.WAIT_OBJECT) {
  65. try {
  66. MainProcessor.WAIT_OBJECT.wait();
  67. } catch (InterruptedException e) {
  68. MainProcessor.LOGGER.error(e);
  69. System.exit(-1);
  70. }
  71. }
  72. }
  73. /** * 这个私有方法用于连接到zk上,并且注册相关服务 * @throws IOException * @throws InterruptedException * @throws KeeperException */
  74. private void connectZk() throws IOException, KeeperException, InterruptedException {
  75. // 读取这个服务提供者,需要在zk上注册的服务
  76. Set<String> serviceNames = BusinessServicesMapping.SERVICES_MAPPING.keySet();
  77. // 如果没有任何服务需要注册到zk,那么这个服务提供者就没有继续注册的必要了
  78. if(serviceNames == null || serviceNames.isEmpty()) {
  79. return;
  80. }
  81. // 默认的监听器
  82. MyDefaultWatcher defaultWatcher = new MyDefaultWatcher();
  83. // 连接到zk服务器集群,添加默认的watcher监听
  84. ZooKeeper zk = new ZooKeeper("192.168.61.128:2181", 120000, defaultWatcher);
  85. //创建一个父级节点Service
  86. Stat pathStat = null;
  87. try {
  88. pathStat = zk.exists("/Service", defaultWatcher);
  89. //如果条件成立,说明节点不存在(只需要判断一个节点的存在性即可)
  90. //创建的这个节点是一个“永久状态”的节点
  91. if(pathStat == null) {
  92. zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  93. }
  94. } catch(Exception e) {
  95. System.exit(-1);
  96. }
  97. // 开始添加子级节点,每一个子级节点都表示一个这个服务提供者提供的业务服务
  98. for (String serviceName : serviceNames) {
  99. JSONObject nodeData = new JSONObject();
  100. nodeData.put("ip", "127.0.0.1");
  101. nodeData.put("port", MainProcessor.SERVER_PORT);
  102. zk.create("/Service/" + serviceName, nodeData.toString().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  103. }
  104. //执行到这里,说明所有的service都启动完成了
  105. MainProcessor.LOGGER.info("===================所有service都启动完成了,主线程开始启动===================");
  106. }
  107. /** * 这个私有方法用于开启Apache thrift服务端,并进行持续监听 * @throws TTransportException */
  108. private void startServer() {
  109. Thread startServerThread = new Thread(new StartServerThread());
  110. startServerThread.start();
  111. }
  112. private class StartServerThread implements Runnable {
  113. @Override
  114. public void run() {
  115. MainProcessor.LOGGER.info("看到这句就说明thrift服务端准备工作 ....");
  116. // 服务执行控制器(只要是调度服务的具体实现该如何运行)
  117. TProcessor tprocessor = new DIYFrameworkService.Processor<Iface>(new DIYFrameworkServiceImpl());
  118. // 基于阻塞式同步IO模型的Thrift服务,正式生产环境不建议用这个
  119. TServerSocket serverTransport = null;
  120. try {
  121. serverTransport = new TServerSocket(MainProcessor.SERVER_PORT);
  122. } catch (TTransportException e) {
  123. MainProcessor.LOGGER.error(e);
  124. System.exit(-1);
  125. }
  126. // 为这个服务器设置对应的IO网络模型、设置使用的消息格式封装、设置线程池参数
  127. Args tArgs = new Args(serverTransport);
  128. tArgs.processor(tprocessor);
  129. tArgs.protocolFactory(new TBinaryProtocol.Factory());
  130. tArgs.executorService(Executors.newFixedThreadPool(100));
  131. // 启动这个thrift服务
  132. TThreadPoolServer server = new TThreadPoolServer(tArgs);
  133. server.setServerEventHandler(new StartServerEventHandler());
  134. server.serve();
  135. }
  136. }
  137. /** * 为这个TThreadPoolServer对象,设置是一个事件处理器。 * 以便在TThreadPoolServer正式开始监听服务请求前,通知mainProcessor: * “Apache Thrift已经成功启动了” * @author yinwenjie * */
  138. private class StartServerEventHandler implements TServerEventHandler {
  139. @Override
  140. public void preServe() {
  141. /* * 需要实现这个方法,以便在服务启动成功后, * 通知mainProcessor: “Apache Thrift已经成功启动了” * */
  142. MainProcessor.this.isthriftStart = true;
  143. synchronized (MainProcessor.this) {
  144. MainProcessor.this.notify();
  145. }
  146. }
  147. /* (non-Javadoc) * @see org.apache.thrift.server.TServerEventHandler#createContext(org.apache.thrift.protocol.TProtocol, org.apache.thrift.protocol.TProtocol) */
  148. @Override
  149. public ServerContext createContext(TProtocol input, TProtocol output) {
  150. /* * 无需实现 * */
  151. return null;
  152. }
  153. @Override
  154. public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
  155. /* * 无需实现 * */
  156. }
  157. @Override
  158. public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
  159. /* * 无需实现 * */
  160. }
  161. }
  162. /** * 这是默认的watcher,什么也没有,也不需要有什么<br> * 因为按照功能需求,服务器端并不需要监控zk上的任何目录变化事件 * @author yinwenjie */
  163. private class MyDefaultWatcher implements Watcher {
  164. public void process(WatchedEvent event) {
  165. }
  166. }
  167. }

3-2、编写服务具体实现

服务端具体实现的代码很简单,就是在IDL脚本生成了java代码后,对DIYFrameworkService接口进行的实现。

  1. package processor;
  2. import java.nio.ByteBuffer;
  3. import net.sf.json.JSONObject;
  4. import org.apache.commons.lang.StringUtils;
  5. import org.apache.commons.logging.Log;
  6. import org.apache.commons.logging.LogFactory;
  7. import org.apache.thrift.TException;
  8. import business.BusinessService;
  9. import business.BusinessServicesMapping;
  10. import business.exception.BizException;
  11. import business.exception.ResponseCode;
  12. import business.pojo.AbstractPojo;
  13. import business.pojo.BusinessResponsePojo;
  14. import business.pojo.DescPojo;
  15. import thrift.iface.DIYFrameworkService.Iface;
  16. import thrift.iface.EXCCODE;
  17. import thrift.iface.RESCODE;
  18. import thrift.iface.Reponse;
  19. import thrift.iface.Request;
  20. import thrift.iface.ServiceException;
  21. import utils.JSONUtils;
  22. /** * IDL文件中,我们定义的唯一服务接口DIYFrameworkService.Iface的唯一实现 * @author yinwenjie * */
  23. public class DIYFrameworkServiceImpl implements Iface {
  24. /** * 日志 */
  25. public static final Log LOGGER = LogFactory.getLog(DIYFrameworkServiceImpl.class);
  26. /* (non-Javadoc) * @see thrift.iface.DIYFrameworkService.Iface#send(thrift.iface.Request) */
  27. @SuppressWarnings("unchecked")
  28. @Override
  29. public Reponse send(Request request) throws ServiceException, TException {
  30. /* * 由于MainProcessor中,在Apache Thrift 服务端启动时已经加入了线程池,所以这里就不需要再使用线程池了 * 这个服务方法的实现,需要做以下事情: * * 1、根据request中,描述的具体服务名称,在配置信息中查找具体的服务类 * 2、使用java的反射机制,调用具体的服务类(BusinessService接口的实现类)。 * 3、根据具体的业务处理结构,构造Reponse对象,并进行返回 * */
  31. //1、===================
  32. String serviceName = request.getServiceName();
  33. String className = BusinessServicesMapping.SERVICES_MAPPING.get(serviceName);
  34. //未发现服务
  35. if(StringUtils.isEmpty(className)) {
  36. return this.buildErrorReponse("无效的服务" , null);
  37. }
  38. //2、===================
  39. // 首先得到以json为描述格式的请求参数信息
  40. JSONObject paramJSON = null;
  41. try {
  42. byte [] paramJSON_bytes = request.getParamJSON();
  43. if(paramJSON_bytes != null && paramJSON_bytes.length > 0) {
  44. String paramJSON_string = new String(paramJSON_bytes);
  45. paramJSON = JSONObject.fromObject(paramJSON_string);
  46. }
  47. } catch(Exception e) {
  48. DIYFrameworkServiceImpl.LOGGER.error(e);
  49. // 向调用者抛出异常
  50. throw new ServiceException(EXCCODE.PARAMNOTFOUND, e.getMessage());
  51. }
  52. // 试图进行反射
  53. BusinessService<AbstractPojo> businessServiceInstance = null;
  54. try {
  55. businessServiceInstance = (BusinessService<AbstractPojo>)Class.forName(className).newInstance();
  56. } catch (Exception e) {
  57. DIYFrameworkServiceImpl.LOGGER.error(e);
  58. // 向调用者抛出异常
  59. throw new ServiceException(EXCCODE.SERVICENOTFOUND, e.getMessage());
  60. }
  61. // 进行调用
  62. AbstractPojo returnPojo = null;
  63. try {
  64. returnPojo = businessServiceInstance.handle(paramJSON);
  65. } catch (BizException e) {
  66. DIYFrameworkServiceImpl.LOGGER.error(e);
  67. return this.buildErrorReponse(e.getMessage() , e.getResponseCode());
  68. }
  69. // 构造处理成功情况下的返回信息
  70. BusinessResponsePojo responsePojo = new BusinessResponsePojo();
  71. responsePojo.setData(returnPojo);
  72. DescPojo descPojo = new DescPojo("", ResponseCode._200);
  73. responsePojo.setDesc(descPojo);
  74. // 生成json
  75. String returnString = JSONUtils.toString(responsePojo);
  76. byte[] returnBytes = returnString.getBytes();
  77. ByteBuffer returnByteBuffer = ByteBuffer.allocate(returnBytes.length);
  78. returnByteBuffer.put(returnBytes);
  79. returnByteBuffer.flip();
  80. // 构造response
  81. Reponse reponse = new Reponse(RESCODE._200, returnByteBuffer);
  82. return reponse;
  83. }
  84. /** * 这个私有方法,用于构造“Thrift中错误的返回信息” * @param erroe_mess * @return */
  85. private Reponse buildErrorReponse(String erroe_mess , ResponseCode responseCode) {
  86. // 构造返回信息
  87. BusinessResponsePojo responsePojo = new BusinessResponsePojo();
  88. responsePojo.setData(null);
  89. DescPojo descPojo = new DescPojo(erroe_mess, responseCode == null?ResponseCode._504:responseCode);
  90. responsePojo.setDesc(descPojo);
  91. // 存储byteBuffer;
  92. String responseJSON = JSONUtils.toString(responsePojo);
  93. byte[] responseJSON_bytes = responseJSON.getBytes();
  94. ByteBuffer byteBuffer = ByteBuffer.allocate(responseJSON_bytes.length);
  95. byteBuffer.put(byteBuffer);
  96. byteBuffer.flip();
  97. Reponse reponse = new Reponse(RESCODE._500, byteBuffer);
  98. return reponse;
  99. }
  100. }

3-3、编写客户端实现

在上文中已经介绍过了,客户端有两件事情需要做:连接到zookeeper查询注册的服务该如何访问;然后向真实的服务提供者发起请求。代码如下:

  1. package client;
  2. import java.nio.ByteBuffer;
  3. import java.util.List;
  4. import net.sf.json.JSONObject;
  5. import org.apache.commons.lang.StringUtils;
  6. import org.apache.commons.logging.Log;
  7. import org.apache.commons.logging.LogFactory;
  8. import org.apache.log4j.BasicConfigurator;
  9. import org.apache.thrift.protocol.TBinaryProtocol;
  10. import org.apache.thrift.protocol.TProtocol;
  11. import org.apache.thrift.transport.TSocket;
  12. import org.apache.zookeeper.CreateMode;
  13. import org.apache.zookeeper.WatchedEvent;
  14. import org.apache.zookeeper.Watcher;
  15. import org.apache.zookeeper.ZooKeeper;
  16. import org.apache.zookeeper.ZooDefs.Ids;
  17. import org.apache.zookeeper.data.Stat;
  18. import thrift.iface.DIYFrameworkService.Client;
  19. import thrift.iface.Reponse;
  20. import thrift.iface.Request;
  21. import utils.JSONUtils;
  22. public class ThriftClient {
  23. /** * 日志 */
  24. private static final Log LOGGER = LogFactory.getLog(ThriftClient.class);
  25. private static final String SERVCENAME = "queryUserDetailService";
  26. static {
  27. BasicConfigurator.configure();
  28. }
  29. public static final void main(String[] main) throws Exception {
  30. /* * 服务治理框架的客户端示例,要做以下事情: * * 1、连接到zk,查询当前zk下提供的服务列表中是否有自己需要的服务名称(queryUserDetailService) * 2、如果没有找到需要的服务名称,则客户端终止工作 * 3、如果找到了服务,则通过服务给出的ip,port,基于Thrift进行正式请求 * (这时,和zookeeper是否断开,关系就不大了) * */
  31. // 1、===========================
  32. // 默认的监听器
  33. ClientDefaultWatcher defaultWatcher = new ClientDefaultWatcher();
  34. // 连接到zk服务器集群,添加默认的watcher监听
  35. ZooKeeper zk = new ZooKeeper("192.168.61.128:2181", 120000, defaultWatcher);
  36. /* * 为什么客户端连接上来以后,也可能创建一个Service根目录呢? * 因为正式的环境下,不能保证客户端一点就在服务器端全部准备好的情况下,再来做调用请求 * */
  37. Stat pathStat = null;
  38. try {
  39. pathStat = zk.exists("/Service", defaultWatcher);
  40. //如果条件成立,说明节点不存在(只需要判断一个节点的存在性即可)
  41. //创建的这个节点是一个“永久状态”的节点
  42. if(pathStat == null) {
  43. zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  44. }
  45. } catch(Exception e) {
  46. System.exit(-1);
  47. }
  48. // 2、===========================
  49. //获取服务列表(不需要做任何的事件监听,所以第二个参数可以为false)
  50. List<String> serviceList = zk.getChildren("/Service", false);
  51. if(serviceList == null || serviceList.isEmpty()) {
  52. ThriftClient.LOGGER.info("未发现相关服务,客户端退出");
  53. return;
  54. }
  55. //然后查看要找寻的服务是否在存在
  56. boolean isFound = false;
  57. byte[] data;
  58. for (String serviceName : serviceList) {
  59. if(StringUtils.equals(serviceName, ThriftClient.SERVCENAME)) {
  60. isFound = true;
  61. break;
  62. }
  63. }
  64. if(!isFound) {
  65. ThriftClient.LOGGER.info("未发现相关服务,客户端退出");
  66. return;
  67. } else {
  68. data = zk.getData("/Service/" + ThriftClient.SERVCENAME, false, null);
  69. }
  70. /* * 执行到这里,zk的工作就完成了,接下来zk是否断开,就不重要了 * */
  71. zk.close();
  72. if(data == null || data.length == 0) {
  73. ThriftClient.LOGGER.info("未发现有效的服务端地址,客户端退出");
  74. return;
  75. }
  76. // 得到服务器地值说明
  77. JSONObject serverTargetJSON = null;
  78. String serverIp;
  79. String serverPort;
  80. try {
  81. serverTargetJSON = JSONObject.fromObject(new String(data));
  82. serverIp = serverTargetJSON.getString("ip");
  83. serverPort = serverTargetJSON.getString("port");
  84. } catch(Exception e) {
  85. ThriftClient.LOGGER.error(e);
  86. return;
  87. }
  88. //3、===========================
  89. TSocket transport = new TSocket(serverIp, Integer.parseInt(serverPort));
  90. TProtocol protocol = new TBinaryProtocol(transport);
  91. // 准备调用参数
  92. JSONObject jsonParam = new JSONObject();
  93. jsonParam.put("username", "yinwenjie");
  94. byte[] params = jsonParam.toString().getBytes();
  95. ByteBuffer buffer = ByteBuffer.allocate(params.length);
  96. buffer.put(params);
  97. buffer.flip();
  98. Request request = new Request(buffer, ThriftClient.SERVCENAME);
  99. // 开始调用
  100. Client client = new Client(protocol);
  101. // 准备传输
  102. transport.open();
  103. // 正式调用接口
  104. Reponse reponse = client.send(request);
  105. byte[] responseBytes = reponse.getResponseJSON();
  106. // 一定要记住关闭
  107. transport.close();
  108. // 将返回信息显示出来
  109. ThriftClient.LOGGER.info("respinse value = " + new String(responseBytes));
  110. }
  111. }
  112. /** * 这是默认的watcher,什么也没有,也不需要有什么<br> * 因为按照功能需求,客户端并不需要监控zk上的任何目录变化事件 * @author yinwenjie */
  113. class ClientDefaultWatcher implements Watcher {
  114. public void process(WatchedEvent event) {
  115. }
  116. }

3-4、工程结构说明

以上代码是服务器端、客户端的主要代码。整个工程还有其他的辅助代码,为了让各位读者能够看得清楚直接,我们将整个工程结构进行一下说明,下载后导入的工程结构如下图所示:

这里写图片描述

  1. 这是一个典型的JAVA工程。请使用 JDK 1.6+ 版本。我们将讲解整个工程结构。首先来看看这个工程中主要的package和它们的作用。
  2. business:具体的业务层逻辑都在这个包里面,其中exception包含了一个业务层异常的定义BizException,还有错误代码ResponseCode;impl包中放置具体的业务层实现,它们都必须实现BusinessService接口;Pojo是业务层对象模型。client:为了简单起见,我将服务端的实现和客户端的实现放置在一个工程中,client这个包就是客户端的实现代码了;utils包放置了两个工具类,用来进行日期格式化的DataUtils和用来进行json转换的JSONUtils。
  3. 定义的apache thrift IDL文件放置在thrift文件夹下面,名字叫做:demoHello.thrift;您可以使用它生成各种语言的代码;
  4. 工程需要maven的支持。
  5. 2016年08月08日,由网友OneZhous发现了一个程序的bug,这是由于Apache Thrift内部并不会在进行org.apache.thrift.TBaseHelper.copyBinary执行时,将java.nio.ByteBuffer自动进行flip()。所以在完成request和response对象设置后,需要开发人员自行进行flip()。感谢OneZhous对文章中的问题进行纠正,但是CSDN由于无法修改已上传的资源,所以还请各位读者在下载运行时注意这个问题:

    …… ByteBuffer buffer = ByteBuffer.allocate(params.length); buffer.put(params); buffer.flip(); // 以及位置 ByteBuffer byteBuffer = ByteBuffer.allocate(responseJSON_bytes.length); byteBuffer.put(byteBuffer); byteBuffer.flip(); ……

发表评论

表情:
评论列表 (有 0 条评论,299人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RPC实例Apache Thrift

    1、概述 通过上一篇文章RPC的基本概念的介绍,相信读者已经理解了基本的RPC概念。为了加深这个理解,后面几篇文章我将详细讲解一款典型的RPC规范的实现Apache Th

    相关 thrift rpc通信

    thrift rpc通信 框架   别人的简历: 负责抓取程序的开发和维护,对抓取内容进行数据提取、整理。 1、定向数据抓取程序的维护和开发,了解了Sqlite数据库