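grpc-java Spring Boot demo: synchronous and asynchronous calls
I have recently been learning grpc-java, starting from a demo.
I will not cover gRPC fundamentals here and may write them up later when time allows; this post only records a simple gRPC demo built with Spring Boot.
The demo is split into three projects:
- grpc-springboot-demo-api: holds the proto file and generates the Java code
- grpc-springboot-demo-server: server-side code
- grpc-springboot-demo-consumer: client-side code
demo-api (proto)
Create the simplest possible proto file: the request and the response each contain a single string field.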
syntax = "proto3";
option java_package = "grpc.springboot.demo.api";
option java_outer_classname = "HelloWorldService";
package helloworld;
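// Define the service
service HelloWorld {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
// Define the request message
message HelloRequest {
string message = 1;
}
// Define the response message
message HelloResponse {
string message = 1;
}
Once the file is created it has to be compiled, so the pom file needs the relevant plugins: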
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.parallelline1996</groupId>
<artifactId>grpc-springboot-demo-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.12.0</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.0.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.0.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>6</source>
<target>6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
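Once that is done, run Maven's install to publish the artifact to the local repository; the classes we need are generated automatically during the build.
demo-server
Directory structure of the server project:
- Starter: the Spring Boot startup class
- GrpcServerConfiguration: the class that starts the gRPC server
- HelloServiceImpl: the class that implements the service
First the pom file, which mainly pulls in the Spring Boot starter dependencies, the gRPC dependency, and the api project above.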
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.parallelline1996</groupId>
<artifactId>grpc-springboot-demo-server</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.parallelline1996</groupId>
<artifactId>grpc-springboot-demo-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!-- spring boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.28.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0-jre</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>1.5.21.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
Starter, the startup class:
@SpringBootApplication
public class Starter implements CommandLineRunner {
private GrpcServerConfiguration configuration;
public Starter(GrpcServerConfiguration configuration) {
this.configuration = configuration;
}
// Start the gRPC server
@Override
public void run(String... args) throws Exception {
configuration.start();
}
// Start the Spring Boot container
public static void main(String[] args) {
SpringApplication.run(Starter.class, args);
}
}
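GrpcServerConfiguration, the class that starts the gRPC server.
Note: if no thread pool is specified when the Server is created, grpc-java uses its default thread pool to handle requests from clients (I will cover Server creation in a later post). You can also supply your own thread pool by calling ServerBuilder.executor(executor).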
@Component
public class GrpcServerConfiguration {
private static final Logger logger = LoggerFactory.getLogger(GrpcServerConfiguration.class);
// Service exposed by the gRPC server
@Autowired
private HelloServiceImpl service;
// Port the gRPC server listens on
@Value("${grpc.server.port}")
private int port;
private Server server;
private ThreadPoolExecutor executor;
@PostConstruct
public void init() {
executor = new ThreadPoolExecutor(10, 10, 60,
TimeUnit.MILLISECONDS, new SynchronousQueue<>());
}
public void start() throws IOException {
// Build the server
logger.info("Starting gRPC on port {}.", port);
// Use the default thread pool
server = ServerBuilder.forPort(port).addService(service).build().start();
// Use a custom thread pool to handle requests
// server = ServerBuilder.forPort(port).addService(service).executor(executor).build().start();
logger.info("gRPC server started, listening on {}.", port);
// Add the server shutdown logic
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
logger.info("Shutting down gRPC server.");
GrpcServerConfiguration.this.stop();
logger.info("gRPC server shut down successfully.");
}
}));
}
private void stop() {
if (server != null) {
// Shut down the server; shutdown() only initiates an orderly shutdown and does not wait for it to finish
server.shutdown();
}
}
}
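The custom pool in GrpcServerConfiguration pairs a fixed size with a SynchronousQueue, which has no capacity to hold waiting tasks. The following JDK-only sketch (no gRPC involved; the class and method names are made up for illustration) shows what that configuration implies: once every thread is busy, further submissions are rejected rather than queued. How grpc-java reacts to such a rejection is not covered here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorSaturationSketch {

    /**
     * Submits `tasks` blocking tasks to a pool shaped like the one in the demo
     * (core == max, SynchronousQueue) and returns how many were rejected.
     */
    static int countRejected(int poolSize, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 60,
                TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        // Keeps every accepted task busy until the submission loop is done.
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                // Default AbortPolicy: no free thread and no queue slot.
                rejected++;
            }
        }
        release.countDown();
        executor.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        // Two threads, five simultaneous tasks: prints "rejected = 3"
        System.out.println("rejected = " + countRejected(2, 5));
    }
}
```

If occasional bursts above the pool size are expected, a bounded queue such as LinkedBlockingQueue with a fixed capacity is one alternative to consider; which one suits a given service depends on its latency and load requirements.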
HelloServiceImpl: the business-logic implementation. It extends HelloWorldGrpc.HelloWorldImplBase, which is generated from the demo-api project, and only needs to override the sayHello method.
@Component
public class HelloServiceImpl extends HelloWorldGrpc.HelloWorldImplBase {
private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
@Override
public void sayHello(HelloWorldService.HelloRequest request,
StreamObserver<HelloWorldService.HelloResponse> responseObserver) {
String message = request.getMessage();
logger.info("Received message from client: {}", message);
// Build the result and return the response
String rsp = String.format("Hello, %s. This message has been sent to the server", message);
HelloWorldService.HelloResponse response = HelloWorldService.HelloResponse
.newBuilder()
.setMessage(rsp)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
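The sayHello method above follows the usual shape of a unary call: emit exactly one response with onNext, then signal the end with onCompleted. The sketch below illustrates that sequence without any gRPC dependency. The ResponseObserver interface is a local stand-in declared only for this example; it mirrors the three methods of io.grpc.stub.StreamObserver, and the remaining names are hypothetical as well.

```java
import java.util.ArrayList;
import java.util.List;

public class UnaryObserverSketch {

    /** Local stand-in with the same three methods as io.grpc.stub.StreamObserver. */
    interface ResponseObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Records every event so the call sequence can be inspected afterwards. */
    static final class RecordingObserver implements ResponseObserver<String> {
        final List<String> events = new ArrayList<>();

        @Override
        public void onNext(String value) {
            events.add("onNext:" + value);
        }

        @Override
        public void onError(Throwable t) {
            events.add("onError:" + t.getMessage());
        }

        @Override
        public void onCompleted() {
            events.add("onCompleted");
        }
    }

    /** Same shape as the service method: one response, then completion. */
    static void sayHello(String message, ResponseObserver<String> observer) {
        String rsp = String.format("Hello, %s.", message);
        observer.onNext(rsp);
        observer.onCompleted();
    }

    /** Runs the handler against a recording observer and returns the event log. */
    static List<String> run(String message) {
        RecordingObserver observer = new RecordingObserver();
        sayHello(message, observer);
        return observer.events;
    }

    public static void main(String[] args) {
        // prints "[onNext:Hello, grpc., onCompleted]"
        System.out.println(run("grpc"));
    }
}
```

A recording observer of this kind is also a convenient way to unit test the business logic without starting a server.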
demo-consumer
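Directory structure of the client project:
- Starter: the Spring Boot startup class
- GrpcClient: the class that starts the gRPC client
- HelloController: exposes the HTTP endpoints
The pom file mainly pulls in the Spring Boot starter dependencies and the gRPC dependency, and it also needs the api project as a dependency.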
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.parallelline1996</groupId>
<artifactId>grpc-springboot-demo-consumer</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.parallelline1996</groupId>
<artifactId>grpc-springboot-demo-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!-- spring boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>1.28.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.0-jre</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>1.5.21.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
Starter, the startup class; it is essentially the same as the server's startup class.
@SpringBootApplication
public class Starter implements CommandLineRunner {
private GrpcClient configuration;
public Starter(GrpcClient configuration) {
this.configuration = configuration;
}
// Start the gRPC client
@Override
public void run(String... args) {
configuration.start();
}
// Start the Spring Boot container
public static void main(String[] args) {
SpringApplication.run(Starter.class, args);
}
}
GrpcClient, the gRPC client. Its main job is to open a channel to the gRPC server and create the stubs used to call it.
@Component
public class GrpcClient {
private static final Logger logger = LoggerFactory.getLogger(GrpcClient.class);
// Host of the gRPC server
@Value("${grpc.server.host}")
private String host;
// Port exposed by the gRPC server
@Value("${grpc.server.port}")
private int port;
private HelloWorldGrpc.HelloWorldBlockingStub stub;
private HelloWorldGrpc.HelloWorldFutureStub futureStub;
public void start() {
// Open the channel
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
// Create a blocking stub on the channel
stub = HelloWorldGrpc.newBlockingStub(channel);
// Create a future stub on the channel
futureStub = HelloWorldGrpc.newFutureStub(channel);
logger.info("gRPC client start, server address: host:{}, port:{}", host, port);
}
public HelloWorldGrpc.HelloWorldBlockingStub getStub() {
return this.stub;
}
public HelloWorldGrpc.HelloWorldFutureStub getFutureStub() {
return this.futureStub;
}
}
The gRPC client supports three ways of making a call:
- Synchronous call
- Asynchronous call based on a Future
- Asynchronous call based on a callback
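The difference between the three styles can be sketched with plain JDK classes before looking at the gRPC version. This is only an analogy under stated assumptions: remoteHello is a hypothetical stand-in for the remote call, and CompletableFuture plays the role that Guava's ListenableFuture plays in the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class CallStylesSketch {

    /** Hypothetical stand-in for the remote sayHello call. */
    static String remoteHello(String name) {
        return "Hello, " + name;
    }

    /** 1. Synchronous: the caller's thread waits for the result. */
    static String callBlocking(String name) {
        return remoteHello(name);
    }

    /** 2. Future-based: returns at once; the caller decides when to wait on the future. */
    static CompletableFuture<String> callWithFuture(ExecutorService pool, String name) {
        return CompletableFuture.supplyAsync(() -> remoteHello(name), pool);
    }

    /** 3. Callback-based: the caller never waits; onResponse runs once the result exists. */
    static CompletableFuture<Void> callWithCallback(ExecutorService pool, String name,
                                                    Consumer<String> onResponse) {
        return CompletableFuture.supplyAsync(() -> remoteHello(name), pool)
                .thenAccept(onResponse);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(callBlocking("sync"));
            CompletableFuture<String> future = callWithFuture(pool, "future");
            // Other work could happen here before the result is needed.
            System.out.println(future.join());
            // join() is used here only so the demo does not exit before the callback runs.
            callWithCallback(pool, "callback", System.out::println).join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling join() or get() immediately after starting the call, as the Future-based endpoint in this demo does, gives the same blocking behavior as the synchronous style.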
The controller below implements each of the three styles:
@RestController
public class HelloController {
private static final Logger logger = LoggerFactory.getLogger(HelloController.class);
@Autowired
GrpcClient configuration;
/** Synchronous call, blocking */
@GetMapping("/hello")
public String hello(@RequestParam(name = "name", defaultValue = "test", required = false) String name) {
logger.info("Server received request.name={}", name);
// Build a request
HelloWorldService.HelloRequest request = HelloWorldService.HelloRequest
.newBuilder()
.setMessage(name)
.build();
// Send the request to the server through the stub
try {
// Obtain the blocking stub
HelloWorldGrpc.HelloWorldBlockingStub stub = configuration.getStub();
// Synchronous send: blocks until the response arrives
HelloWorldService.HelloResponse response = stub.sayHello(request);
logger.info("Server response received: [{}]", response.toString());
return response.getMessage();
} catch (StatusRuntimeException e) {
logger.info(" Error Received - Error code : {}", e.getStatus().getCode());
}
return "error";
}
/** Future-based asynchronous call */
@GetMapping("/hello-async-future")
public String helloAsyncFuture(@RequestParam(name = "name", defaultValue = "test", required = false) String name) {
logger.info("Server received request.name={}", name);
// Build a request
HelloWorldService.HelloRequest request = HelloWorldService.HelloRequest
.newBuilder()
.setMessage(name)
.build();
// Send the request to the server through the stub
try {
// Obtain the future stub; the call returns a future immediately
ListenableFuture<HelloWorldService.HelloResponse> listenableFuture = configuration.getFutureStub().sayHello(request);
// Fetch the result with the future's get()
// Because get() is called on the same thread, this is not truly asynchronous; the call still blocks
HelloWorldService.HelloResponse response = listenableFuture.get();
logger.info("Server response received: [{}]", response.toString());
return response.getMessage();
} catch (StatusRuntimeException e) {
logger.info(" Error Received - Error code : {}", e.getStatus().getCode());
} catch (Exception e) {
logger.error("error ", e);
}
return "error";
}
/** Callback-based asynchronous call */
@GetMapping("/hello-async-callback")
public String helloAsyncCallback(@RequestParam(name = "name", defaultValue = "test", required = false) String name) {
logger.info("Server received request.name={}", name);
// Build a request
HelloWorldService.HelloRequest request = HelloWorldService.HelloRequest
.newBuilder()
.setMessage(name)
.build();
// Send the request to the server through the stub
try {
HelloWorldGrpc.HelloWorldFutureStub futureStub = configuration.getFutureStub();
ListenableFuture<HelloWorldService.HelloResponse> listenableFuture = futureStub.sayHello(request);
// Register a listener and specify the executor that runs the callback
// Note: this demo creates a new pool per request, and its core thread is never released; real code should reuse one shared executor
listenableFuture.addListener(() -> {
try {
HelloWorldService.HelloResponse response = listenableFuture.get();
logger.info("Server response received: [{}]", response.toString());
} catch (Exception e) {
e.printStackTrace();
}
}, new ThreadPoolExecutor(1, 1, 60,
TimeUnit.MILLISECONDS, new SynchronousQueue<>()));
return "async";
} catch (StatusRuntimeException e) {
logger.info(" Error Received - Error code : {}", e.getStatus().getCode());
} catch (Exception e) {
logger.error("error ", e);
}
return "error";
}
}
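One detail in the Future-based endpoints above is worth checking. Future.get() is specified to report a failed computation as an ExecutionException that wraps the original exception, so a catch clause for the original exception type does not match on that path. The JDK-only sketch below illustrates this, with IllegalStateException standing in for StatusRuntimeException; the class and method names are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureFailureSketch {

    /** Returns a label describing which catch branch handled the outcome of get(). */
    static String classifyFailure(CompletableFuture<String> future) {
        try {
            future.get();
            return "no failure";
        } catch (IllegalStateException e) {
            // Not reached for a failed future: get() never throws the cause directly.
            return "caught directly";
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            return "wrapped: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated RPC failure"));
        // prints "wrapped: IllegalStateException"
        System.out.println(classifyFailure(failed));
    }
}
```

In the controller, this suggests that failures of the future stub would land in the generic catch (Exception e) branch, where the status could be read from the cause of the ExecutionException.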
Full code: https://github.com/Parallelline1996/Daily-Learning/tree/master/gRPC/code-repository/grpc-springboot-demo