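Background

Like Thrift, Protobuf (paired with gRPC) is a high-performance RPC solution that is easy to get started with. If you have no particular performance requirements, you can largely follow the official examples as they are. Under high concurrency, however, some tuning is needed to reach peak performance.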

Protobuf service definition and code generation

Service definition


protobuf

// File layout: syntax, imports, package, options, then message and service definitions.
// Manual generation: protoc.exe .\hello.proto --java_out=../src/main/java
syntax = "proto3";

package hello;

option java_package = "cn.mj.protobuf.test";
option java_multiple_files = true;

message HelloRequest {
  string name = 1;
  string message = 2;
}

message HelloResponse {
  string name = 1;
  string message = 2;
  int64 time = 3;
}

service HelloService {
  rpc hello(HelloRequest) returns (HelloResponse) {}
}

Code generation

pom.xml: required dependencies and plugins

xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>cn.mj.protobuf.test</groupId>
    <artifactId>protobuf-test</artifactId>
    <version>1.0-SNAPSHOT</version>
  </parent>

  <artifactId>protobuf-common</artifactId>
  <packaging>jar</packaging>
  <name>protobuf-common</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <protoc.version>4.28.3</protoc.version>
    <grpc.version>1.68.1</grpc.version>
    <protobuf.version>4.28.3</protobuf.version>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-bom</artifactId>
        <version>${grpc.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-services</artifactId>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty-shaded</artifactId>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java-util</artifactId>
      <version>${protobuf.version}</version>
    </dependency>
    <dependency>
      <!-- Use newer version than in protobuf-java-util -->
      <groupId>com.google.j2objc</groupId>
      <artifactId>j2objc-annotations</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.tomcat</groupId>
      <artifactId>annotations-api</artifactId>
      <version>6.0.53</version>
      <scope>provided</scope> <!-- not needed at runtime -->
    </dependency>
  </dependencies>

  <build>
    <extensions>
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.7.1</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <configuration>
          <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
          <pluginId>grpc-java</pluginId>
          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>compile-custom</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Generating the code

shell

mvn compile -e

The generated sources are written under the target directory and can be used directly.

Server

Protobuf service implementation


java

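import cn.mj.protobuf.test.HelloRequest;
import cn.mj.protobuf.test.HelloResponse;
import cn.mj.protobuf.test.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;

public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {

    @Override
    public void hello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        // Build a fixed reply stamped with the current server time.
        HelloResponse response = HelloResponse.newBuilder()
                .setName("hello")
                .setMessage("world")
                .setTime(System.currentTimeMillis())
                .build();
        // Unary call: emit exactly one message, then complete the stream.
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}

Protobuf server implementation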


java

import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import jakarta.annotation.PostConstruct; // javax.annotation.PostConstruct on Spring Boot 2.x
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class HelloServer {

    private Server server;

    @PostConstruct
    public void start() throws IOException {
        int port = 5001;
        server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
                .addService(new HelloServiceImpl())
                .build()
                .start();
        log.info("Server started, listening on {}", port);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                try {
                    HelloServer.this.stop();
                } catch (InterruptedException e) {
                    e.printStackTrace(System.err);
                }
                System.err.println("*** server shut down");
            }
        });
    }

    private void stop() throws InterruptedException {
        if (server != null) {
            // Stop accepting new calls and give in-flight calls up to 30 seconds to finish.
            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
        }
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon threads.
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }
}

Client

Controller


java

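import jakarta.annotation.Resource; // javax.annotation.Resource on Spring Boot 2.x
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Resource
    private HelloService helloService;

    // HTTP entry point hit by the load test; delegates to the gRPC client.
    @GetMapping("/hello")
    public HelloResponseDTO hello() throws Exception {
        return helloService.hello();
    }
}

gRPC implementation

Creating a new connection for every request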

java

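import cn.mj.protobuf.test.HelloRequest;
import cn.mj.protobuf.test.HelloResponse;
import cn.mj.protobuf.test.HelloServiceGrpc;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class HelloService {

    public HelloResponseDTO hello() throws InterruptedException {
        // A brand-new channel (and TCP connection) is created for every request.
        ManagedChannel channel = Grpc.newChannelBuilder("127.0.0.1:5001", InsecureChannelCredentials.create())
                .build();
        try {
            HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);
            HelloResponse helloResponse = blockingStub.hello(helloRequest());
            return HelloResponseDTO.builder()
                    .name(helloResponse.getName())
                    .time(helloResponse.getTime())
                    .message(helloResponse.getMessage())
                    .build();
        } finally {
            // Tear the channel down again once the call has returned.
            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    private HelloRequest helloRequest() {
        return HelloRequest.newBuilder()
                .setName("Hello")
                .build();
    }
}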

Conclusion:

Under a JMeter load test, QPS peaks at around 2000. If the test keeps running, the client eventually fails with java.net.BindException: Address already in use: connect
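One plausible reading of this BindException is ephemeral port exhaustion on the client: every request opens a new TCP connection, and because the client closes it first, the local port stays in TIME_WAIT for a while before it can be reused. The back-of-the-envelope sketch below shows why a per-request channel cannot sustain thousands of requests per second for long. The port count and TIME_WAIT duration are illustrative assumptions, not values measured in this test, so check the settings of your own operating system.

```java
public class PortExhaustionEstimate {

    /**
     * Rough upper bound on new outbound connections per second toward one
     * server address, assuming each closed connection keeps its local port
     * in TIME_WAIT for timeWaitSeconds.
     */
    public static double maxConnectionsPerSecond(int ephemeralPorts, int timeWaitSeconds) {
        if (ephemeralPorts <= 0 || timeWaitSeconds <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return (double) ephemeralPorts / timeWaitSeconds;
    }

    /**
     * Seconds until every port has been used once at the given connection rate.
     * Only meaningful while that time is shorter than the TIME_WAIT duration,
     * because no port is released before then.
     */
    public static double secondsUntilExhausted(int ephemeralPorts, double connectionsPerSecond) {
        if (ephemeralPorts <= 0 || connectionsPerSecond <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return ephemeralPorts / connectionsPerSecond;
    }

    public static void main(String[] args) {
        int ports = 16_384;          // assumed size of the ephemeral port range
        int timeWaitSeconds = 120;   // assumed TIME_WAIT duration
        double observedQps = 2000;   // peak QPS reported in the load test above

        System.out.printf("sustainable rate: %.1f connections/s%n",
                maxConnectionsPerSecond(ports, timeWaitSeconds));
        System.out.printf("ports exhausted after: %.1f s at %.0f QPS%n",
                secondsUntilExhausted(ports, observedQps), observedQps);
    }
}
```

Under these assumed numbers the sustainable rate is far below the observed 2000 QPS, which would be consistent with the error appearing only after the test has run for a while.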

Reusing a single connection throughout

java

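import cn.mj.protobuf.test.HelloRequest;
import cn.mj.protobuf.test.HelloResponse;
import cn.mj.protobuf.test.HelloServiceGrpc;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class HelloService {

    // One channel is created with the bean and shared by every request.
    private final ManagedChannel channel =
            Grpc.newChannelBuilder("127.0.0.1:5001", InsecureChannelCredentials.create()).build();

    public HelloResponseDTO hello() throws InterruptedException {
        HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);
        HelloResponse helloResponse = blockingStub.hello(helloRequest());
        // The channel is deliberately not shut down here, so the next request can reuse it.
        return HelloResponseDTO.builder()
                .name(helloResponse.getName())
                .time(helloResponse.getTime())
                .message(helloResponse.getMessage())
                .build();
    }

    private HelloRequest helloRequest() {
        return HelloRequest.newBuilder()
                .setName("Hello")
                .build();
    }
}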

Conclusion:

  1. Under a JMeter load test, QPS stabilizes at 9000+
  2. No exceptions
  3. Memory and CPU usage at 50%+

Reusing connections properly

StubRepository

Connections are reused through ThreadLocal


java

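import cn.mj.protobuf.test.HelloServiceGrpc;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import org.springframework.stereotype.Repository;

@Repository
public class StubRepository {

    // Each thread keeps its own channel, its own stub, and the time they were created.
    ThreadLocal<ManagedChannel> channelHolder = new ThreadLocal<>();
    ThreadLocal<HelloServiceGrpc.HelloServiceBlockingStub> stubHolder = new ThreadLocal<>();
    ThreadLocal<Long> creatTimeHolder = new ThreadLocal<>();

    // Shuts down this thread's previous channel (if any) and opens a fresh one.
    private ManagedChannel channel() {
        ManagedChannel managedChannel = channelHolder.get();
        if (managedChannel != null) {
            managedChannel.shutdownNow();
        }
        managedChannel = Grpc.newChannelBuilder("127.0.0.1:5001", InsecureChannelCredentials.create()).build();
        channelHolder.set(managedChannel);
        return managedChannel;
    }

    // Builds a stub on a fresh channel and caches it for the current thread.
    private HelloServiceGrpc.HelloServiceBlockingStub stub() {
        HelloServiceGrpc.HelloServiceBlockingStub stub = HelloServiceGrpc.newBlockingStub(channel());
        stubHolder.set(stub);
        return stub;
    }

    // Returns the cached stub, rebuilding it when none exists yet or it is older than 5 seconds.
    public HelloServiceGrpc.HelloServiceBlockingStub getStub() {
        Long createTime = creatTimeHolder.get();
        Long now = System.currentTimeMillis();
        if (createTime == null || now - createTime > 5000) {
            creatTimeHolder.set(now);
            return stub();
        }
        return stubHolder.get();
    }

    // Forces a reconnect, for example after a failed call.
    public HelloServiceGrpc.HelloServiceBlockingStub rebuildStub() {
        creatTimeHolder.set(System.currentTimeMillis());
        return stub();
    }
}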
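The core idea of StubRepository, a per-thread value that is rebuilt once it is older than a fixed age, can be isolated from gRPC. The following is a minimal sketch of that pattern using only the JDK. The class name ExpiringThreadLocal, the injectable clock, and the disposer callback are hypothetical names introduced here for illustration; they do not appear in the original code.

```java
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Per-thread value that is recreated once it is older than ttlMillis. */
public class ExpiringThreadLocal<T> {

    private final Supplier<T> factory;   // creates a fresh value (e.g. a new stub)
    private final Consumer<T> disposer;  // releases the old value (e.g. shuts a channel down)
    private final long ttlMillis;
    private final LongSupplier clock;    // injectable so the expiry logic can be tested

    private final ThreadLocal<T> valueHolder = new ThreadLocal<>();
    private final ThreadLocal<Long> createTimeHolder = new ThreadLocal<>();

    public ExpiringThreadLocal(Supplier<T> factory, Consumer<T> disposer,
                               long ttlMillis, LongSupplier clock) {
        this.factory = factory;
        this.disposer = disposer;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns this thread's value, rebuilding it if it is missing or expired. */
    public T get() {
        Long createdAt = createTimeHolder.get();
        if (createdAt == null || clock.getAsLong() - createdAt > ttlMillis) {
            return rebuild();
        }
        return valueHolder.get();
    }

    /** Disposes this thread's current value, if any, and creates a new one. */
    public T rebuild() {
        T old = valueHolder.get();
        if (old != null) {
            disposer.accept(old);
        }
        T fresh = factory.get();
        valueHolder.set(fresh);
        createTimeHolder.set(clock.getAsLong());
        return fresh;
    }

    public static void main(String[] args) {
        ExpiringThreadLocal<StringBuilder> holder = new ExpiringThreadLocal<>(
                StringBuilder::new, sb -> sb.setLength(0), 5000, System::currentTimeMillis);
        // Two calls within the TTL on the same thread return the same instance.
        System.out.println(holder.get() == holder.get());
    }
}
```

Injecting the clock is a design choice made for testability. Values held by threads that exit without calling rebuild are never passed to the disposer in this sketch, which is a limitation it shares with the ThreadLocal approach in general.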

HelloService

java

import cn.mj.protobuf.test.HelloRequest;
import cn.mj.protobuf.test.HelloResponse;
import cn.mj.protobuf.test.HelloServiceGrpc;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class HelloService {

    final StubRepository stubRepository;

    public HelloService(StubRepository stubRepository) {
        this.stubRepository = stubRepository;
    }

    public HelloResponseDTO hello() throws InterruptedException {
        // Fetch the stub cached for the current thread (rebuilt at most every 5 seconds).
        HelloServiceGrpc.HelloServiceBlockingStub stub = stubRepository.getStub();
        HelloResponse helloResponse = stub.hello(helloRequest());
        // The channel is not shut down here; StubRepository owns its lifecycle.
        return HelloResponseDTO.builder()
                .name(helloResponse.getName())
                .time(helloResponse.getTime())
                .message(helloResponse.getMessage())
                .build();
    }

    private HelloRequest helloRequest() {
        return HelloRequest.newBuilder()
                .setName("Hello")
                .build();
    }
}

Conclusion:

  1. Under a JMeter load test, QPS eventually stabilizes at 7000+
  2. No exceptions
  3. Memory usage is normal, CPU at 50%

Summary

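  1. Basic usage: creating a new channel for every request tops out at roughly 2000 QPS.
  2. Always reusing the same channel: throughput peaks above 9000 QPS, but problems arise once the server fails or is restarted.
  3. Reusing via ThreadLocal: 7000+ QPS and very stable. This approach addresses the following:
    1. Long-idle connections closed by the server: catch the exception and call StubRepository's rebuildStub to reconnect.
    2. Server unavailable: catch the exception and reconnect.
    3. Connections are reused.
    4. Reuse is per thread through ThreadLocal, so threads do not contend for a shared connection.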
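
The catch-and-rebuild step mentioned in the list is not shown in the HelloService code. A minimal sketch of it might look like the following, written against generic suppliers so it runs without gRPC on the classpath. The class RebuildOnFailure and its call method are hypothetical names introduced here. In real gRPC code you would likely catch io.grpc.StatusRuntimeException rather than every RuntimeException, and retry only calls that are safe to repeat.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class RebuildOnFailure {

    /**
     * Runs rpc against the cached stub. If that fails, rebuilds the stub once
     * and retries. A second failure is rethrown with the first one attached.
     */
    public static <S, R> R call(Supplier<S> getStub, Supplier<S> rebuildStub, Function<S, R> rpc) {
        try {
            return rpc.apply(getStub.get());
        } catch (RuntimeException firstFailure) {
            // Assume the connection is stale or the server was restarted: reconnect.
            S fresh = rebuildStub.get();
            try {
                return rpc.apply(fresh);
            } catch (RuntimeException secondFailure) {
                secondFailure.addSuppressed(firstFailure);
                throw secondFailure;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-ins for a stale and a fresh stub; real code would pass
        // stubRepository::getStub and stubRepository::rebuildStub instead.
        String reply = call(
                () -> "stale",
                () -> "fresh",
                stub -> {
                    if (stub.equals("stale")) {
                        throw new IllegalStateException("connection closed");
                    }
                    return "reply via " + stub;
                });
        System.out.println(reply);
    }
}
```

With the classes from this article, the call site would be along the lines of RebuildOnFailure.call(stubRepository::getStub, stubRepository::rebuildStub, stub -> stub.hello(helloRequest())).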