Protobuf使用性能优化
摘要:基本使用(每次创建一个channel)吞吐量2000左右顶破天;一直使用同一个channel吞吐量最高9000多,但是当服务端出问题或重启之后就会有问题;通过ThreadLocal复用连接,吞吐量稳定在7000+。
背景
Protobuf 和 Thrift 一样都是高性能RPC的实现方案,非常容易上手。在使用的时候,如果没有性能要求,基本上都可以直接参考官网实现;但是在高并发场景下,如果有性能要求,就需要进行一些优化处理才能达到最高的性能。
Protobuf服务定义、代码生成
服务定义
protobuf
代码解读
复制代码
// File layout: syntax, imports, package, options, then messages and services.
// Generate the Java message classes with:
//   protoc.exe .\hello.proto --java_out=../src/main/java
syntax = "proto3";

package hello;

option java_package = "cn.mj.protobuf.test";
option java_multiple_files = true;

// Request payload for HelloService.hello.
message HelloRequest {
  string name = 1;
  string message = 2;
}

// Response payload for HelloService.hello.
message HelloResponse {
  string name = 1;
  string message = 2;
  int64 time = 3;
}

// Unary greeting service.
service HelloService {
  rpc hello(HelloRequest) returns (HelloResponse);
}
代码生成
pom.xml 必要依赖、插件
xml
代码解读
复制代码
<!-- Maven module "protobuf-common": holds the .proto definitions and generates the protobuf/gRPC Java sources. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>cn.mj.protobuf.test</groupId>
    <artifactId>protobuf-test</artifactId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <artifactId>protobuf-common</artifactId>
  <packaging>jar</packaging>
  <name>protobuf-common</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- protoc compiler version and protobuf runtime version; both are set to the same value here. -->
    <protoc.version>4.28.3</protoc.version>
    <grpc.version>1.68.1</grpc.version>
    <protobuf.version>4.28.3</protobuf.version>
  </properties>
  <dependencyManagement>
    <dependencies>
      <!-- Importing the gRPC BOM lets the io.grpc dependencies below omit their versions. -->
      <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-bom</artifactId>
        <version>${grpc.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-services</artifactId>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-netty-shaded</artifactId>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-protobuf</artifactId>
    </dependency>
    <dependency>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-stub</artifactId>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java-util</artifactId>
      <version>${protobuf.version}</version>
    </dependency>
    <dependency>
      <!-- Use newer version than in protobuf-java-util -->
      <groupId>com.google.j2objc</groupId>
      <artifactId>j2objc-annotations</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.tomcat</groupId>
      <artifactId>annotations-api</artifactId>
      <version>6.0.53</version>
      <scope>provided</scope> <!-- not needed at runtime -->
    </dependency>
  </dependencies>
  <build>
    <extensions>
      <!-- Supplies the ${os.detected.classifier} property used to pick platform-specific protoc binaries below. -->
      <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.7.1</version>
      </extension>
    </extensions>
    <plugins>
      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <configuration>
          <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
          <pluginId>grpc-java</pluginId>
          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
          <execution>
            <goals>
              <!-- "compile" runs protoc for the message classes; "compile-custom" runs the grpc-java plugin configured above. -->
              <goal>compile</goal>
              <goal>compile-custom</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
生成代码
shell
代码解读
复制代码
# Build through the compile phase, which also runs the protobuf plugin's code generation; -e prints full error stack traces.
mvn compile -e
生成的代码位于 target 目录下(默认在 target/generated-sources/protobuf 中),可以直接引入使用。
服务端
Protobuf服务提供实现
java
代码解读
复制代码
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase { @Override public void hello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) { responseObserver .onNext( HelloResponse .newBuilder() .setName("hello") .setMessage("world") .setTime( System.currentTimeMillis() ) .build() ); responseObserver.onCompleted(); } }
Protobuf 服务端实现
java
代码解读
复制代码
@Slf4j @Component public class HelloServer { private Server server; @PostConstruct public void start() throws IOException { int port = 5001; server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) .addService(new HelloServiceImpl()) .build() .start(); log.info("Server started, listening on {}",port); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. log.info("*** shutting down gRPC server since JVM is shutting down"); try { HelloServer.this.stop(); } catch (InterruptedException e) { log.error("shutdown failed",e); } log.info("*** server shut down"); } }); } private void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } } /** * Await termination on the main thread since the grpc library uses daemon threads. */ private void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); } } }
客户端
Controller
java
代码解读
复制代码
/**
 * REST entry point that forwards {@code GET /hello} to the gRPC-backed {@link HelloService}.
 */
@RestController
public class HelloController {

    private final HelloService helloService;

    /**
     * Constructor injection keeps the dependency immutable and makes the controller
     * usable without a Spring container (e.g. in plain unit tests).
     *
     * @param helloService service that performs the gRPC call
     */
    public HelloController(HelloService helloService) {
        this.helloService = helloService;
    }

    /**
     * Performs one hello call and returns its result.
     *
     * @return the response converted to a DTO by the service
     * @throws Exception if the underlying call fails
     */
    @GetMapping("/hello")
    public HelloResponseDTO hello() throws Exception {
        return helloService.hello();
    }
}
gRpc实现
每次创建连接
java
代码解读
复制代码
@Slf4j @Service public class HelloService { public HelloResponseDTO hello() throws InterruptedException { ManagedChannel channel = Grpc.newChannelBuilder("127.0.0.1:5001", InsecureChannelCredentials.create()).build(); try{ HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel); HelloResponse helloResponse = blockingStub.hello(helloRequest()); return HelloResponseDTO .builder() .name(helloResponse.getName()) .time(helloResponse.getTime()) .message(helloResponse.getMessage()) .build(); }finally { channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } private HelloRequest helloRequest() { return HelloRequest.newBuilder() .setName("Hello") .build(); } }
结论:
jmeter压测QPS最多2000左右;一直持续下去会报错 java.net.BindException: Address already in use: connect(每次请求都新建连接,客户端本地的临时端口被耗尽)。
一直复用同一个连接
java
代码解读
复制代码
@Slf4j @Service public class HelloService { ManagedChannel channel = Grpc.newChannelBuilder("127.0.0.1:5001", InsecureChannelCredentials.create()).build(); public HelloResponseDTO hello() throws InterruptedException { try{ HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel); HelloResponse helloResponse = blockingStub.hello(helloRequest()); return HelloResponseDTO .builder() .name(helloResponse.getName()) .time(helloResponse.getTime()) .message(helloResponse.getMessage()) .build(); }finally { // channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } private HelloRequest helloRequest() { return HelloRequest.newBuilder() .setName("Hello") .build(); } }
结论:
- jmeter压测QPS稳定到9000+
- 没有异常
- 内存、CPU50+
正常复用连接
StubRepository
通过ThreadLocal实现复用连接
java
代码解读
复制代码
/**
 * Hands out a per-thread gRPC channel and blocking stub, rebuilding them once they are
 * older than {@link #STUB_TTL_MILLIS}.
 *
 * <p>Each thread owns its own channel, stub and creation timestamp via {@link ThreadLocal},
 * so threads never share a channel.
 */
@Repository
public class StubRepository {

    /** Address of the gRPC server. */
    private static final String TARGET = "127.0.0.1:5001";

    /** Maximum age of a cached stub before it is rebuilt. */
    private static final long STUB_TTL_MILLIS = 5000L;

    private final ThreadLocal<ManagedChannel> channelHolder = new ThreadLocal<>();
    private final ThreadLocal<HelloServiceGrpc.HelloServiceBlockingStub> stubHolder = new ThreadLocal<>();
    private final ThreadLocal<Long> creatTimeHolder = new ThreadLocal<>();

    /**
     * Replaces the current thread's channel with a new one.
     *
     * <p>The new channel is built first so that a build failure leaves the previous
     * channel intact; only then is the previous channel shut down.
     */
    private ManagedChannel channel() {
        ManagedChannel fresh = Grpc.newChannelBuilder(TARGET, InsecureChannelCredentials.create()).build();
        ManagedChannel previous = channelHolder.get();
        channelHolder.set(fresh);
        if (previous != null) {
            previous.shutdownNow();
        }
        return fresh;
    }

    /** Builds a stub on a fresh channel and caches it for the current thread. */
    private HelloServiceGrpc.HelloServiceBlockingStub stub() {
        HelloServiceGrpc.HelloServiceBlockingStub stub = HelloServiceGrpc.newBlockingStub(channel());
        stubHolder.set(stub);
        return stub;
    }

    /**
     * Returns the current thread's stub, rebuilding it when none exists yet or the cached
     * one is older than {@link #STUB_TTL_MILLIS}.
     *
     * @return a non-null stub bound to the current thread's channel
     */
    public HelloServiceGrpc.HelloServiceBlockingStub getStub() {
        HelloServiceGrpc.HelloServiceBlockingStub cached = stubHolder.get();
        Long createdAt = creatTimeHolder.get();
        long now = System.currentTimeMillis();
        if (cached == null || createdAt == null || now - createdAt > STUB_TTL_MILLIS) {
            return rebuildStub();
        }
        return cached;
    }

    /**
     * Unconditionally rebuilds the current thread's channel and stub.
     *
     * <p>The creation time is recorded only after the stub was built, so a failed rebuild
     * is retried on the next call instead of being treated as fresh.
     *
     * @return the newly built stub
     */
    public HelloServiceGrpc.HelloServiceBlockingStub rebuildStub() {
        HelloServiceGrpc.HelloServiceBlockingStub fresh = stub();
        creatTimeHolder.set(System.currentTimeMillis());
        return fresh;
    }
}
HelloService
java
代码解读
复制代码
@Slf4j @Service public class HelloService { final StubRepository stubRepository; public HelloService(StubRepository stubRepository) { this.stubRepository = stubRepository; } public HelloResponseDTO hello() throws InterruptedException { try{ HelloServiceGrpc.HelloServiceBlockingStub stub = stubRepository.getStub(); HelloResponse helloResponse = stub.hello(helloRequest()); return HelloResponseDTO .builder() .name(helloResponse.getName()) .time(helloResponse.getTime()) .message(helloResponse.getMessage()) .build(); }finally { // channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); } } private HelloRequest helloRequest() { return HelloRequest.newBuilder() .setName("Hello") .build(); } }
结论:
- jmeter压测QPS最终稳定7000+
- 没有异常
- 内存正常,CPU 50%
总结
- 基本使用:每次创建一个channel吞吐量2000左右顶破天
- 一直使用同一个channel:吞吐量最高9000多,但是当服务端出问题或重启之后,就会有问题。
- 通过ThreadLocal进行复用:吞吐量7000+。很稳定。这个方案解决了如下问题:
  - 长期不活跃,服务端主动断开:可以catch住异常,调用StubRepository的rebuildStub重新连接即可
  - 服务端不可用:catch住异常,重新连接即可
  - 复用连接
  - 通过ThreadLocal复用,避免线程争抢,减少竞争
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)