grpc-java springboot 同步异步调用 demo

曾经终败给现在 2021-09-21 21:10 794阅读 0赞

近期学习 grpc-java,从 demo 入手,进行学习

grpc 的基础知识暂不整理,后续有时间再整理一波,这一篇主要是记录一个简单的 grpc 使用 springboot 创建的 demo。

我整个 demo 分成三个 Project :

  • grpc-springboot-demo-api:用于存放 proto 文件,生成Java 代码
  • grpc-springboot-demo-server:服务端代码
  • grpc-springboot-demo-consumer:客户端代码

demo-api(proto)

创建一个最简单的 proto 文件,请求跟相应都只包含一个 String 的对象。

  1. syntax = "proto3";
  2. option java_package = "grpc.springboot.demo.api";
  3. option java_outer_classname = "HelloWorldService";
  4. package helloworld;
  5. // 定义服务
  6. service HelloWorld {
  7. rpc SayHello (HelloRequest) returns (HelloResponse) {}
  8. }
  9. // 定义请求体
  10. message HelloRequest {
  11. string message = 1;
  12. }
  13. // 定义相应内容
  14. message HelloResponse {
  15. string message = 1;
  16. }

文件创建好之后,需要进行编译,pom文件中需要引入相关插件:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.parallelline1996</groupId>
  7. <artifactId>grpc-springboot-demo-api</artifactId>
  8. <version>1.0.0-SNAPSHOT</version>
  9. <dependencies>
  10. <dependency>
  11. <groupId>com.google.protobuf</groupId>
  12. <artifactId>protobuf-java</artifactId>
  13. <version>3.5.1</version>
  14. </dependency>
  15. <dependency>
  16. <groupId>io.grpc</groupId>
  17. <artifactId>grpc-all</artifactId>
  18. <version>1.12.0</version>
  19. </dependency>
  20. </dependencies>
  21. <build>
  22. <extensions>
  23. <extension>
  24. <groupId>kr.motd.maven</groupId>
  25. <artifactId>os-maven-plugin</artifactId>
  26. <version>1.4.1.Final</version>
  27. </extension>
  28. </extensions>
  29. <plugins>
  30. <plugin>
  31. <groupId>org.xolstice.maven.plugins</groupId>
  32. <artifactId>protobuf-maven-plugin</artifactId>
  33. <version>0.5.0</version>
  34. <configuration>
  35. <protocArtifact>com.google.protobuf:protoc:3.0.0:exe:${os.detected.classifier}</protocArtifact>
  36. <pluginId>grpc-java</pluginId>
  37. <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.0.0:exe:${os.detected.classifier}</pluginArtifact>
  38. </configuration>
  39. <executions>
  40. <execution>
  41. <goals>
  42. <goal>compile</goal>
  43. <goal>compile-custom</goal>
  44. </goals>
  45. </execution>
  46. </executions>
  47. </plugin>
  48. <plugin>
  49. <groupId>org.apache.maven.plugins</groupId>
  50. <artifactId>maven-compiler-plugin</artifactId>
  51. <configuration>
  52. <source>6</source>
  53. <target>6</target>
  54. </configuration>
  55. </plugin>
  56. </plugins>
  57. </build>
  58. </project>

完成后,直接通过 maven 工具的 install 指令,推到本地仓库即可,构建时,便会自动生成我们需要的类。

在这里插入图片描述

demo-server

服务端项目的目录结构:

  • Starter:springboot 的启动类
  • GrpcServerConfiguration:grpc 服务端的启动类
  • HelloServerImpl:提供服务的实现类

在这里插入图片描述
首先 pom 文件,主要是导入 springboot 的启动的依赖,以及 grpc 的依赖,以及我们上面的 api 项目。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.parallelline1996</groupId>
  7. <artifactId>grpc-springboot-demo-server</artifactId>
  8. <version>1.0.0-SNAPSHOT</version>
  9. <properties>
  10. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  11. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>com.parallelline1996</groupId>
  16. <artifactId>grpc-springboot-demo-api</artifactId>
  17. <version>1.0.0-SNAPSHOT</version>
  18. </dependency>
  19. <!-- spring boot -->
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-test</artifactId>
  27. <scope>test</scope>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.apache.httpcomponents</groupId>
  31. <artifactId>httpclient</artifactId>
  32. <version>4.5.5</version>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.springframework.boot</groupId>
  36. <artifactId>spring-boot-starter-actuator</artifactId>
  37. </dependency>
  38. <dependency>
  39. <groupId>io.grpc</groupId>
  40. <artifactId>grpc-all</artifactId>
  41. <version>1.28.1</version>
  42. </dependency>
  43. <dependency>
  44. <groupId>com.google.guava</groupId>
  45. <artifactId>guava</artifactId>
  46. <version>27.0-jre</version>
  47. </dependency>
  48. </dependencies>
  49. <dependencyManagement>
  50. <dependencies>
  51. <dependency>
  52. <groupId>org.springframework.boot</groupId>
  53. <artifactId>spring-boot-dependencies</artifactId>
  54. <version>1.5.21.RELEASE</version>
  55. <type>pom</type>
  56. <scope>import</scope>
  57. </dependency>
  58. </dependencies>
  59. </dependencyManagement>
  60. </project>

Starter 启动类:

  1. @SpringBootApplication
  2. public class Starter implements CommandLineRunner {
  3. private GrpcServerConfiguration configuration;
  4. public Starter(GrpcServerConfiguration configuration) {
  5. this.configuration = configuration;
  6. }
  7. // 启动 grpc 服务端
  8. @Override
  9. public void run(String... args) throws Exception {
  10. configuration.start();
  11. }
  12. // 启动 springboot 容器
  13. public static void main(String[] args) {
  14. SpringApplication.run(Starter.class, args);
  15. }
  16. }

GrpcServerConfiguration grpc 服务端启动类:
这里注意:如果不在创建 Server 指定线程池,则会使用 grpc-java 中提供的默认线程池响应从客户端发送的请求(后续关于 Server 的创建,会再写博客说明)。也可以直接指定线程池,只需调用 ServerBuilder.executor(executor)
即可。

  1. @Component
  2. public class GrpcServerConfiguration {
  3. private static final Logger logger = LoggerFactory.getLogger(GrpcServerConfiguration.class);
  4. // Grpc 服务端提供的服务
  5. @Autowired
  6. private HelloServiceImpl service;
  7. // Grpc 服务端的地址
  8. @Value("${grpc.server.port}")
  9. private int port;
  10. private Server server;
  11. private ThreadPoolExecutor executor;
  12. @PostConstruct
  13. public void init() {
  14. executor = new ThreadPoolExecutor(10, 10, 60,
  15. TimeUnit.MILLISECONDS, new SynchronousQueue<>());
  16. }
  17. public void start() throws IOException {
  18. // 构建服务端
  19. logger.info("Starting gRPC on port {}.", port);
  20. // 使用默认线程池
  21. server = ServerBuilder.forPort(port).addService(service).build().start();
  22. // 使用自定义线程池用于处理请求
  23. // server = ServerBuilder.forPort(port).addService(service).executor(executor).build().start();
  24. logger.info("gRPC server started, listening on {}.", port);
  25. // 添加服务端关闭的逻辑
  26. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  27. @Override
  28. public void run() {
  29. logger.info("Shutting down gRPC server.");
  30. GrpcServerConfiguration.this.stop();
  31. logger.info("gRPC server shut down successfully.");
  32. }
  33. }));
  34. }
  35. private void stop() {
  36. if (server != null) {
  37. // 关闭服务端
  38. server.shutdown();
  39. }
  40. }

HelloServiceImpl:业务逻辑的实现类,继承 demo-api 中申明的 HelloWorldGrpc.HelloWorldImplBase,重写 sayHello 方法即可。

  1. @Component
  2. public class HelloServiceImpl extends HelloWorldGrpc.HelloWorldImplBase {
  3. private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
  4. @Override
  5. public void sayHello(HelloWorldService.HelloRequest request,
  6. StreamObserver<HelloWorldService.HelloResponse> responseObserver) {
  7. String message = request.getMessage();
  8. logger.info("Client Received message:{}", message);
  9. // 创建结果,返回响应内容
  10. String rsp = String.format("Hello, %s. This message have send to server", message);
  11. HelloWorldService.HelloResponse response = HelloWorldService.HelloResponse
  12. .newBuilder()
  13. .setMessage(rsp)
  14. .build();
  15. responseObserver.onNext(response);
  16. responseObserver.onCompleted();
  17. }

demo-consumer

客服端项目的目录结构:

  • Starter:springboot 的启动类
  • GrpcClient:客户端的启动类
  • HelloController:对外提供服务

在这里插入图片描述

pom 文件,主要是导入 springboot 的启动的依赖,以及 grpc 的依赖,同样也需要引入 api 项目的依赖。

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.parallelline1996</groupId>
  7. <artifactId>grpc-springboot-demo-consumer</artifactId>
  8. <version>1.0.0-SNAPSHOT</version>
  9. <properties>
  10. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  11. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>com.parallelline1996</groupId>
  16. <artifactId>grpc-springboot-demo-api</artifactId>
  17. <version>1.0.0-SNAPSHOT</version>
  18. </dependency>
  19. <!-- spring boot -->
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-test</artifactId>
  27. <scope>test</scope>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.apache.httpcomponents</groupId>
  31. <artifactId>httpclient</artifactId>
  32. <version>4.5.5</version>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.springframework.boot</groupId>
  36. <artifactId>spring-boot-starter-actuator</artifactId>
  37. </dependency>
  38. <dependency>
  39. <groupId>io.grpc</groupId>
  40. <artifactId>grpc-all</artifactId>
  41. <version>1.28.1</version>
  42. </dependency>
  43. <dependency>
  44. <groupId>com.google.guava</groupId>
  45. <artifactId>guava</artifactId>
  46. <version>27.0-jre</version>
  47. </dependency>
  48. </dependencies>
  49. <dependencyManagement>
  50. <dependencies>
  51. <dependency>
  52. <groupId>org.springframework.boot</groupId>
  53. <artifactId>spring-boot-dependencies</artifactId>
  54. <version>1.5.21.RELEASE</version>
  55. <type>pom</type>
  56. <scope>import</scope>
  57. </dependency>
  58. </dependencies>
  59. </dependencyManagement>
  60. </project>

Starter,启动类,与服务端服务启动类无差异。

  1. @SpringBootApplication
  2. public class Starter implements CommandLineRunner {
  3. private GrpcClient configuration;
  4. public Starter(GrpcClient configuration) {
  5. this.configuration = configuration;
  6. }
  7. // 启动 grpc 客户端
  8. @Override
  9. public void run(String... args) {
  10. configuration.start();
  11. }
  12. // 启动 springboot 容器
  13. public static void main(String[] args) {
  14. SpringApplication.run(Starter.class, args);
  15. }
  16. }

GrpcClient,gRPC 客户端,主要作用是监听 gRPC 服务端,开启通道。

  1. @Component
  2. public class GrpcClient {
  3. private static final Logger logger = LoggerFactory.getLogger(GrpcClient.class);
  4. // Grpc 服务端的地址
  5. @Value("${grpc.server.host}")
  6. private String host;
  7. // Grpc 服务端暴露的接口
  8. @Value("${grpc.server.port}")
  9. private int port;
  10. private HelloWorldGrpc.HelloWorldBlockingStub stub;
  11. private HelloWorldGrpc.HelloWorldFutureStub futureStub;
  12. public void start() {
  13. // 开启channel
  14. ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
  15. // 通过 channel 获取服务端 blocking-stub
  16. stub = HelloWorldGrpc.newBlockingStub(channel);
  17. // 通过 channel 获取服务端 future-stub
  18. futureStub = HelloWorldGrpc.newFutureStub(channel);
  19. logger.info("gRPC client start, server address: host:{}, port:{}", host, port);
  20. }
  21. public HelloWorldGrpc.HelloWorldBlockingStub getStub() {
  22. return this.stub;
  23. }
  24. public HelloWorldGrpc.HelloWorldFutureStub getFutureStub() {
  25. return this.futureStub;
  26. }
  27. }

gRPC 的客户端支持三种调用方式:

  • 同步调用
  • 基于 Future 的异步调用
  • 基于回调的异步调用

下面的代码分别对应三种调用方式:

  1. @RestController
  2. public class HelloController {
  3. private static final Logger logger = LoggerFactory.getLogger(HelloController.class);
  4. @Autowired
  5. GrpcClient configuration;
  6. /** * 同步调用,阻塞 */
  7. @GetMapping("/hello")
  8. public String hello(@RequestParam(name = "name", defaultValue = "test", required = false) String name) {
  9. logger.info("Server received request.name={}", name);
  10. // 构建一个请求
  11. HelloWorldService.HelloRequest request = HelloWorldService.HelloRequest
  12. .newBuilder()
  13. .setMessage(name)
  14. .build();
  15. // 使用stub发送请求至服务端
  16. try {
  17. // 这里获取了一个 blocking-stub
  18. HelloWorldGrpc.HelloWorldBlockingStub stub = configuration.getStub();
  19. // 同步发送,将阻塞等待请求结果
  20. HelloWorldService.HelloResponse response = stub.sayHello(request);
  21. logger.info("Server response received: [{}]", response.toString());
  22. return response.getMessage();
  23. } catch (StatusRuntimeException e) {
  24. logger.info(" Error Received - Error code : {}", e.getStatus().getCode());
  25. }
  26. return "error";
  27. }
  28. /** * 基于 Future 的异步调用 */
  29. @GetMapping("/hello-async-future")
  30. public String helloAsyncFuture(@RequestParam(name = "name", defaultValue = "test", required = false) String name) {
  31. logger.info("Server received request.name={}", name);
  32. // 构建一个请求
  33. HelloWorldService.HelloRequest request = HelloWorldService.HelloRequest
  34. .newBuilder()
  35. .setMessage(name)
  36. .build();
  37. // 使用stub发送请求至服务端
  38. try {
  39. // 获取 future-stub 对象,调用请求后,将获取 future
  40. ListenableFuture<HelloWorldService.HelloResponse> listenableFuture = configuration.getFutureStub().sayHello(request);
  41. // 通过 future 对象的 get 请求获取结果
  42. // 由于在同一线程中调用,并没有实现真正的异步,仍为同步阻塞
  43. HelloWorldService.HelloResponse response = listenableFuture.get();
  44. logger.info("Server response received: [{}]", response.toString());
  45. return response.getMessage();
  46. } catch (StatusRuntimeException e) {
  47. logger.info(" Error Received - Error code : {}", e.getStatus().getCode());
  48. } catch (Exception e) {
  49. logger.error("error ", e);
  50. }
  51. return "error";
  52. }
  53. /** * 基于回调的异步调用 */
  54. @GetMapping("/hello-async-callback")
  55. public String helloAsyncCallback(@RequestParam(name = "name", defaultValue = "test", required = false) String name) {
  56. logger.info("Server received request.name={}", name);
  57. // 构建一个请求
  58. HelloWorldService.HelloRequest request = HelloWorldService.HelloRequest
  59. .newBuilder()
  60. .setMessage(name)
  61. .build();
  62. // 使用stub发送请求至服务端
  63. try {
  64. HelloWorldGrpc.HelloWorldFutureStub futureStub = configuration.getFutureStub();
  65. ListenableFuture<HelloWorldService.HelloResponse> listenableFuture = futureStub.sayHello(request);
  66. // 通过设备监听器,同时指定回调线程池的方式进行
  67. listenableFuture.addListener(() -> {
  68. try {
  69. HelloWorldService.HelloResponse response = listenableFuture.get();
  70. logger.info("Server response received: [{}]", response.toString());
  71. } catch (Exception e) {
  72. e.printStackTrace();
  73. }
  74. }, new ThreadPoolExecutor(1, 1, 60,
  75. TimeUnit.MILLISECONDS, new SynchronousQueue<>()));
  76. return "async";
  77. } catch (StatusRuntimeException e) {
  78. logger.info(" Error Received - Error code : {}", e.getStatus().getCode());
  79. } catch (Exception e) {
  80. logger.error("error ", e);
  81. }
  82. return "error";
  83. }
  84. }

完整的代码连接:https://github.com/Parallelline1996/Daily-Learning/tree/master/gRPC/code-repository/grpc-springboot-demo

发表评论

表情:
评论列表 (有 0 条评论,794人围观)

还没有评论,来说两句吧...

相关阅读

    相关 同步调用异步调用

    同步调用: 调用者等待被调用者返回结果(执行完)才执行下一步 优点: 代码简单 缺点: 若被调用者执行的是耗时操作,会产生阻塞 异步调用: 调用者不用等待

    相关 SpringBoot异步调用

    除了异步请求,一般上我们用的比较多的应该是异步调用。通常在开发过程中,会遇到一个方法是和实际业务无关的,没有紧密性的。比如记录日志信息等业务。这个时候正常就是启一个新线程去做一

    相关 Springboot 异步调用

    当我们业务中需要执行一个比较耗时的任务并且不影响主题程序的运行,那就需要异步了 首先,配置springboot框架整体的线程池: package pers.zc.a