spring通过Spring Integration实现tcp通信
Spring Integration TCP 模块主要提供了以下组件:连接工厂 (): 负责创建和管理 TCP 连接。分为客户端 () 和服务端 () 两种类型通道适配器 (): 用于单向通信。入站适配器 (): 接收 TCP 数据并通过通道发送到应用。出站适配器 (): 将消息载荷通过 TCP 连接发送出去。消息网关 (Gateway): 用于请求-响应模式的双向通信。入站网关 (): 处理来自
1:Spring Integration TCP 核心概念
Spring Integration TCP 模块主要提供了以下组件:
连接工厂 (ConnectionFactory
): 负责创建和管理 TCP 连接。分为客户端 (TcpNetClientConnectionFactory
) 和服务端 (TcpNetServerConnectionFactory
) 两种类型
通道适配器 (Channel Adapter
): 用于单向通信。
入站适配器 (TcpReceivingChannelAdapter
): 接收 TCP 数据并通过通道发送到应用。
出站适配器 (TcpSendingMessageHandler
): 将消息载荷通过 TCP 连接发送出去。
消息网关 (Gateway
): 用于请求-响应模式的双向通信。
入站网关 (TcpInboundGateway
): 处理来自客户端的请求并返回响应。
出站网关 (TcpOutboundGateway
): 向服务器发送请求并等待响应。
2:代码实现
1)添加依赖
<!-- spring-integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>2.5.15</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<version>5.5.18</version>
</dependency>
2)创建tcp服务端配置
package **.**;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer; // 用于处理消息边界
@Configuration
public class TcpServerConfig {
/**
* 创建TCP服务器连接工厂
* 负责创建和管理服务器端的TCP连接
*
* @return AbstractServerConnectionFactory TCP服务器连接工厂实例
*/
@Bean
public AbstractServerConnectionFactory serverConnectionFactory() {
// 创建NIO服务器连接工厂,监听端口12345
TcpNioServerConnectionFactory factory = new TcpNioServerConnectionFactory(12345);
// 设置序列化器:使用LF(换行符)作为消息分隔符的字节数组序列化器
// 这是解决TCP粘包/拆包问题的关键配置
factory.setSerializer(new ByteArrayLfSerializer());
// 设置反序列化器:同样使用LF作为分隔符
factory.setDeserializer(new ByteArrayLfSerializer());
// 可选配置:设置读取超时时间(毫秒)
// factory.setSoTimeout(5000);
// 可选配置:设置服务器 backlog(等待连接队列大小)
// factory.setBacklog(100);
return factory;
}
/**
* 创建TCP入站网关
* 负责处理来自客户端的TCP连接请求,并将请求转发到指定的消息通道
*
* @param connectionFactory TCP连接工厂
* @return TcpInboundGateway TCP入站网关实例
*/
@Bean
public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) {
TcpInboundGateway gateway = new TcpInboundGateway();
// 设置连接工厂
gateway.setConnectionFactory(connectionFactory);
// 设置请求通道名称,接收到的消息将发送到此通道
gateway.setRequestChannelName("tcpServerInChannel");
// 可选:设置错误通道名称,用于处理连接或处理过程中出现的错误
// gateway.setErrorChannelName("tcpErrorChannel");
return gateway;
}
/**
* 服务端消息处理器
* 处理从TCP连接接收到的消息,并返回响应
*
* @param messagePayload 接收到的消息字节数组
* @return byte[] 响应字节数组
*/
@ServiceActivator(inputChannel = "tcpServerInChannel")
public byte[] processRequest(byte[] messagePayload) {
// 将字节数组转换为字符串
String request = new String(messagePayload);
// 打印接收到的消息(实际应用中应替换为业务逻辑)
System.out.println("Server received: " + request);
// 处理请求并生成响应(这里简单返回处理后的字符串)
return ("Processed: " + request).getBytes();
}
}
3)创建tcp客户端配置
package **.**;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer;
@Configuration
public class TcpClientConfig {
/**
* 创建TCP客户端连接工厂
* 负责创建和管理客户端的TCP连接
*
* @return AbstractClientConnectionFactory TCP客户端连接工厂实例
*/
@Bean
public AbstractClientConnectionFactory clientConnectionFactory() {
// 创建TCP客户端连接工厂,连接到localhost:12345
TcpNetClientConnectionFactory factory = new TcpNetClientConnectionFactory("localhost", 12345);
// 设置序列化器:使用LF(换行符)作为消息分隔符的字节数组序列化器
// 与服务端的序列化器配置必须一致
factory.setSerializer(new ByteArrayLfSerializer());
// 设置反序列化器:同样使用LF作为分隔符
factory.setDeserializer(new ByteArrayLfSerializer());
// 设置为长连接(false),而不是每次发送后关闭连接(true)
factory.setSingleUse(false);
return factory;
}
/**
* 创建TCP出站网关
* 负责通过TCP连接发送消息并接收响应
*
* @param connectionFactory TCP连接工厂
* @return TcpOutboundGateway TCP出站网关实例
*/
@Bean
@ServiceActivator(inputChannel = "tcpClientOutChannel")
public TcpOutboundGateway tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
TcpOutboundGateway gateway = new TcpOutboundGateway();
// 设置连接工厂
gateway.setConnectionFactory(connectionFactory);
// 可选:设置是否等待响应(true)或只是发送消息(false)
// gateway.setRemoteTimeout(5000); // 设置等待响应的超时时间
return gateway;
}
/**
* 消息网关接口
* 提供类型安全的接口用于发送TCP消息
* Spring Integration会自动实现此接口
*/
@MessagingGateway(defaultRequestChannel = "tcpClientOutChannel")
public interface TcpClientGateway {
/**
* 发送消息到TCP服务器并接收响应
*
* @param message 要发送的消息字节数组
* @return byte[] 服务器响应的字节数组
*/
byte[] send(byte[] message);
}
}
4)处理消息
在服务端,使用 @ServiceActivator
注解的方法来处理接收到的消息并返回响应。
在客户端,通过注入的 MessagingGateway
接口来发送消息并接收响应:
package **.**;
import com.ruoyi.lxsjgl.config.TcpClientConfig;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/tcp")
public class TcpClientController {
// 注入TCP客户端网关
@Resource
private TcpClientConfig.TcpClientGateway tcpClientGateway;
/**
* REST接口:通过TCP发送消息
*
* @param message 要发送的消息内容
* @return String 包含服务器响应的结果字符串
*/
@GetMapping("/sendMessage")
public String sendMessageOverTcp(@RequestParam String message) {
try {
// 通过TCP客户端网关发送消息
byte[] responseBytes = tcpClientGateway.send(message.getBytes());
// 将响应字节数组转换为字符串
String response = new String(responseBytes);
// 返回格式化后的响应
return "Server responded: " + response;
} catch (Exception e) {
// 处理异常情况
return "Error sending message: " + e.getMessage();
}
}
}

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐
所有评论(0)