gRPC快速入门

一时失言乱红尘 2022-02-03 17:55 495阅读 0赞

gRPC 是 Google 开源的基于 Protobuf 和 Http2.0 协议的通信框架,底层由netty提供。之前也简单的介绍过 HTTP/2 重要特性,gRPC提供四种模式:unaryclient streamingserver streaming 以及 bidirectional streaming,对于底层 HTTP/2 来说,它们都是 stream,并且仍然是一套 request + response 模型。

采用gRPC作为底层通讯的开源产品也比较多,比如: Tensorflow、CoreOS、Tidb。

gRPC 特色之处在于它的客户端是多路复用的线程安全的,可以拿过来直接使用,而且多路复用的客户端在效率上有明显优势。

插播句

gRPC 出来的时间要比 Thrift (Facebook )要晚,Google 自然是知道 gRPC 要比 Thrift 相比有明显的优势所以才敢放出来开源的,在开源影响力上Google 一直很强势

Protobuf

gRPC 的 service 接口是基于 protobuf 定义的,我们可以非常方便的将 service 与 HTTP/2 关联起来。

思考:我们知道Protobuf 协议仅仅定义了单个消息的序列化格式,HTTP/2模式下该如何protobuf集成呢?

难道我们需要自定义头部的长度和消息的类名称来区分消息边界?

Protobuf 将消息序列化后,内容放在 HTTP2.0 的 Data 帧的 Payload 字段里,消息的长度在 Data 帧的 Length 字段里,消息的类名称放在 Headers 帧的 Path 头信息中。我们需要通过头部字段的 Path 来确定具体消息的解码器来反序列化 Data 帧的 Payload 内容,而这些工作 gRPC 都已经封装好了。

  • Path : /Service-Name/{method name}
  • Service-Name : ?( {proto package name} “.” ) {service name}
  • Message-Type : {fully qualified proto message name}
  • Content-Type : “application/grpc+proto”

其实request-response的奥秘都在这张图中,下面将展开讲

20190509215910680.png

Request

request 通常包含:

  • Request-Headers,直接使用的 HTTP/2 headers,在 Headers Frame 里面派发,定义的 header 主要有 Call-Definition 以及 Custom-Metadata
  • Length-Prefixed-Message,主要在 Data Frame 里面派发,它有一个 Compressed flag ,然后后面跟着四字节的 message length 以及实际的 message
  • EOS(end-of-stream),在最后的 DATA frame 里面带上了 END_STREAM 这个 flag。用来表示 stream 不会在发送任何数据,可以关闭了

Call-Definition 里面包括 Method(其实就是用的 HTTP/2 的 POST),Content-Type 等。

Custom-Metadata 则是应用层自定义的任意 key-value,key 不建议使用 grpc- 开头,因为这是为 gRPC 后续自己保留的。

Compressed flag 用来表示改 message 是否压缩,如果为 1,表示该 message 采用了压缩,而压缩算啊定义在 header 里面的 Message-Encoding 里面。

思考:Request-Header何时算响应结束呢?

上图也有答案,Headers Frame中带上了END_HEADERS 表示已经是头信息的最后一个帧

Response

主要包含:

  • Response-Headers,主要包括 HTTP-Status,Content-Type 以及 Custom-Metadata 等
  • Length-Prefixed-Message,
  • Trailers,包括了 Status 以及 0 或者多个 Custom-Metadata
  • Trailers-Only,如果遇到了错误,也可以直接返回。也有 HTTP-Status ,Content-Type 和 Trailers

HTTP-Status 就是通常的 HTTP 200,301,400 这些不再解释

Status 也就是 gRPC 的 status

Status-Message 则是 gRPC 的 message,采用了 Percent-Encoded 的编码方式,具体参考这里

思考:response何时算响应结束呢?

从上图就可以找到答案,最后收到的 Headers frame 里面,带上了 Trailers,并且DATA frame有 END_STREAM 这个 flag,那么就意味着 response结束!

接下来将带你从实操中理解gRPC的一些抽象的概念,可下载完整的源码。

os-maven-plugin

它主要是用于根据不同操作系统提供不同的变量(protoc版本信息)

[INFO] os.detected.name: windows
[INFO] os.detected.arch: x86_64
[INFO] os.detected.version: 6.1
[INFO] os.detected.version.major: 6
[INFO] os.detected.version.minor: 1
[INFO] os.detected.classifier: windows-x86_64

maven-dependency-plugin

自动下载protoc(可执行程序)到本地目录

protobuf-maven-plugin

针对*.proto文件生成java文件(主要是contract,dto)

服务提供方

gRPC 的一个特色之处在于提供了 Streaming 模式,有了 Streaming 模式,客户端可以将一连串的请求连续发送到服务器,服务器也可以将一连串连续的响应回复给客户端。Streaming 就是双工对话,它允许一个人同时说又同时听,如果你对rxjava有一定的了解的话,我擦,是不是有种熟悉的感觉!

2019051410360431.png

  1. public class OrderServiceProvider extends OrderServiceGrpc.OrderServiceImplBase{
  2. @Override
  3. public void getOrder(OrderId request, StreamObserver<OrderDTO> responseObserver) {
  4. responseObserver.onNext(OrderDTO.newBuilder().setName("test").build());
  5. responseObserver.onCompleted();
  6. }
  7. @Override
  8. public void addOrder(OrderDTO request, StreamObserver<OrderId> responseObserver) {
  9. responseObserver.onNext(OrderId.newBuilder().setId(9527).build());
  10. responseObserver.onCompleted();
  11. }
  12. @Override
  13. public void queryOrder(OrderDTO request, StreamObserver<OrderId> responseObserver) {
  14. responseObserver.onNext(OrderId.newBuilder().setId(9527).build());
  15. responseObserver.onNext(OrderId.newBuilder().setId(9528).build());
  16. responseObserver.onCompleted();
  17. }
  18. }

客户消费端

gRPC提供了丰富的Contract(客户端Sevice协议)的实现方式,google的解决方案显然很棒,不像dubbo比较凑合。

回顾下dubbo

在2.7版本之前:

  1. 将同步接口声明成 async=true,比如<dubbo:reference id="asyncService" interface="xxxx" async="true"/>
  2. 通过上下文类获取 future,比如:Future fooFuture = RpcContext.getContext().getFuture();

问题:不太符合异步编程的习惯,如果同时进行多个异步调用,使用不当很容易造成上下文污染,jdk原生Future 并不支持 callback 的调用方式。

在2.7.1(孵化中的正式版本)中改造:显式声明异步接口使用了JDK1.8 的CompletableFuture

  1. //声明
  2. public interface AsyncService {
  3. String sayHello(String name);
  4. default CompletableFuture<String> sayHiAsync(String name) {
  5. return CompletableFuture.completedFuture(sayHello(name));
  6. }
  7. }
  8. //调用
  9. CompletableFuture<String> future = asyncService.sayHiAsync("Han MeiMei");
  10. future.whenComplete((retValue, exception) -> {
  11. if (exception == null) {
  12. System.out.println(retValue);
  13. } else {
  14. exception.printStackTrace();
  15. }
  16. });

而这个异步更多是客户端异步,虽然2.7 新增了服务端异步的支持,然而个人认为服务端异步的特性较为鸡肋

but,dubbo在3.0.0-SNAPSHOT中已经有了彻底的改变,支持响应式之RSocket

提供了3种Contract:

  • BlockingStub,阻塞式
  • FutureStub,guava的ListenableFuture
  • Stub,支持StreamObserver

    public class OrderClient {

    1. private String host = "localhost";
    2. private int serverPort = 9527;
    3. private ManagedChannel managedChannel;
    4. private OrderServiceGrpc.OrderServiceBlockingStub orderConsumer;
    5. private OrderServiceGrpc.OrderServiceFutureStub orderAsyncConsumer;
    6. private OrderServiceGrpc.OrderServiceStub orderStreamConsumer;
    7. private OrderId orderDTO = OrderId.newBuilder().setId(123).build();
    8. @Before
    9. public void init() {
    10. managedChannel = ManagedChannelBuilder.forAddress(host, serverPort)
    11. //Channels are secure by default (via SSL/TLS)
    12. .usePlaintext().
    13. build();
    14. orderConsumer = OrderServiceGrpc.newBlockingStub(managedChannel);
    15. orderAsyncConsumer = OrderServiceGrpc.newFutureStub(managedChannel);
    16. orderStreamConsumer = OrderServiceGrpc.newStub(managedChannel);
    17. }
    18. @Test
    19. public void getOrder() {
    20. OrderDTO result = orderConsumer.getOrder(orderDTO);
    21. System.out.println(result);
    22. }
    23. @Test
    24. public void asyncGetOrder() {
    25. ListenableFuture<OrderDTO> result = orderAsyncConsumer.getOrder(orderDTO);
    26. System.out.println(result);
    27. }
    28. @Test
    29. public void streamGetOrder() throws InterruptedException {
    30. CountDownLatch latch = new CountDownLatch(1);
    31. StreamObserver<OrderDTO> responseObserver = new StreamObserver<OrderDTO>() {
    32. @Override
    33. public void onNext(OrderDTO value) {
    34. System.out.println("get result :" + value);
    35. }
    36. @Override
    37. public void onError(Throwable t) {
    38. Status status = Status.fromThrowable(t);
    39. System.out.println("failed with status : " + status);
    40. latch.countDown();
    41. }
    42. @Override
    43. public void onCompleted() {
    44. System.out.println("finished!");
    45. latch.countDown();
    46. }
    47. };
    48. orderStreamConsumer.getOrder(orderDTO, responseObserver);
    49. latch.await();
    50. }

    }

总结,gRPC 的基石就是 HTTP/2,然后在上面使用 protobuf 协议定义好 service RPC,如果对这些不了解,将无法很好的掌握。HTTP_2

引用资料

  • gRPC 官方英文文档
  • gRPC Errors
  • gGRPC 设计与实现

发表评论

表情:
评论列表 (有 0 条评论,495人围观)

还没有评论,来说两句吧...

相关阅读

    相关 gRPC快速整合SpringCloud

    gRPC是由 google开发的一个高性能、通用的开源RPC框架,主要面向移动应用开发且基于HTTP/2协议标准而设计,同时支持大多数流行的编程语言。它是一种与语言、平台无关、

    相关 gRPC快速入门

    gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grp

    相关 GRPC基础入门

      项目中要使用rpc协议框架来实现两个系统之间的接口调用。A系统调用B系统的相应接口,因为考虑到http请求会包含更多冗余信息,造成请求过大,因此选用了rpc众多框架中的gr