gRPC

GRPC


1.1 gRPC简介


gRPC是由Google开源的一个高性能的RPC框架。由Stubby Google内部RPC演化而来,2015年正式开源。支持异构系统的RPC

云原生时代是一个RPC标准: 
容器:
* 容器化 Docker go语言开发
* 服务编排 k8s go语言开发
* gRPC go语言开发
 
 gRPC的设计思路
 		1、网络通信 ---> gRPC自己封装网络通信的部分 提供多语言的 网络通信的封装(C Java[Netty] GO)
 		2、协议 ---> HTTP2 传输数据的时候 二进制数据内容 支持双向流(双工) 连接的多路复用
 		3、序列化 ---> 基于文本 JSON 基于二进制 Java原生序列化方式 Thrift二进制的序列化 压缩二进制序列化
 									protobuf (Protocol Buffers) Google开源的一种序列化方式 时间效率和空间效率是JSON的3-5倍
 									IDL语言
 		4、代理的创建 ---> 让调用者像调用本地方法一样 去调用远端的服务方法
 											stub
 gRPC 与 ThriftRPC 区别
 		共性:支持异构语言的RPC
 		区别:
 				1、网络通信 Thrift TCP(传输层协议)专属协议  传输速度更快
 				 				 	gRPC	HTTP2(应用层协议)        更方便整合
 			  2、性能角度 Thrift RPC 性能 高于 gRPC
 			  3、gRPC 大厂背书 (Google) 云原生时代与其他组件合作的顺利。所以gRPC应用更加广泛
 gRPC的好处
 		1、高效的进行进程间通信
 		2、支持多种语言 原生支持C Go Java实现 C语言版本上扩展 C++ C# NodeJS Python Ruby PHP...
 		3、支持多平台运行 Linux Android IOS MacOS Windows
 		4、gRPC序列化方式采用protobuf 效率更高
 		5、使用HTTP2协议
 		6、大厂的背书

2.1 HTTP2.0协议


HTTP1.x协议

一、HTTP1.0协议

  • 请求 响应的模式
  • 短连接协议(无状态协议)
  • HTTP基于TCP协议,TCP协议是长连接协议,为什么HTTP是短连接协议?HTTP自己主动切断的,互联网发展前期,服务器中通信长时间处于连接状态会导致资源不足
  • 传输数据 文本结构 单工(只能客户端找服务端,服务器不能主动向客户端推送数据) 无法实现服务端推送 变相实现推送(客户端轮询的方式)

二、HTTP1.1协议

  • 有限的长连接 (保持一段时间 keepalived决定)
  • 升级的方式 WebSocket
  • 双工 实现服务器向客户端推送

三、共性

  • 传输数据文本格式 可读性好但是效率差
  • 本质上HTTP1.x协议无法实现双工通信,是因为升级了WebSocket才实现的双工通信
  • 资源的请求 需要发送多次请求 建立多个连接才可以完成

HTTP2.0协议

  • HTTP2.0协议是一个二进制协议,效率高于HTTP1.x协议,可读性差
  • 可以实现双工通信
  • 一个请求 一个连接 可以请求多个数据 【多路复用】

HTTP2.0协议的三个概念

  • 数据流 stream
  • 消息 message
  • 帧 frame
20240802160614.png

概念

  • 数据流的优先级:可以通过为不同的stream设置权重,来限制不同流的传输顺序
  • 流控:client发送的数据太快了,server处理不过来,通知client暂停数据的发送

2.2 Protocol Buffers【protobuf】


protobuf 是一种与编程语言无关【IDL】 与具体平台无关【OS】它定义的中间语言,可以方便的在client与server中进行RPC的数据传输

protobuf 两种版本 proto2 proto3,但是目前主流应用的都是proto3

protobuf 的使用需要安装的protobuf的编译器,编译器的目的是把protobuf的IDL语言,转换成具体某一种开发语言

protobuf编译器的安装

brew install prorobuf

截屏2024-08-05 09.57.54 20240805095754.png

protobuf语法

  • 文件格式

    .proto
  • 版本设定

    syntax = "proto3";
  • 注释

    1、单行注释     //
    2、多行注释     /* */
  • 与Java语言相关的语法

    // 后续protobuf生成的java代码 一个源文件还是多个源文件  xx.java
    option java_multiple_files = false;
    
    // 指定protobuf生成的类 放置在哪个包中
    option java_package = "com.suns";
    
    // 指定protobuf生成的外部类的名字(管理内部类【内部类才是真正开发使用的类】)
    option java_outer_classname = "UserService";
  • 逻辑包【了解】

    // protobuf对于文件内容的管理
    package xxx;
  • 导入

    UserService.proto
    
    OrderService.proto
    	import "xxx/UserService.proto";
  • 基本类型

    20240805163224.png
  • 枚举类型

    enum SEASON {
    	SPRING = 0;
    	SUMMER = 1;
    }
    // 枚举的值 必须是0开始
  • 消息 Message

    message LoginRequest {
    	string username = 1; // 1这个值 是字段在message中的编号
    	string password = 2;
    	int32 age = 3;
    }
    
    // 编号 从1开始 到 2^29-1 注意:19000 - 19999的编号不能用,因为它是protobuf自己保留的。 
    - singular : 这个字段的值 只能是0个或者1个(默认关键字) null "123456"
    - repeated
    
    message Result {
    	string content = 1;
    	repeated string status = 2; // 这个字段的返回值 是多个 等价于Java List Protobuf getStatusList() --> List
    }
    
    protobuf【grpc】
    // 可以定义多个消息
    
    message LoginRequest {
    	...
    }
    
    message LoginResponse {
    	...
    }
    
    消息可以嵌套
    message SearchResponse {
    	message Result {
    		string url = 1;
    		string title = 2;
    	}
    	
    	string xxx = 1;
    	int32 yyy = 2;
    	Result ppp = 3;
    }
    
    SearchResponse.Result
    
    message AAA {
    	string xxx = 1;
    	SearchResponse.Result yyy = 2;
    }
    
    oneof【其中一个】
    message SimpleMessage {
    	oneof test_oneof {
    		string name = 1;
    		int32 age = 2;
    	}
    }
    // test_oneof值只能为name和age中的一个
  • 服务

    service HelloService {
    	rpc hello(HelloRequest) returns(HelloResponse){}
    }
    
    // 里面是可以定义多个服务方法
    // 定义多个服务接口
    // gRPC 服务 4个服务方式

2.3 gRPC开发


  • 项目结构

    一、xxx-api模块
    	定义 protobuf IDL语言
    	并且通过命令创建具体的代码,后续client server引入使用
    	1、message
    	2、service
    二、xxx-server模块
    	1、实现api模块中定义的服务接口
    	2、发布gRPC服务(创建服务端程序)
    三、xxx-client模块
    	1、创建服务端stub(代理)
    	2、基于代理(stub)RPC调用
  • api模块

    一、.proto文件 书写protobuf的IDL
    二、[了解]protoc命令 把proto文件中的IDL 转换成编程语言
    	protoc --java_out=/xxx/xxx   /xxx/xxx/xx.proto
    三、[实战] maven插件 进行protobuf IDL 文件的编译,并把他放置在IDEA具体位置 
    	grpc-java

    导入依赖和插件

    <dependencies>
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-netty-shaded</artifactId>
                <version>1.30.2</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-protobuf</artifactId>
                <version>1.26.0</version>
            </dependency>
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-stub</artifactId>
                <version>1.26.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.tomcat</groupId>
                <artifactId>annotations-api</artifactId>
                <version>6.0.53</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    
        <build>
            <extensions>
                <extension>
                    <groupId>kr.motd.maven</groupId>
                    <artifactId>os-maven-plugin</artifactId>
                    <version>1.7.1</version>
                </extension>
            </extensions>
            <plugins>
                <plugin>
                    <groupId>org.xolstice.maven.plugins</groupId>
                    <artifactId>protobuf-maven-plugin</artifactId>
                    <version>0.6.1</version>
                    <configuration>
                        <protoSourceRoot>src/main/proto</protoSourceRoot>
                        <!-- protoc命令 —— message信息 -->
                      	<!-- ${} maven内置变量 获取当前操作系统类型 坑:macos M芯片运行时可能出问题-->
                        <protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact>
                        <pluginId>grpc-java</pluginId>
                      	<!-- 生成服务接口 service -->
                        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact>
                        <outputDirectory>${basedir}/src/main/java</outputDirectory>
                        <clearOutputDirectory>false</clearOutputDirectory>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <!-- 编译消息对象 -->
                                <goal>compile</goal>
                                <!-- 依赖消息对象,生成接口服务 -->
                                <goal>compile-custom</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>

    proto文件

    syntax = "proto3";
    
    option java_multiple_files = false;
    option java_package = "com.grpc";
    option java_outer_classname = "HelloProto";
    
    /**
     * IDL文件 目的 发布RPC服务,service ---> message    message <---
     */
    
    message HelloRequest {
        string name = 1;
    }
    
    message HelloResponse {
        string result = 1;
    }
    
    service HelloService {
        rpc Hello(HelloRequest) returns (HelloResponse) {}
    }

    点击protobuf:compile 生成message相关信息

    20240808161140.png

    点击protobuf:compile-custom 生成接口相关信息

    20240808162857.png 20240806102313.png
  • xxx-server服务端模块

// 1、实现业务接口 添加具体的功能 (MyBatis + MySQL)
package com.grpc.service;

import com.grpc.HelloProto;
import com.grpc.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 17:19
 * @description:
 **/
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase{
    /*
       1、接受client提交的参数
       2、业务处理service + dao调用对应的业务功能
       3、提供返回值
     */

    @Override
    public void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {
        //1、接收client的请求参数
        String name = request.getName();
        //2、业务处理
        System.out.println("name Parameter:" + name);
        //3、封装响应
        //3.1、创建相应对象的构造者
        HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();
        //3.2、填充数据
        builder.setResult("hello method invoke ok");
        //3.3、封装相应
        HelloProto.HelloResponse helloResponse = builder.build();

        responseObserver.onNext(helloResponse);
        responseObserver.onCompleted();
    }
}

// 2、创建服务端 (Netty)
package com.grpc;

import com.grpc.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 17:50
 * @description:
 **/
public class GrpcServer1 {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1、绑定端口
        ServerBuilder serverBuilder = ServerBuilder.forPort(9000);
        //2、发布服务
        serverBuilder.addService(new HelloServiceImpl());
        //3、创建服务对象
        Server server = serverBuilder.build();

        server.start();
        server.awaitTermination();
    }
}
  • xxx-client模块
// 1、client通过代理对象完成远端对象的调用
package com.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 19:20
 * @description:
 **/
public class GrpcClient1 {
    public static void main(String[] args) {
        // 1、创建通信的管道
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        try {
            // 2、获得代理对象 stub
            HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);
            // 3、完成RPC调用
            // 3.1、准备参数
            HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
            builder.setName("shantq");
            HelloProto.HelloRequest helloRequest = builder.build();
            // 3.2、进行功能RPC调用,获取相应内容
            HelloProto.HelloResponse helloResponse = helloService.hello(helloRequest);
            String result = helloResponse.getResult();
            System.out.println("result:" + result);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

repeated

syntax = "proto3";

option java_multiple_files = false;
option java_package = "com.grpc";
option java_outer_classname = "HelloProto";

/**
 * IDL文件 目的 发布RPC服务,service ---> message    message <---
 */

message HelloRequest {
    string name = 1;
}

message HelloResponse {
    string result = 1;
}

message HelloRequest1 {
    repeated string name = 1;
}

message HelloResponse1 {
    string result = 1;
}

service HelloService {
    rpc Hello(HelloRequest) returns (HelloResponse) {}
    rpc Hello1(HelloRequest1) returns (HelloResponse1) {}
}
package com.grpc.service;

import com.google.protobuf.ProtocolStringList;
import com.grpc.HelloProto;
import com.grpc.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 17:19
 * @description:
 **/
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase{
    @Override
    public void hello1(HelloProto.HelloRequest1 request, StreamObserver<HelloProto.HelloResponse1> responseObserver) {
        ProtocolStringList nameList = request.getNameList();
        for (String s : nameList) {
            System.out.println("s = " + s);
        }

        System.out.println("HelloServiceImpl.hello1");

        HelloProto.HelloResponse1.Builder builder = HelloProto.HelloResponse1.newBuilder();
        builder.setResult("ok");

        HelloProto.HelloResponse1 helloResponse1 = builder.build();

        responseObserver.onNext(helloResponse1);
        responseObserver.onCompleted();
    }
package com.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 19:20
 * @description:
 **/
public class GrpcClient2 {
    public static void main(String[] args) {
        // 1、创建通信的管道
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        try {
            // 2、获得代理对象 stub
            HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);
            // 3、完成RPC调用
            // 3.1、准备参数
            HelloProto.HelloRequest1.Builder builder = HelloProto.HelloRequest1.newBuilder();
            builder.addName("shantq1");
            builder.addName("shantq2");
            builder.addName("shantq3");
            HelloProto.HelloRequest1 build = builder.build();
            // 3.2、进行功能RPC调用,获取相应内容
            HelloProto.HelloResponse1 helloResponse = helloService.hello1(build);
            String result = helloResponse.getResult();
            System.out.println("result:" + result);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}
  • tips

    responseObserver.onNext(helloResponse1); // 通过这个方法 把响应的消息 回传client
    responseObserver.onCompleted();          // 通知client 整个服务结束  底层返回标记
    																				// client就会监听标记 【grpc做的】

2.4 gRPC的四种通信方式


一、简单rpc 一元rpc(Unary RPC)

第一个RPC程序,实际上就是一元RPC

  • 特点
20240809113741.png

当client发起调用后,提交数据,并且等待 服务器响应

开发过程中,主要采用就是一元RPC的这种通信方式

  • 语法
service HelloService {
    rpc Hello(HelloRequest) returns (HelloResponse) {}
    rpc Hello1(HelloRequest1) returns (HelloResponse1) {}
}

二、服务端流式RPC(Server Streaming RPC)

一个请求对象,服务端可以回传多个结果对象

20240809115637.png

错误的认知:

  • 认为服务端返回的是一组数据 就应该封装在一个List中,如果这样进行,叫做返回一个结果

返回的多组数据不一定同时返回

使用场景:

  • 查询股票行情

语法:

service HelloService {
    rpc Hello(HelloRequest) returns (HelloResponse) {}
    rpc Hello1(HelloRequest1) returns (HelloResponse1) {}
    rpc C2ss(HelloRequest) returns (stream HelloResponse) {} //服务端流式rpc
}
package com.grpc.service;

import com.google.protobuf.ProtocolStringList;
import com.grpc.HelloProto;
import com.grpc.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 17:19
 * @description:
 **/
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase{

    @Override
    public void c2ss(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {
        //1、接收client的请求参数
        String name = request.getName();
        //2、做业务处理
        System.out.println("name = " + name);
        //3、根据业务处理的结果,提供响应
        for (int i = 0; i < 10; i++) {
            HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();
            builder.setResult("处理的结果" + i);
            HelloProto.HelloResponse build = builder.build();

            responseObserver.onNext(build);

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        responseObserver.onCompleted();
    }
}
package com.grpc;

import com.grpc.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 17:50
 * @description:
 **/
public class GrpcServer1 {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1、绑定端口
        ServerBuilder serverBuilder = ServerBuilder.forPort(9000);
        //2、发布服务
        serverBuilder.addService(new HelloServiceImpl());
        //3、创建服务对象
        Server server = serverBuilder.build();

        server.start();
        server.awaitTermination();
    }
}
package com.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.util.Iterator;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 19:20
 * @description:
 **/
public class GrpcClient3 {
    public static void main(String[] args) {
        // 1、创建通信的管道
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        try {
            // 2、获得代理对象 stub  (阻塞的形式,等待服务端响应时不能异步处理任务,不符合真正的开发需求)
            HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);
            // 3、完成RPC调用
            // 3.1、准备参数
            HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
            builder.setName("shantq");
            HelloProto.HelloRequest helloRequest = builder.build();
            // 3.2、进行功能RPC调用,获取相应内容
            Iterator<HelloProto.HelloResponse> helloResponseIterator = helloService.c2ss(helloRequest);
            while (helloResponseIterator.hasNext()) System.out.println("result: " + helloResponseIterator.next().getResult());
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

/*
	监听 异步方式 处理服务端流式RPC的开发
	1、api
	2、server
	3、client
*/
package com.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 19:20
 * @description:
 **/
public class GrpcClient4 {
    public static void main(String[] args) {
        // 1、创建通信的管道
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        try {
            // 2、获得代理对象 stub
            HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);
            // 3、完成RPC调用
            // 3.1、准备参数
            HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
            builder.setName("shantq");
            HelloProto.HelloRequest helloRequest = builder.build();
            // 3.2、进行功能RPC调用,获取相应内容
            helloService.c2ss(helloRequest, new StreamObserver<HelloProto.HelloResponse>() {
                @Override
                public void onNext(HelloProto.HelloResponse helloResponse) {
                    // 服务端 响应了 一个消息后 如果需要立即处理,把代码写在这个方法中
                    System.out.println("服务端每一次响应端信息" + helloResponse.getResult());
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("ERROR!!!");
                }

                @Override
                public void onCompleted() {
                    // 需要把服务端 响应端所有数据 拿到后 再进行业务处理
                    System.out.println("需要把服务端 响应端所有数据 拿到后 再进行业务处理");
                }
            });

            managedChannel.awaitTermination(11, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

三、客户端流式RPC(Client Streaming RPC)

客户端发送多个请求对象,服务端只返回一个结果

应用场景:IOT(物联网【传感器】向服务端 发送数据)

语法:

rpc Cs2s(stream HelloRequest) returns (HelloResponse) {}
package com.grpc.service;

import com.google.protobuf.ProtocolStringList;
import com.grpc.HelloProto;
import com.grpc.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 17:19
 * @description:
 **/
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase{

    @Override
    public StreamObserver<HelloProto.HelloRequest> cs2s(StreamObserver<HelloProto.HelloResponse> responseObserver) {
        return new StreamObserver<HelloProto.HelloRequest>() {
            @Override
            public void onNext(HelloProto.HelloRequest helloRequest) {
                System.out.println("接收到了client发送的消息" + helloRequest.getName());
                HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();
                builder.setResult("this is result");
                HelloProto.HelloResponse build = builder.build();
                responseObserver.onNext(build);
                responseObserver.onCompleted();
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                System.out.println("client的所有消息 都发送到了 服务端...");
                /*
                如果想让server都接收完client发送的数据之后再统一处理,可以将response结果在onCompleted里设置,否则可以在onNext中设置(接收一条,响应一条)
                HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();
                builder.setResult("this is result");
                HelloProto.HelloResponse build = builder.build();
                responseObserver.onNext(build);
                responseObserver.onCompleted();*/
            }
        };
    }
}

// server不用变 同上 通信用的netty  实现:绑定接口 发布服务 创建服务对象
package com.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 19:20
 * @description:
 **/
public class GrpcClient5 {
    public static void main(String[] args) {
        // 1、创建通信的管道
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        try {
            // 2、获得代理对象 stub
            HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);
            // 3、完成RPC调用
            StreamObserver<HelloProto.HelloRequest> streamObserver = helloService.cs2s(new StreamObserver<HelloProto.HelloResponse>() {
                @Override
                public void onNext(HelloProto.HelloResponse helloResponse) {
                    // 监控响应
                    System.out.println("服务端 响应 数据内容为:" + helloResponse.getResult());
                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onCompleted() {
                    System.out.println("服务端响应结束...");
                }
            });

            // 客户端发送数据到服务端 多条数据 不定时
            for (int i = 0; i < 10; i++) {
                HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();
                builder.setName("shantq " + i);
                HelloProto.HelloRequest build = builder.build();

                streamObserver.onNext(build);
                Thread.sleep(1000);
            }

            streamObserver.onCompleted();
            managedChannel.awaitTermination(11, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

四、双向流RPC(Bi-directional Stream RPC)

客户端可以发送多个请求消息,服务器响应多个响应消息

20240812115511.png

应用场景:聊天室

package com.grpc.service;

import com.google.protobuf.ProtocolStringList;
import com.grpc.HelloProto;
import com.grpc.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 17:19
 * @description:
 **/
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase{
    @Override
    public StreamObserver<HelloProto.HelloRequest> cs2ss(StreamObserver<HelloProto.HelloResponse> responseObserver) {
        return new StreamObserver<HelloProto.HelloRequest>() {
            @Override
            public void onNext(HelloProto.HelloRequest helloRequest) {
                System.out.println("接收到client 提交的消息 " + helloRequest.getName());
                responseObserver.onNext(HelloProto.HelloResponse.newBuilder().setResult("response " + helloRequest.getName() + "result ").build());

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                System.out.println("接收了所有的请求消息...");
                responseObserver.onCompleted();
            }
        };
    }
}
package com.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.TimeUnit;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 19:20
 * @description:
 **/
public class GrpcClient6 {
    public static void main(String[] args) {
        // 1、创建通信的管道
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        try {
            // 2、获得代理对象 stub
            HelloServiceGrpc.HelloServiceStub helloService = HelloServiceGrpc.newStub(managedChannel);
            // 3、完成RPC调用
            StreamObserver<HelloProto.HelloRequest> streamObserver = helloService.cs2ss(new StreamObserver<HelloProto.HelloResponse>() {
                @Override
                public void onNext(HelloProto.HelloResponse helloResponse) {
                    System.out.println("响应的结果 " + helloResponse.getResult());
                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onCompleted() {
                    System.out.println("响应全部结束...");
                }
            });

            for (int i = 0; i < 10; i++) {
                streamObserver.onNext(HelloProto.HelloRequest.newBuilder().setName("shantq " + i).build());
            }
            streamObserver.onCompleted();

            managedChannel.awaitTermination(11, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

2.5 gRPC代理方式


一、BlockingStub

  • 阻塞 通信方式

二、Stub

  • 异步 通过监听处理

三、FutureStub

  • 同步 异步 NettyFuture
  • FutureStub只能应用 一元RPC
syntax = "proto3";

option java_multiple_files = false;
option java_package = "com.grpc";
option java_outer_classname = "TestProto";

message TestRequest {
    string name = 1;
}

message TestResponse {
    string result = 1;
}

service TestService {
    rpc Test1(TestRequest) returns (TestResponse) {};
}
package com.grpc.service;

import com.grpc.TestProto;
import com.grpc.TestServiceGrpc;
import io.grpc.stub.StreamObserver;

/**
 * @author: tianqi shan
 * @create: 2024-08-12 15:15
 * @description:
 **/
public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
    @Override
    public void test1(TestProto.TestRequest request, StreamObserver<TestProto.TestResponse> responseObserver) {
        String name = request.getName();
        System.out.println("name = " + name);

        responseObserver.onNext(TestProto.TestResponse.newBuilder().setResult("test is ok").build());
        responseObserver.onCompleted();
    }
}
package com.grpc;

import com.grpc.service.HelloServiceImpl;
import com.grpc.service.TestServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 17:50
 * @description:
 **/
public class GrpcServer1 {
    public static void main(String[] args) throws IOException, InterruptedException {
        //1、绑定端口
        ServerBuilder serverBuilder = ServerBuilder.forPort(9000);
        //2、发布服务
        serverBuilder.addService(new HelloServiceImpl());
        serverBuilder.addService(new TestServiceImpl());
        //3、创建服务对象
        Server server = serverBuilder.build();

        server.start();
        server.awaitTermination();
    }
}
package com.grpc;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author: tianqi shan
 * @create: 2024-08-08 19:20
 * @description:
 **/
public class GrpcClient7 {
    public static void main(String[] args) {
        // 1、创建通信的管道
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();
        try {
            // 2、获得代理对象 stub
            TestServiceGrpc.TestServiceFutureStub testServiceFutureStub = TestServiceGrpc.newFutureStub(managedChannel);
            // 3、完成RPC调用
            ListenableFuture<TestProto.TestResponse> future = testServiceFutureStub.test1(TestProto.TestRequest.newBuilder().setName("shantq").build());
            /*
            同步操作
            TestProto.TestResponse testResponse = future.get();
            System.out.println(testResponse.getResult());*/

            /*
            future.addListener(() -> {
                System.out.println("异步的rpc响应 回来了");
            }, Executors.newCachedThreadPool());*/

            Futures.addCallback(future, new FutureCallback<TestProto.TestResponse>() {
                @Override
                public void onSuccess(@NullableDecl TestProto.TestResponse testResponse) {
                    System.out.println("result:" + testResponse.getResult());
                }

                @Override
                public void onFailure(Throwable throwable) {

                }
            }, Executors.newCachedThreadPool());

            System.out.println("后续的操作...");
            managedChannel.awaitTermination(11, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            managedChannel.shutdown();
        }
    }
}

2.6 gRPC与SpringBoot整合


20240812155942.png 20240813095042.png

一、搭建SpringBoot开发环境

二、引入与Grpc相关的内容(服务端和客户端都要引)

<dependency>
    <groupId>org.example</groupId>
    <artifactId>rpc-grpc-api</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

<dependency>
    <groupId>net.devh</groupId>
    <artifactId>grpc-server-spring-boot-starter</artifactId>
    <version>2.12.0.RELEASE</version>
</dependency>

三、开发服务

// server
package com.example.service;

import com.grpc.HelloProto;
import com.grpc.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
    @Override
    public void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {
        String name = request.getName();
        System.out.println("name = " + name);
        responseObserver.onNext(HelloProto.HelloResponse.newBuilder().setResult("this is result...").build());
        responseObserver.onCompleted();
    }
}
# 核心配置的 就是gRPC服务的端口号
spring:
  application:
    name: boot-server
  main:
    web-application-type: none #不启动tomcat

grpc:
  server:
    port: 9000
// client
package com.grpc.controller;

import com.grpc.HelloProto;
import com.grpc.HelloServiceGrpc;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @GrpcClient("grpc-server")
    private HelloServiceGrpc.HelloServiceBlockingStub stub;

    @RequestMapping("/test1")
    public String test1(String name) {
        System.out.println("name = " + name);
        HelloProto.HelloResponse hello = stub.hello(HelloProto.HelloRequest.newBuilder().setName(name).build());
        return hello.getResult();
    }
}
spring:
  application:
    name: boot-client

grpc:
  client:
    grpc-server:
      address: 'static://127.0.0.1:9000'
      negotiation-type: plaintext

2.7 gRPC的高级应用


一、拦截器 一元拦截器

二、Stream Tracer [监听流] 流式拦截器

三、Retry Policy 客户端重试机制

四、NameResolver

  • consule
  • ectd

五、负载均衡(pick-first,轮询)

六、grpc与微服务整合

  • 序列化(protobuf)Dobbo
  • grpc Dobbo
  • grpc GateWay
  • grpc JWT
  • grpc Nacos2.0
  • grpc OpenFeign