W5500 micropython 驱动测试 网线直连电脑静态IP
【ESP32连接W5500网络模块问题解决】使用ESP32的MicroPython连接W5500模块时遇到两个关键问题:1)DHCP服务器初始化失败,需通过is_dhcp=False参数手动关闭;2)设置静态IP时出现类型错误,需通过ip_to_bytes()函数将IP字符串转换为字节格式。解决方法包括:修改初始化参数、自定义IP转换函数,并通过socket.set_interface(nic)设
W5500 Lite 丝印简写:

接线按照:sck=Pin(26),mosi=Pin(25),miso=Pin(13) cs=Pin(27,Pin.OUT),电源3.3V
1、网线直连电脑,初始化失败,默认是有DHCP服务器
#创建W55xx驱动对象
nic =WIZNET5K(spi,cs,rst)
在ESP32的mpy下运行上述代码报错:
File "wiznet5k.py", line 170, in __init__ AssertionError: Failed to configure DHCP Server!
静态IP有构造方法 nic=WIZNET5K(spi,cs,rst,is_dhcp=False)
def __init__(self, spi_bus, cs, reset=None, is_dhcp=True, mac=DEFAULT_MAC, hostname=None, dhcp_timeout=30, debug=False)
2、手动设置网络参数
nic.ifconfig = ('192.168.1.100', '255.255.255.0', '192.168.1.1', '8.8.8.8')
报错 TypeError: can't convert str to int
@property
def ifconfig(self):
"""Returns the network configuration as a tuple."""
print('IFCONFIG')
return (self.ip_address, self.read(REG_SUBR, 0x00, 4), self.read(REG_GAR, 0x00, 4), self._dns)
@ifconfig.setter
def ifconfig(self, params):
"""Sets network configuration to provided tuple in format:
(ip_address, subnet_mask, gateway_address, dns_server).
"""
ip_address, subnet_mask, gateway_address, dns_server = params
self.write(REG_SIPR, 0x04, ip_address)
self.write(REG_SUBR, 0x04, subnet_mask)
self.write(REG_GAR, 0x04, gateway_address)
self._dns = dns_server
解决办法:
def ip_to_bytes(ip_str):
"""将IP地址字符串转换为字节格式"""
return bytes([int(part) for part in ip_str.split('.')])
nic.ifconfig = (
ip_to_bytes('172.16.30.119'),
ip_to_bytes('255.255.255.0'),
ip_to_bytes('172.16.30.254'),
ip_to_bytes('8.8.8.8')
)
from wiznet5k import WIZNET5K
from machine import Pin,SPI
import wiznet5k_socket as socket
import sma_esp32_w5500_requests as requests
import time
#自定义接线引脚
spi=SPI(2,baudrate=8000000,sck=Pin(26),mosi=Pin(25),miso=Pin(13))
#CS对应的GPIO
cs=Pin(27,Pin.OUT)
#虚指GPIO实际接高电平即可
rst=Pin(39)
#创建W5500驱动对象 is_dhcp=False
nic=WIZNET5K(spi,cs,rst,is_dhcp=False)
#打印相关信息
print("\n\n以太网芯片版本:", nic.chip)
print("网卡MAC地址:",[hex(i) for i in nic.mac_address])
print("IP地址:",nic.pretty_ip(nic.ip_address))
# def ip_to_bytes(ip_str):
# parts = ip_str.split('.')
# # 将每个部分转换为整数,然后转换为字节
# return bytes(int(part) for part in parts)
def ip_to_bytes(ip_str):
"""将IP地址字符串转换为字节格式"""
return bytes([int(part) for part in ip_str.split('.')])
# 手动设置网络参数
# nic.ifconfig = ('192.168.1.100', '255.255.255.0', '192.168.1.1', '8.8.8.8')
# TypeError: can't convert str to int
nic.ifconfig = (
ip_to_bytes('172.16.30.119'),
ip_to_bytes('255.255.255.0'),
ip_to_bytes('172.16.30.254'),
ip_to_bytes('8.8.8.8')
)
print("网络配置设置成功")
'''设置网络接口
关键点:在创建 socket 之前,必须调用 socket.set_interface(nic) 来设置 W5500 为默认网络接口。
这是解决 'NoneType' object has no attribute 'get_socket' 错误的关键步骤'''
socket.set_interface(nic)
print("网络接口设置成功")
#创建udp套接字
udp_socket =socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("UDP socket创建成功!")
# 测试socket功能
udp_socket.settimeout(1.0)
print("Socket配置完成")
#准备接收方的地址
dest_addr = ('172.16.30.133', 8080)
print("IP地址:",nic.pretty_ip(nic.ip_address))
#发送数据到指定的电脑上的指定程序中
for i in range(1000000):
send_data="hello world--%d"%i
print(send_data)
udp_socket.sendto(send_data.encode('utf-8'),dest_addr)
time.sleep(0.1)
#关闭套接字
udp_socket.close()

第一部分:上面写的这些常识。但UDP与TCP连接对抽象层socket有不同的需求。
硬件驱动层 (wiznet5k.py)基础通信正常:TCP连接建立、数据传输、状态机转换。
核心方法工作:
socket_open() - 正确打开socket
socket_listen() - 监听功能正常
socket_accept() - 接受连接(有socket交换机制)
socket_write() - 数据发送正常(返回实际字节数)
socket_read() - 数据接收正常
socket_status() - 状态读取准确
第二部分:TCP测试与抽象层API修复
W5500芯片内置8个独立的物理Socket(编号0-7)
每个Socket独立工作,有自己的寄存器组
硬件限制:最大并发连接数 = 8(包括监听Socket)
二、W5500硬件特殊性
1. 8个物理Socket硬件实现
W5500芯片内置8个独立的物理Socket(编号0-7)
每个Socket独立工作,有自己的寄存器组
硬件限制:最大并发连接数 = 8(包括监听Socket)
2. Socket状态机严格
# W5500 Socket状态编码
SNSR_SOCK_CLOSED = 0x00 # 关闭
SNSR_SOCK_INIT = 0x13 # 初始化
SNSR_SOCK_LISTEN = 0x14 # 监听
SNSR_SOCK_ESTABLISHED = 0x17 # 已连接
SNSR_SOCK_UDP = 0x22 # UDP模式
3. 监听与数据Socket分离
监听Socket(LISTEN状态)专门用于接受连接
数据Socket(ESTABLISHED状态)专门用于数据传输
accept()返回的是新的数据Socket

三层架构模型
┌─────────────────────────────────────┐
│应用程序层 (Application) │ ← 使用标准socket API
│ ↓ 标准socket API │
├─────────────────────────────────────┤
│socket抽象层 (wiznet5k_socket.py) │ ← 关键桥梁,API转换
│ ↓ 封装硬件细节,提供标准接口 │
├─────────────────────────────────────┤
│硬件驱动层 (wiznet5k.py) │ ← 直接操作W5500寄存器
│ ↓ SPI通信,寄存器操作 │
└─────────────────────────────────────┘
TCP连接流程正常,三次握手:CLOSED → INIT → LISTEN → SYNRECV → ESTABLISHED
Socket编号交换机制:监听socket和数据socket正确分离
多端口监听支持:可同时监听多个端口(8个物理socket)
抽象层 wiznet5k_socket.py 的主要bug已经通过deepseek修订,如下直接使用:
# SPDX-FileCopyrightText: 2019 ladyada for Adafruit Industries
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT
"""
`wiznet5k_socket`
================================================================================
A socket compatible interface with the Wiznet5k module.
修复版:解决响应延迟问题
* Author(s): ladyada, Brent Rubell, Patrick Van Oosterwijck, Adam Cummick, Vincenzo D'Angelo
* 修复: 响应延迟修复
"""
import gc
import time
from micropython import const
SNSR_SOCK_CLOSED = const(0x00)
SNSR_SOCK_INIT = const(0x13)
SNSR_SOCK_LISTEN = const(0x14)
SNSR_SOCK_SYNSENT = const(0x15)
SNSR_SOCK_SYNRECV = const(0x16)
SNSR_SOCK_ESTABLISHED = const(0x17)
SNSR_SOCK_FIN_WAIT = const(0x18)
SNSR_SOCK_CLOSING = const(0x1A)
SNSR_SOCK_TIME_WAIT = const(0x1B)
SNSR_SOCK_CLOSE_WAIT = const(0x1C)
SNSR_SOCK_LAST_ACK = const(0x1D)
SNSR_SOCK_UDP = const(0x22)
SNMR_TCP = const(0x21)
SNMR_UDP = const(0x02)
_the_interface = None # pylint: disable=invalid-name
def set_interface(iface):
"""Helper to set the global internet interface."""
global _the_interface # pylint: disable=global-statement, invalid-name
_the_interface = iface
def htonl(x):
"""Convert 32-bit positive integers from host to network byte order."""
return (
((x) << 24 & 0xFF000000)
| ((x) << 8 & 0x00FF0000)
| ((x) >> 8 & 0x0000FF00)
| ((x) >> 24 & 0x000000FF)
)
def htons(x):
"""Convert 16-bit positive integers from host to network byte order."""
return (((x) << 8) & 0xFF00) | (((x) >> 8) & 0xFF)
SOCK_STREAM = const(0x21) # TCP
TCP_MODE = 80
SOCK_DGRAM = const(0x02) # UDP
AF_INET = const(3)
SOCKET_INVALID = const(255)
# pylint: disable=too-many-arguments, unused-argument
def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
"""Translate the host/port argument into a sequence of 5-tuples that
contain all the necessary arguments for creating a socket connected to that service.
"""
if not isinstance(port, int):
raise RuntimeError("Port must be an integer")
if is_ipv4(host):
return [(AF_INET, socktype, proto, "", (host, port))]
return [(AF_INET, socktype, proto, "", (gethostbyname(host), port))]
def gethostbyname(hostname):
"""Translate a host name to IPv4 address format. The IPv4 address
is returned as a string.
:param str hostname: Desired hostname.
"""
addr = _the_interface.get_host_by_name(hostname)
addr = "{}.{}.{}.{}".format(addr[0], addr[1], addr[2], addr[3])
return addr
def is_ipv4(host):
"""Checks if a host string is an IPv4 address.
:param str host: host's name or ip
"""
octets = host.split(".", 3)
if len(octets) != 4 or not "".join(octets).isdigit():
return False
for octet in octets:
if int(octet) > 255:
return False
return True
# pylint: disable=invalid-name, too-many-public-methods
class socket:
"""A simplified implementation of the Python 'socket' class
for connecting to a Wiznet5k module.
修复版:优化recv()方法避免阻塞延迟
:param int family: Socket address (and protocol) family.
:param int type: Socket type.
"""
# pylint: disable=redefined-builtin,unused-argument
def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, socknum=None):
if family != AF_INET:
raise RuntimeError("Only AF_INET family supported by W5K modules.")
self._sock_type = type
self._buffer = b""
self._timeout = 0
self._listen_port = None
self._socknum = _the_interface.get_socket()
if self._socknum == SOCKET_INVALID:
raise RuntimeError("Failed to allocate socket.")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._sock_type == SOCK_STREAM:
self.disconnect()
stamp = time.time()
while self.status == SNSR_SOCK_FIN_WAIT:
if time.time() - stamp > 1000:
raise RuntimeError("Failed to disconnect socket")
self.close()
stamp = time.time()
while self.status != SNSR_SOCK_CLOSED:
if time.time() - stamp > 1000:
raise RuntimeError("Failed to close socket")
@property
def socknum(self):
"""Returns the socket object's socket number."""
return self._socknum
@property
def status(self):
"""Returns the status of the socket"""
return _the_interface.socket_status(self.socknum)[0]
@property
def connected(self):
"""Returns whether or not we are connected to the socket."""
if self.socknum >= _the_interface.max_sockets:
return False
status = _the_interface.socket_status(self.socknum)[0]
if (
status == SNSR_SOCK_CLOSE_WAIT
and self.available() == 0
):
result = False
else:
result = status not in (
SNSR_SOCK_CLOSED,
SNSR_SOCK_LISTEN,
SNSR_SOCK_TIME_WAIT,
SNSR_SOCK_FIN_WAIT,
)
if not result and status != SNSR_SOCK_LISTEN:
self.close()
return result
def getpeername(self):
"""Return the remote address to which the socket is connected."""
return _the_interface.remote_ip(self.socknum)
def inet_aton(self, ip_string):
"""Convert an IPv4 address from dotted-quad string format.
:param str ip_string: IP Address, as a dotted-quad string.
"""
self._buffer = b""
self._buffer = [int(item) for item in ip_string.split(".")]
self._buffer = bytearray(self._buffer)
return self._buffer
def bind(self, address):
"""Bind the socket to the listen port, if host is specified the interface
will be reconfigured to that IP.
:param tuple address: local socket as a (host, port) tuple.
"""
if address[0] is not None:
ip_address = _the_interface.unpretty_ip(address[0])
current_ip, subnet_mask, gw_addr, dns = _the_interface.ifconfig
if ip_address != current_ip:
_the_interface.ifconfig = (ip_address, subnet_mask, gw_addr, dns)
self._listen_port = address[1]
# For UDP servers we need to open the socket here because we won't call
# listen
if self._sock_type == SOCK_DGRAM:
_the_interface.socket_listen(
self.socknum, self._listen_port, SNMR_UDP
)
self._buffer = b""
def listen(self, backlog=None):
"""Listen on the port specified by bind.
:param backlog: For compatibility but ignored.
"""
assert self._listen_port is not None, "Use bind to set the port before listen!"
_the_interface.socket_listen(self.socknum, self._listen_port)
self._buffer = b""
def accept(self):
"""Accept a connection. The socket must be bound to an address and listening for
connections. The return value is a pair (conn, address) where conn is a new
socket object usable to send and receive data on the connection, and address is
the address bound to the socket on the other end of the connection.
"""
stamp = time.time()
while self.status not in (
SNSR_SOCK_SYNRECV,
SNSR_SOCK_ESTABLISHED,
):
if self._timeout is not None and self._timeout > 0 and time.time() - stamp > self._timeout:
return None
if self.status == SNSR_SOCK_CLOSED:
self.close()
self.listen()
new_listen_socknum, addr = _the_interface.socket_accept(self.socknum)
current_socknum = self.socknum
# Create a new socket object and swap socket nums so we can continue listening
client_sock = socket()
client_sock._socknum = current_socknum # pylint: disable=protected-access
self._socknum = new_listen_socknum # pylint: disable=protected-access
self.bind((None, self._listen_port))
self.listen()
while self.status != SNSR_SOCK_LISTEN:
raise RuntimeError("Failed to open new listening socket")
return client_sock, addr
def connect(self, address, conntype=None):
"""Connect to a remote socket at address.
:param tuple address: Remote socket as a (host, port) tuple.
"""
assert (
conntype != 0x03
), "Error: SSL/TLS is not currently supported by CircuitPython."
host, port = address
if hasattr(host, "split"):
try:
host = tuple(map(int, host.split(".")))
except ValueError:
host = _the_interface.get_host_by_name(host)
if self._listen_port is not None:
_the_interface.src_port = self._listen_port
result = _the_interface.socket_connect(
self.socknum, host, port, conn_mode=self._sock_type
)
_the_interface.src_port = 0
if not result:
raise RuntimeError("Failed to connect to host", host)
self._buffer = b""
def send(self, data):
"""Send data to the socket. The socket must be connected to
a remote socket.
:param bytearray data: Desired data to send to the socket.
:return: Number of bytes actually sent.
"""
# 处理timeout参数 - 保持原始精度
if self._timeout is None:
timeout_val = None # None表示阻塞模式,传递给底层处理
elif isinstance(self._timeout, (int, float)):
timeout_val = self._timeout # 保持原始值
else:
timeout_val = 0 # 其他情况使用默认值
result = _the_interface.socket_write(self.socknum, data, timeout_val)
gc.collect()
return result # 返回实际发送的字节数
def sendto(self, data, address):
"""Send data to the socket. The socket must be connected to
a remote socket.
:param bytearray data: Desired data to send to the socket.
:param tuple address: Remote socket as a (host, port) tuple.
:return: Number of bytes actually sent.
"""
self.connect(address)
return self.send(data)
def recv(self, bufsize=0, flags=0): # pylint: disable=too-many-branches
"""Reads some bytes from the connected remote address.
修复版:优化接收逻辑,避免阻塞延迟
:param int bufsize: Maximum number of bytes to receive.
:param int flags: ignored, present for compatibility.
"""
if self.status == SNSR_SOCK_CLOSED:
return b""
if bufsize == 0:
# read everything on the socket
while True:
avail = self.available()
if avail:
if self._sock_type == SOCK_STREAM:
# 正确处理socket_read返回值
ret, data = _the_interface.socket_read(self.socknum, avail)
if ret > 0 and isinstance(data, (bytes, bytearray)):
self._buffer += data
else:
# 没有数据或数据无效
break
elif self._sock_type == SOCK_DGRAM:
# 同样修复UDP读取
ret, data = _the_interface.read_udp(self.socknum, avail)
if ret > 0 and isinstance(data, (bytes, bytearray)):
self._buffer += data
else:
break
else:
break
gc.collect()
ret = self._buffer
self._buffer = b""
gc.collect()
return ret
# 🔥 关键修复:如果有数据就立即返回,不要等待 bufsize
# 检查缓冲区是否已经有足够的数据
if len(self._buffer) >= bufsize:
ret = self._buffer[:bufsize]
self._buffer = self._buffer[bufsize:]
gc.collect()
return ret
stamp = time.time()
to_read = bufsize - len(self._buffer)
received = []
# 优化:每次循环检查是否有数据,而不是等待
while to_read > 0:
# 检查是否有数据可用
avail = self.available()
if avail:
# 只读取实际可用的数据,不超过需要的数据量
read_size = min(to_read, avail)
if self._sock_type == SOCK_STREAM:
ret, data = _the_interface.socket_read(self.socknum, read_size)
if ret > 0 and isinstance(data, (bytes, bytearray)):
self._buffer += data
to_read -= ret
elif self._sock_type == SOCK_DGRAM:
ret, data = _the_interface.read_udp(self.socknum, read_size)
if ret > 0 and isinstance(data, (bytes, bytearray)):
self._buffer += data
to_read -= ret
stamp = time.time() # 重置超时计时器
gc.collect()
# 检查超时
if self._timeout is not None and self._timeout > 0 and time.time() - stamp > self._timeout:
break
# 如果没有数据,短暂休眠避免CPU占用过高
if avail == 0:
time.sleep(0.001) # 1ms休眠
# 返回数据(可能没有达到bufsize)
if len(self._buffer) >= bufsize:
ret = self._buffer[:bufsize]
self._buffer = self._buffer[bufsize:]
else:
# 返回所有可用的数据
ret = self._buffer
self._buffer = b""
gc.collect()
return ret
def recvfrom(self, bufsize=0, flags=0):
"""Reads some bytes from the connected remote address.
:param int bufsize: Maximum number of bytes to receive.
:param int flags: ignored, present for compatibility.
:returns: a tuple (bytes, address) where address is a tuple (ip, port)
"""
return (
self.recv(bufsize),
(
_the_interface.remote_ip(self.socknum),
_the_interface.remote_port(self.socknum),
),
)
def recv_into(self, buf, nbytes=0, flags=0):
"""Reads some bytes from the connected remote address info the provided buffer.
:param bytearray buf: Data buffer
:param nbytes: Maximum number of bytes to receive
:param int flags: ignored, present for compatibility.
:returns: the number of bytes received
"""
if nbytes == 0:
nbytes = len(buf)
ret = self.recv(nbytes)
nbytes = len(ret)
buf[:nbytes] = ret
return nbytes
def recvfrom_into(self, buf, nbytes=0, flags=0):
"""Reads some bytes from the connected remote address info the provided buffer.
:param bytearray buf: Data buffer
:param nbytes: Maximum number of bytes to receive
:param int flags: ignored, present for compatibility.
:returns a tuple (nbytes, address) where address is a tuple (ip, port)
"""
return (
self.recv_into(buf, nbytes),
(
_the_interface.remote_ip(self.socknum),
_the_interface.remote_port(self.socknum),
),
)
def readline(self):
"""Attempt to return as many bytes as we can up to \
but not including '\r\n'.
"""
stamp = time.time()
while b"\r\n" not in self._buffer:
avail = self.available()
if avail:
if self._sock_type == SOCK_STREAM:
ret, data = _the_interface.socket_read(self.socknum, avail)
if ret > 0 and isinstance(data, (bytes, bytearray)):
self._buffer += data
elif self._sock_type == SOCK_DGRAM:
ret, data = _the_interface.read_udp(self.socknum, avail)
if ret > 0 and isinstance(data, (bytes, bytearray)):
self._buffer += data
if (
not avail
and self._timeout is not None
and self._timeout > 0
and time.time() - stamp > self._timeout
):
self.close()
raise RuntimeError("Didn't receive response, failing out...")
firstline, self._buffer = self._buffer.split(b"\r\n", 1)
gc.collect()
return firstline
def disconnect(self):
"""Disconnects a TCP socket."""
assert self._sock_type == SOCK_STREAM, "Socket must be a TCP socket."
_the_interface.socket_disconnect(self.socknum)
def close(self):
"""Closes the socket."""
_the_interface.socket_close(self.socknum)
def available(self):
"""Returns how many bytes of data are available to be read from the socket."""
return _the_interface.socket_available(self.socknum, self._sock_type)
def settimeout(self, value):
"""Sets socket read timeout.
:param value: Socket read timeout in seconds, or None for blocking mode.
"""
if value is None:
self._timeout = None
elif isinstance(value, (int, float)):
if value < 0:
raise Exception("Timeout period should be non-negative.")
self._timeout = value
else:
raise TypeError("Timeout must be a number or None")
def gettimeout(self):
"""Return the timeout in seconds (float) associated
with socket operations, or None if no timeout is set.
"""
return self._timeout
第三部分:多连接压力测试8个socket满负荷运行
W5500 Windows客户端 - V3.5 (增加长时间稳定性测试)
"""
W5500 Windows客户端 - V3.5 (增加长时间稳定性测试)
修复:恢复V2的连接策略
新增:1. 高频消息测试模式
2. 大数据包测试模式
3. 混合负载测试模式
4. 异常恢复测试模式
5. 长时间稳定性测试模式
修复:连接断开时自动重连
"""
import socket
import threading
import time
import json
from datetime import datetime
import random
import struct
# ==================== 配置 ====================
CONFIG = {
'server_ip': '172.16.30.75',
'server_port': 9050,
'test_mode': 'normal', # 'normal', 'high_freq', 'large_packet', 'mixed_load', 'recovery', 'long_term'
'test_duration': 30,
'log_file': 'w5500_final_fix.log',
}
# 不同测试模式的配置
TEST_MODES = {
'normal': {
'client_count': 8,
'message_interval': 1.0,
'message_size': 64,
'test_name': '常规压力测试'
},
'high_freq': {
'client_count': 8,
'message_interval': 0.2, # 5条/秒 → 40条/秒总吞吐
'message_size': 64,
'test_name': '高频小消息测试'
},
'large_packet': {
'client_count': 8,
'message_interval': 2.0,
'message_size': 1024, # 1KB大消息
'test_name': '大数据包测试'
},
'mixed_load': {
'client_count': 8,
'test_name': '混合负载测试',
'clients_config': [
{'interval': 0.2, 'size': 64}, # 快速客户端1
{'interval': 0.2, 'size': 64}, # 快速客户端2
{'interval': 0.5, 'size': 128}, # 中速客户端1
{'interval': 0.5, 'size': 128}, # 中速客户端2
{'interval': 1.0, 'size': 256}, # 慢速客户端1
{'interval': 1.0, 'size': 256}, # 慢速客户端2
{'interval': 2.0, 'size': 512}, # 超慢客户端1
{'interval': 2.0, 'size': 1024}, # 超慢客户端2(大数据包)
]
},
'recovery': {
'client_count': 8,
'test_name': '异常恢复测试',
'test_phases': [
{'name': 'phase1', 'duration': 10, 'action': 'normal'}, # 正常连接阶段
{'name': 'phase2', 'duration': 5, 'action': 'disconnect'}, # 断开阶段
{'name': 'phase3', 'duration': 5, 'action': 'reconnect'}, # 重连阶段
{'name': 'phase4', 'duration': 10, 'action': 'verify'}, # 验证阶段
]
},
'long_term': {
'client_count': 8,
'test_name': '长时间稳定性测试',
'duration': 3600, # 1小时
'interval_range': [0.5, 5.0], # 随机间隔
'size_range': [64, 1024], # 随机大小
'monitoring': {
'memory_check': 60, # 每60秒检查内存
'connection_check': 30 # 每30秒检查连接
}
}
}
# ==================== 统计 ====================
class TestStatsV35:
def __init__(self):
self.start_time = None
self.end_time = None
self.clients_connected = 0
self.total_sent = 0
self.total_received = 0
self.total_bytes_sent = 0
self.total_bytes_received = 0
self.connection_errors = 0
self.send_errors = 0
self.receive_errors = 0
self.reconnections = 0
self.disconnections = 0
self.test_mode = 'normal'
self.client_types = {}
self.current_phase = 'initial'
self.long_term_stats = {
'memory_usage': [],
'connection_counts': [],
'throughput_history': [],
'error_counts': []
}
def start_test(self):
self.start_time = time.time()
def end_test(self):
self.end_time = time.time()
def get_duration(self):
if self.start_time and self.end_time:
return self.end_time - self.start_time
elif self.start_time:
return time.time() - self.start_time
return 0
def add_reconnection(self):
self.reconnections += 1
def add_disconnection(self):
self.disconnections += 1
def record_long_term_stat(self, stat_type, value):
"""记录长时间测试的统计"""
timestamp = time.time() - self.start_time
if stat_type == 'memory':
self.long_term_stats['memory_usage'].append((timestamp, value))
elif stat_type == 'connections':
self.long_term_stats['connection_counts'].append((timestamp, value))
elif stat_type == 'throughput':
self.long_term_stats['throughput_history'].append((timestamp, value))
elif stat_type == 'errors':
self.long_term_stats['error_counts'].append((timestamp, value))
stats = TestStatsV35()
# ==================== 客户端类 - V3.5版 ====================
class ClientV35:
def __init__(self, client_id, client_config=None):
self.id = client_id
self.socket = None
self.running = False
self.connected = False
self.sent_count = 0
self.recv_count = 0
self.bytes_sent = 0
self.bytes_received = 0
self.send_times = []
self.thread = None
self.disconnect_logged = False
self.reconnect_attempts = 0
self.max_reconnect_attempts = 5 # 增加重连次数
self.last_disconnect_time = 0
self.reconnection_successful = False
self.random_interval = False
self.random_size = False
# 测试配置
if client_config:
# 混合负载模式使用传入的配置
self.message_interval = client_config['interval']
self.message_size = client_config['size']
self.client_type = self.get_client_type(client_config)
else:
# 其他模式使用默认配置
if CONFIG['test_mode'] in TEST_MODES:
test_config = TEST_MODES[CONFIG['test_mode']]
if 'message_interval' in test_config:
self.message_interval = test_config['message_interval']
self.message_size = test_config['message_size']
elif CONFIG['test_mode'] == 'long_term':
# 长时间测试使用随机配置
self.random_interval = True
self.random_size = True
self.message_interval = random.uniform(*test_config['interval_range'])
self.message_size = random.randint(*test_config['size_range'])
else:
# 异常恢复测试使用默认值
self.message_interval = 1.0
self.message_size = 64
else:
self.message_interval = 1.0
self.message_size = 64
self.client_type = 'standard'
self.test_mode = CONFIG['test_mode']
def get_client_type(self, config):
"""根据配置确定客户端类型"""
if config['interval'] <= 0.3:
speed = 'fast'
elif config['interval'] <= 0.8:
speed = 'medium'
elif config['interval'] <= 1.5:
speed = 'slow'
else:
speed = 'very_slow'
if config['size'] >= 512:
size = 'large'
elif config['size'] >= 128:
size = 'medium'
else:
size = 'small'
return f"{speed}_{size}"
def connect(self):
try:
print(f"[Client{self.id:02d}] 连接中...")
self.socket = socket.socket()
self.socket.settimeout(5.0) # 连接超时
self.socket.connect((CONFIG['server_ip'], CONFIG['server_port']))
self.socket.settimeout(0.5) # 接收超时
self.connected = True
stats.clients_connected += 1
# 记录重连成功
if self.reconnect_attempts > 0:
stats.add_reconnection()
self.reconnection_successful = True
print(f"[Client{self.id:02d}] 重连成功 (第{self.reconnect_attempts}次尝试)")
else:
print(f"[Client{self.id:02d}] 连接成功")
return True
except Exception as e:
print(f"[Client{self.id:02d}] 连接失败: {e}")
stats.connection_errors += 1
return False
def reconnect(self):
"""重连方法"""
if self.reconnect_attempts >= self.max_reconnect_attempts:
print(f"[Client{self.id:02d}] 已达到最大重连次数 ({self.max_reconnect_attempts})")
return False
self.reconnect_attempts += 1
print(f"[Client{self.id:02d}] 尝试重连 ({self.reconnect_attempts}/{self.max_reconnect_attempts})...")
# 关闭旧socket
if self.socket:
try:
self.socket.close()
except:
pass
self.socket = None
self.connected = False
self.disconnect_logged = False
# 等待一段时间再重连
wait_time = min(1.0 * self.reconnect_attempts, 3.0) # 减少等待时间
print(f"[Client{self.id:02d}] 等待{wait_time:.1f}秒后重连...")
time.sleep(wait_time)
return self.connect()
def start(self):
self.running = True
self.thread = threading.Thread(target=self.run_client, daemon=True)
self.thread.start()
def run_client(self):
if not self.connect():
self.running = False
return
start_time = time.time()
duration = CONFIG['test_duration']
buffer = b""
last_send_time = 0
phase_start_time = start_time
current_phase_index = 0
last_random_change = start_time
last_connection_check = start_time
connection_check_interval = 2.0 # 每2秒检查一次连接
# 长时间测试的特殊设置
if self.test_mode == 'long_term':
duration = TEST_MODES['long_term']['duration']
print(f"[Client{self.id:02d}] 长时间测试: {duration}秒, "
f"初始间隔: {self.message_interval:.1f}s, 大小: {self.message_size}字节")
# 异常恢复测试的阶段管理
elif self.test_mode == 'recovery':
phases = TEST_MODES['recovery']['test_phases']
total_phase_duration = sum(phase['duration'] for phase in phases)
duration = min(duration, total_phase_duration)
try:
while self.running and time.time() - start_time < duration:
current_time = time.time()
elapsed = current_time - start_time
# 检查连接状态(定期主动检查)
if current_time - last_connection_check >= connection_check_interval:
last_connection_check = current_time
if not self.connected:
print(f"[Client{self.id:02d}] 检测到连接断开,尝试重连...")
if not self.reconnect():
print(f"[Client{self.id:02d}] 重连失败,停止客户端")
break
else:
# 重连成功,重置缓冲区
buffer = b""
# 长时间测试的随机变化
if self.test_mode == 'long_term' and self.random_interval:
if current_time - last_random_change > 30.0: # 每30秒变化一次
self.message_interval = random.uniform(*TEST_MODES['long_term']['interval_range'])
self.message_size = random.randint(*TEST_MODES['long_term']['size_range'])
last_random_change = current_time
print(f"[Client{self.id:02d}] 配置变化: 间隔{self.message_interval:.1f}s, 大小{self.message_size}字节")
# 异常恢复测试的阶段检查
elif self.test_mode == 'recovery':
phase_elapsed = current_time - phase_start_time
if current_phase_index < len(phases) - 1 and phase_elapsed >= phases[current_phase_index]['duration']:
current_phase_index += 1
phase_start_time = current_time
new_phase = phases[current_phase_index]
stats.current_phase = new_phase['name']
print(f"\n[阶段切换] 进入{new_phase['name']}: {new_phase['action']}")
# 执行阶段动作
if new_phase['action'] == 'disconnect':
if self.id in [7, 8]: # 随机断开2个客户端
print(f"[Client{self.id:02d}] 执行主动断开")
self.force_disconnect()
continue
elif new_phase['action'] == 'reconnect':
if self.id in [7, 8] and not self.connected:
print(f"[Client{self.id:02d}] 尝试重新连接")
self.reconnect()
# 根据连接状态决定是否发送消息
if self.connected:
# 根据间隔发送消息
if current_time - last_send_time >= self.message_interval:
if not self.send_message():
# 发送失败,立即尝试重连
print(f"[Client{self.id:02d}] 发送失败,尝试重连...")
if self.reconnect():
# 重连成功后立即发送消息
if self.send_message():
last_send_time = current_time
continue
else:
print(f"[Client{self.id:02d}] 重连失败,停止客户端")
break
last_send_time = current_time
# 接收消息
buffer = self.receive_messages(buffer)
# 根据测试模式调整休眠时间
sleep_time = self.get_sleep_time()
time.sleep(sleep_time)
except Exception as e:
print(f"[Client{self.id:02d}] 运行错误: {e}")
import traceback
traceback.print_exc()
finally:
self.stop_client()
def force_disconnect(self):
"""强制断开连接(用于异常恢复测试)"""
if self.connected and self.socket:
try:
# 发送断开通知
disconnect_msg = f"BYE Client{self.id:02d} (主动断开)\r\n"
self.socket.settimeout(0.1)
self.socket.send(disconnect_msg.encode())
except:
pass
# 关闭socket
try:
self.socket.close()
except:
pass
stats.clients_connected -= 1
stats.add_disconnection()
self.connected = False
self.last_disconnect_time = time.time()
print(f"[Client{self.id:02d}] 已主动断开连接")
def get_sleep_time(self):
"""根据客户端类型获取休眠时间"""
if self.test_mode == 'high_freq':
return 0.001
elif self.test_mode == 'mixed_load':
if self.message_interval <= 0.3:
return 0.001 # 快速客户端
elif self.message_interval <= 0.8:
return 0.005 # 中速客户端
elif self.message_interval <= 1.5:
return 0.01 # 慢速客户端
else:
return 0.05 # 超慢客户端(大数据包)
elif self.test_mode == 'large_packet':
return 0.05
elif self.test_mode == 'long_term':
return 0.01 # 长时间测试使用中等休眠
else:
return 0.01
def send_message(self):
"""发送消息"""
# 检查连接状态
if not self.connected or not self.socket:
if self.running and not self.disconnect_logged:
print(f"[Client{self.id:02d}] 发送时连接已断开,尝试重连...")
if self.reconnect():
# 重连成功,继续发送
print(f"[Client{self.id:02d}] 重连成功,继续发送")
else:
print(f"[Client{self.id:02d}] 重连失败")
return False
else:
return False
try:
self.sent_count += 1
# 根据消息大小选择发送方式
if self.message_size >= 512:
# 大数据包使用结构化数据
message = self.create_large_message()
elif self.message_size >= 128:
# 中等大小消息
message = self.create_medium_message()
else:
# 小消息
message = self.create_small_message()
# 设置发送超时
self.socket.settimeout(1.0)
sent = self.socket.send(message)
if sent == len(message):
stats.total_sent += 1
stats.total_bytes_sent += sent
self.bytes_sent += sent
self.send_times.append(time.time())
# 根据测试模式和消息频率调整日志输出
self.log_send_progress(sent)
return True
else:
print(f"[Client{self.id:02d}] 发送不完整 ({sent}/{len(message)}字节)")
stats.send_errors += 1
self.connected = False
return False
except socket.timeout:
print(f"[Client{self.id:02d}] 发送超时")
stats.send_errors += 1
self.connected = False
return False
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError, ConnectionError) as e:
print(f"[Client{self.id:02d}] 发送时连接断开: {e}")
stats.send_errors += 1
self.connected = False
# 尝试重连
if self.running:
return self.reconnect() # 返回重连结果
return False
except Exception as e:
err_str = str(e)
if "10038" in err_str or "10053" in err_str or "10054" in err_str:
print(f"[Client{self.id:02d}] 连接错误: {e}")
stats.send_errors += 1
self.connected = False
return False
print(f"[Client{self.id:02d}] 发送失败: {e}")
stats.send_errors += 1
return False
def create_small_message(self):
"""创建小消息"""
timestamp = datetime.now().strftime('%H:%M:%S.%f')[:12]
base_msg = f"Client{self.id:02d} Msg{self.sent_count:04d} {timestamp}"
# 如果消息长度不够,填充随机数据
if len(base_msg) < self.message_size - 2:
padding = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789',
k=self.message_size - len(base_msg) - 3))
msg = f"{base_msg} {padding}\r\n"
else:
msg = f"{base_msg[:self.message_size-3]}...\r\n"
return msg.encode()
def create_medium_message(self):
"""创建中等大小消息"""
timestamp = time.time()
msg_id = self.sent_count
# 创建消息头
header = struct.pack('!HHf', self.id, msg_id, timestamp)
# 计算需要填充的数据大小
remaining = self.message_size - len(header) - 2 # -2 for \r\n
if remaining > 0:
# 生成填充数据(随机文本)
text_data = f"Medium message #{msg_id} from Client{self.id:02d} "
padding_len = remaining - len(text_data.encode())
if padding_len > 0:
padding = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=padding_len))
full_text = text_data + padding
else:
full_text = text_data[:remaining]
padding_bytes = full_text.encode()[:remaining]
else:
padding_bytes = b""
# 组合完整消息
message = header + padding_bytes + b"\r\n"
# 确保消息大小正确
if len(message) != self.message_size:
# 调整填充数据
padding_len = self.message_size - len(header) - 2
padding = bytes([random.randint(32, 126) for _ in range(padding_len)])
message = header + padding + b"\r\n"
return message
def create_large_message(self):
"""创建大数据包"""
timestamp = time.time()
msg_id = self.sent_count
# 创建消息头
header = struct.pack('!HHd', self.id, msg_id, timestamp)
# 计算需要填充的数据大小
remaining = self.message_size - len(header) - 2 # -2 for \r\n
if remaining > 0:
# 生成填充数据(结构化数据)
# 第一部分:文本描述
text_part = f"Large packet #{msg_id} from Client{self.id:02d} ".encode()
# 第二部分:二进制数据
binary_part = bytes([(i + msg_id) % 256 for i in range(256)])
# 第三部分:重复填充以达到指定大小
repeat_count = (remaining - len(text_part)) // len(binary_part) + 1
padding = text_part + (binary_part * repeat_count)
padding = padding[:remaining]
else:
padding = b""
# 组合完整消息
message = header + padding + b"\r\n"
# 确保消息大小正确
if len(message) != self.message_size:
# 简单填充
padding_len = self.message_size - len(header) - 2
padding = bytes([random.randint(0, 255) for _ in range(padding_len)])
message = header + padding + b"\r\n"
return message
def log_send_progress(self, sent_bytes):
"""记录发送进度"""
if self.test_mode == 'long_term':
# 长时间测试减少日志输出
if self.sent_count % 100 == 0:
elapsed = time.time() - self.send_times[0] if self.send_times else 0
if elapsed > 0:
rate = self.sent_count / elapsed
throughput = self.bytes_sent / elapsed
print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条, "
f"运行{elapsed:.0f}秒, 速率: {rate:.1f}条/秒")
elif self.test_mode == 'recovery':
# 异常恢复测试减少日志输出
if self.sent_count % 20 == 0:
elapsed = time.time() - self.send_times[0] if self.send_times else 0
if elapsed > 0:
rate = self.sent_count / elapsed
print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条, 阶段: {stats.current_phase}")
elif self.test_mode == 'mixed_load':
log_interval = max(5, int(5 / self.message_interval))
if self.sent_count % log_interval == 0:
elapsed = time.time() - self.send_times[0] if self.send_times else 0
if elapsed > 0:
rate = self.sent_count / elapsed
throughput = self.bytes_sent / elapsed
print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条, "
f"速率: {rate:.1f}条/秒, 吞吐: {throughput/1024:.1f}KB/s")
elif self.test_mode == 'high_freq':
if self.sent_count % 50 == 0:
elapsed = time.time() - self.send_times[0] if self.send_times else 0
if elapsed > 0:
rate = self.sent_count / elapsed
print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条, 速率: {rate:.1f}条/秒")
elif self.test_mode == 'large_packet':
if self.sent_count % 5 == 0:
print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条 ({sent_bytes}字节)")
else:
if self.sent_count % 20 == 0:
print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条")
def receive_messages(self, buffer):
"""接收消息"""
if not self.connected or not self.socket:
# 尝试重连
if self.running and not self.disconnect_logged:
print(f"[Client{self.id:02d}] 接收时连接已断开,尝试重连...")
if self.reconnect():
print(f"[Client{self.id:02d}] 重连成功")
return buffer
else:
print(f"[Client{self.id:02d}] 重连失败,停止接收")
return buffer
try:
# 根据消息大小调整接收缓冲区
recv_size = max(1024, self.message_size * 2)
data = self.socket.recv(recv_size)
if data:
buffer += data
self.bytes_received += len(data)
stats.total_bytes_received += len(data)
while b'\r\n' in buffer:
idx = buffer.find(b'\r\n')
message = buffer[:idx]
buffer = buffer[idx+2:]
if message:
self.recv_count += 1
stats.total_received += 1
# 根据测试模式调整日志输出
self.log_receive_progress(message)
except socket.timeout:
pass
except (ConnectionResetError, BrokenPipeError):
if not self.disconnect_logged:
print(f"[Client{self.id:02d}] 接收时连接已断开,尝试重连...")
self.disconnect_logged = True
self.connected = False
stats.add_disconnection()
# 尝试立即重连
if self.running:
if self.reconnect():
self.disconnect_logged = False
else:
print(f"[Client{self.id:02d}] 重连失败")
stats.receive_errors += 1
except Exception as e:
err_str = str(e)
if "10038" in err_str or "10053" in err_str or "10054" in err_str:
if not self.disconnect_logged:
print(f"[Client{self.id:02d}] 接收时连接已关闭,尝试重连...")
self.disconnect_logged = True
self.connected = False
stats.add_disconnection()
# 尝试立即重连
if self.running:
if self.reconnect():
self.disconnect_logged = False
else:
print(f"[Client{self.id:02d}] 重连失败")
elif "timed out" not in err_str:
print(f"[Client{self.id:02d}] 接收错误: {e}")
stats.receive_errors += 1
return buffer
def log_receive_progress(self, message):
"""记录接收进度"""
if self.test_mode == 'long_term':
# 长时间测试特殊处理
if self.recv_count % 50 == 0:
try:
msg_str = message.decode('utf-8', errors='ignore')
if msg_str.startswith("ACK"):
print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count}")
except:
if self.recv_count % 100 == 0:
print(f"[Client{self.id:02d}] 收到消息 #{self.recv_count} ({len(message)}字节)")
elif self.test_mode == 'recovery':
# 异常恢复测试特殊处理
try:
msg_str = message.decode('utf-8', errors='ignore')
if msg_str.startswith("ACK"):
if self.recv_count % 10 == 0:
print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count} (阶段: {stats.current_phase})")
except:
if self.recv_count % 5 == 0:
print(f"[Client{self.id:02d}] 收到消息 #{self.recv_count} ({len(message)}字节)")
elif self.test_mode == 'mixed_load':
log_interval = max(10, int(10 / self.message_interval))
if self.recv_count % log_interval == 0:
try:
msg_str = message.decode('utf-8', errors='ignore')
if msg_str.startswith("ACK"):
print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count} ({len(message)}字节)")
except:
print(f"[Client{self.id:02d}] 收到消息 #{self.recv_count} ({len(message)}字节)")
elif self.test_mode == 'high_freq':
if self.recv_count % 100 == 0:
try:
msg_str = message.decode('utf-8', errors='ignore')
if msg_str.startswith("ACK"):
print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count}")
except:
pass
elif self.test_mode == 'large_packet':
if self.recv_count % 5 == 0:
print(f"[Client{self.id:02d}] 收到消息 #{self.recv_count} ({len(message)}字节)")
else:
if self.recv_count % 10 == 0:
try:
msg_str = message.decode('utf-8', errors='ignore')
if msg_str.startswith("ACK"):
print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count}")
except:
pass
def stop_client(self):
"""停止客户端"""
if not self.running:
return
# 尝试发送断开通知
if self.connected and self.socket:
try:
disconnect_msg = f"BYE Client{self.id:02d}\r\n"
self.socket.settimeout(0.1)
self.socket.send(disconnect_msg.encode())
except:
pass
# 关闭socket
if self.socket:
try:
self.socket.close()
except:
pass
if self.connected:
stats.clients_connected -= 1
self.connected = False
# 计算统计信息
avg_interval = 0
send_rate = 0
throughput = 0
if len(self.send_times) > 1:
total_time = self.send_times[-1] - self.send_times[0]
avg_interval = total_time / (len(self.send_times) - 1)
if total_time > 0:
send_rate = self.sent_count / total_time
throughput = self.bytes_sent / total_time
# 根据测试模式显示不同统计信息
if self.test_mode == 'long_term':
print(f"[Client{self.id:02d}] 停止,运行{total_time:.0f}秒, 发送: {self.sent_count}, "
f"接收: {self.recv_count}, 平均间隔: {avg_interval:.2f}s, "
f"平均大小: {self.bytes_sent/self.sent_count if self.sent_count > 0 else 0:.0f}字节")
elif self.test_mode == 'recovery':
status = "重连成功" if self.reconnection_successful else "未重连"
print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
f"重连尝试: {self.reconnect_attempts}, 状态: {status}")
elif self.test_mode == 'mixed_load':
print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
f"间隔: {self.message_interval}s, 大小: {self.message_size}字节, "
f"吞吐: {throughput/1024:.1f}KB/s")
elif self.test_mode == 'high_freq':
print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
f"速率: {send_rate:.1f}条/秒")
elif self.test_mode == 'large_packet':
print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
f"吞吐: {throughput/1024:.1f}KB/s")
else:
print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
f"间隔: {avg_interval:.2f}秒")
self.running = False
# ==================== 主程序 ====================
def main_v35():
print("W5500 Windows客户端 - V3.5 (增加长时间稳定性测试)")
print("="*60)
print(f"服务器: {CONFIG['server_ip']}:{CONFIG['server_port']}")
# 选择测试模式
print("\n选择测试模式:")
print("1. 常规压力测试 (1秒/条, 64字节)")
print("2. 高频消息测试 (0.2秒/条, 64字节)")
print("3. 大数据包测试 (2秒/条, 1024字节)")
print("4. 混合负载测试 (8种不同配置)")
print("5. 异常恢复测试 (断开重连验证)")
print("6. 长时间稳定性测试 (1小时随机负载)")
choice = input("请输入选择 (1, 2, 3, 4, 5 或 6): ").strip()
if choice == "2":
CONFIG['test_mode'] = 'high_freq'
print("\n⚠️ 警告:高频测试将产生大量网络流量")
print(f" 理论吞吐量: {TEST_MODES['high_freq']['client_count'] / TEST_MODES['high_freq']['message_interval']:.0f} 条/秒")
elif choice == "3":
CONFIG['test_mode'] = 'large_packet'
print("\n⚠️ 警告:大数据包测试将产生大量网络数据")
print(f" 理论数据量: {TEST_MODES['large_packet']['client_count'] / TEST_MODES['large_packet']['message_interval'] * TEST_MODES['large_packet']['message_size']/1024:.1f} KB/s")
elif choice == "4":
CONFIG['test_mode'] = 'mixed_load'
print("\n⚠️ 警告:混合负载测试将模拟真实场景")
print(" 客户端配置:")
for i, config in enumerate(TEST_MODES['mixed_load']['clients_config'], 1):
print(f" 客户端{i}: 间隔{config['interval']}s, 大小{config['size']}字节")
elif choice == "5":
CONFIG['test_mode'] = 'recovery'
print("\n⚠️ 异常恢复测试 - 验证W5500断开重连能力")
print(" 测试阶段:")
for phase in TEST_MODES['recovery']['test_phases']:
print(f" {phase['name']}: {phase['duration']}秒 ({phase['action']})")
print("\n 测试流程:")
print(" 1. 正常连接8个客户端 (10秒)")
print(" 2. 随机断开2个客户端 (5秒)")
print(" 3. 尝试重连断开的客户端 (5秒)")
print(" 4. 验证所有客户端状态 (10秒)")
elif choice == "6":
CONFIG['test_mode'] = 'long_term'
CONFIG['test_duration'] = TEST_MODES['long_term']['duration']
print("\n⚠️ 长时间稳定性测试 - 验证W5500持续运行能力")
print(f" 测试时长: {TEST_MODES['long_term']['duration']/60:.0f}分钟")
print(f" 间隔范围: {TEST_MODES['long_term']['interval_range'][0]}-{TEST_MODES['long_term']['interval_range'][1]}秒")
print(f" 消息大小: {TEST_MODES['long_term']['size_range'][0]}-{TEST_MODES['long_term']['size_range'][1]}字节")
print("\n 监控设置:")
print(f" 内存检查: 每{TEST_MODES['long_term']['monitoring']['memory_check']}秒")
print(f" 连接检查: 每{TEST_MODES['long_term']['monitoring']['connection_check']}秒")
print("\n ⚠️ 建议在稳定的网络环境下进行此测试")
else:
CONFIG['test_mode'] = 'normal'
test_config = TEST_MODES[CONFIG['test_mode']]
stats.test_mode = CONFIG['test_mode']
print(f"\n测试模式: {test_config['test_name']}")
print(f"测试时长: {CONFIG['test_duration']}秒")
if CONFIG['test_mode'] == 'mixed_load':
print(f"客户端数: {test_config['client_count']} (8种不同配置)")
elif CONFIG['test_mode'] == 'recovery':
print(f"客户端数: {test_config['client_count']} (包含断开重连测试)")
print(f"测试阶段: {len(test_config['test_phases'])}个阶段")
elif CONFIG['test_mode'] == 'long_term':
print(f"客户端数: {test_config['client_count']} (随机配置)")
print(f"测试时长: {CONFIG['test_duration']/3600:.1f}小时")
else:
print(f"客户端数: {test_config['client_count']}")
if 'message_interval' in test_config:
print(f"消息间隔: {test_config['message_interval']}秒")
if 'message_size' in test_config:
print(f"消息大小: {test_config['message_size']}字节")
print("="*60)
if CONFIG['test_mode'] == 'long_term':
confirm = input(f"\n⚠️ 长时间测试将运行{CONFIG['test_duration']/3600:.1f}小时,确认开始?(y/n): ").strip().lower()
if confirm != 'y':
print("测试已取消")
return
input("按 Enter 开始测试...")
# 创建客户端
if CONFIG['test_mode'] == 'mixed_load':
clients = []
for i, client_config in enumerate(test_config['clients_config'], 1):
clients.append(ClientV35(i, client_config))
else:
clients = [ClientV35(i+1) for i in range(test_config['client_count'])]
# 启动客户端
print(f"\n启动{len(clients)}个客户端...")
test_start_time = time.time()
stats.start_test()
# 快速连续启动(微小延迟)
for client in clients:
client.start()
time.sleep(0.1) # 仅微小延迟,避免完全同时
# 监控循环
try:
start_monitor_time = time.time()
last_print_time = start_monitor_time
last_sent_count = 0
last_bytes_sent = 0
last_memory_check = start_monitor_time
last_connection_check = start_monitor_time
while time.time() - test_start_time < CONFIG['test_duration']:
active = sum(1 for c in clients if c.running)
if active == 0:
print("\n所有客户端已停止")
break
current_time = time.time()
elapsed = current_time - test_start_time
progress = min(100, (elapsed / CONFIG['test_duration']) * 100)
# 计算实时统计
total_sent = sum(c.sent_count for c in clients)
total_recv = sum(c.recv_count for c in clients)
total_bytes_sent = stats.total_bytes_sent
# 长时间测试的特殊监控
if CONFIG['test_mode'] == 'long_term':
# 定期内存和连接检查
if current_time - last_memory_check >= TEST_MODES['long_term']['monitoring']['memory_check']:
# 记录连接数
stats.record_long_term_stat('connections', active)
last_memory_check = current_time
if current_time - last_connection_check >= TEST_MODES['long_term']['monitoring']['connection_check']:
# 记录吞吐量
throughput = total_bytes_sent / elapsed if elapsed > 0 else 0
stats.record_long_term_stat('throughput', throughput)
last_connection_check = current_time
# 每2秒打印一次监控信息(长时间测试改为每10秒)
monitor_interval = 10.0 if CONFIG['test_mode'] == 'long_term' else 2.0
if current_time - last_print_time >= monitor_interval:
# 计算瞬时速率
time_diff = current_time - last_print_time
sent_diff = total_sent - last_sent_count
bytes_diff = total_bytes_sent - last_bytes_sent
instant_rate = sent_diff / time_diff if time_diff > 0 else 0
instant_throughput = bytes_diff / time_diff if time_diff > 0 else 0
# 计算平均速率
total_elapsed = current_time - start_monitor_time
avg_rate = total_sent / total_elapsed if total_elapsed > 0 else 0
avg_throughput = total_bytes_sent / total_elapsed if total_elapsed > 0 else 0
# 显示监控信息
if CONFIG['test_mode'] == 'long_term':
hours = elapsed / 3600
minutes = (elapsed % 3600) / 60
print(f"\r运行: {hours:.0f}:{minutes:02.0f} | 进度: {progress:.1f}% | 发送: {total_sent} | "
f"接收: {total_recv} | 活动: {active}/{len(clients)} | "
f"吞吐: {avg_throughput/1024:.1f}KB/s", end="")
elif CONFIG['test_mode'] in ['mixed_load', 'large_packet', 'recovery']:
phase_info = f"阶段: {stats.current_phase} | " if CONFIG['test_mode'] == 'recovery' else ""
print(f"\r进度: {progress:.1f}% | {phase_info}发送: {total_sent} | 接收: {total_recv} | "
f"活动: {active}/{len(clients)} | 瞬时: {instant_throughput/1024:.1f}KB/s | "
f"平均: {avg_throughput/1024:.1f}KB/s", end="")
else:
print(f"\r进度: {progress:.1f}% | 发送: {total_sent} | 接收: {total_recv} | "
f"活动: {active}/{len(clients)} | 瞬时: {instant_rate:.1f}条/秒 | "
f"平均: {avg_rate:.1f}条/秒", end="")
last_print_time = current_time
last_sent_count = total_sent
last_bytes_sent = total_bytes_sent
time.sleep(0.1)
print("\n\n测试完成!")
except KeyboardInterrupt:
print("\n\n测试中断")
finally:
# 停止所有客户端
print("\n停止客户端...")
for client in clients:
client.running = False
for client in clients:
if client.thread:
client.thread.join(timeout=1)
stats.end_test()
display_results_v35(clients, test_config)
def display_results_v35(clients, test_config):
print("\n" + "="*60)
print(f"测试结果 - {test_config['test_name']}")
print("="*60)
duration = stats.get_duration()
total_sent = sum(c.sent_count for c in clients)
total_recv = sum(c.recv_count for c in clients)
print(f"测试模式: {test_config['test_name']}")
print(f"测试时长: {duration:.1f}秒 ({duration/3600:.2f}小时)")
print(f"客户端数: {len(clients)}")
print(f"成功连接: {stats.clients_connected}")
print(f"发送消息: {total_sent}")
print(f"接收消息: {total_recv}")
print(f"发送字节: {stats.total_bytes_sent:,}")
print(f"接收字节: {stats.total_bytes_received:,}")
if total_sent > 0:
response_rate = total_recv / total_sent * 100
print(f"响应率: {response_rate:.1f}%")
if duration > 0:
avg_send_rate = total_sent / duration
avg_recv_rate = total_recv / duration
avg_throughput_send = stats.total_bytes_sent / duration
avg_throughput_recv = stats.total_bytes_received / duration
print(f"\n速率统计:")
print(f" 平均发送速率: {avg_send_rate:.1f} 条/秒")
print(f" 平均接收速率: {avg_recv_rate:.1f} 条/秒")
print(f" 发送吞吐量: {avg_throughput_send/1024:.1f} KB/s")
print(f" 接收吞吐量: {avg_throughput_recv/1024:.1f} KB/s")
# 长时间测试额外统计
if stats.test_mode == 'long_term':
if stats.long_term_stats['connection_counts']:
min_conn = min(c[1] for c in stats.long_term_stats['connection_counts'])
max_conn = max(c[1] for c in stats.long_term_stats['connection_counts'])
avg_conn = sum(c[1] for c in stats.long_term_stats['connection_counts']) / len(stats.long_term_stats['connection_counts'])
print(f"\n连接稳定性:")
print(f" 最小连接数: {min_conn}")
print(f" 最大连接数: {max_conn}")
print(f" 平均连接数: {avg_conn:.1f}")
if stats.long_term_stats['throughput_history']:
min_tp = min(t[1] for t in stats.long_term_stats['throughput_history'])
max_tp = max(t[1] for t in stats.long_term_stats['throughput_history'])
avg_tp = sum(t[1] for t in stats.long_term_stats['throughput_history']) / len(stats.long_term_stats['throughput_history'])
print(f"\n吞吐量稳定性:")
print(f" 最小吞吐量: {min_tp/1024:.1f} KB/s")
print(f" 最大吞吐量: {max_tp/1024:.1f} KB/s")
print(f" 平均吞吐量: {avg_tp/1024:.1f} KB/s")
# 异常恢复测试额外统计
if stats.test_mode == 'recovery':
print(f"\n恢复统计:")
print(f" 断开次数: {stats.disconnections}")
print(f" 重连次数: {stats.reconnections}")
print(f" 重连成功率: {stats.reconnections/(stats.disconnections or 1)*100:.1f}%")
# 效率分析
if stats.test_mode == 'high_freq':
theoretical_rate = 1/test_config['message_interval'] * len(clients)
efficiency = avg_send_rate / theoretical_rate * 100
print(f"\n效率分析:")
print(f" 理论吞吐量: {theoretical_rate:.1f} 条/秒")
print(f" 实际吞吐量: {avg_send_rate:.1f} 条/秒")
print(f" 效率: {efficiency:.1f}%")
elif stats.test_mode == 'large_packet':
theoretical_throughput = (1/test_config['message_interval'] * len(clients) *
test_config['message_size'])
actual_throughput = stats.total_bytes_sent / duration
efficiency = actual_throughput / theoretical_throughput * 100
print(f"\n效率分析:")
print(f" 理论吞吐量: {theoretical_throughput/1024:.1f} KB/s")
print(f" 实际吞吐量: {actual_throughput/1024:.1f} KB/s")
print(f" 效率: {efficiency:.1f}%")
elif stats.test_mode == 'mixed_load':
# 计算混合负载的理论吞吐量
theoretical_throughput = 0
for client in clients:
theoretical_throughput += (1/client.message_interval * client.message_size)
actual_throughput = stats.total_bytes_sent / duration
efficiency = actual_throughput / theoretical_throughput * 100
print(f"\n效率分析:")
print(f" 理论吞吐量: {theoretical_throughput/1024:.1f} KB/s")
print(f" 实际吞吐量: {actual_throughput/1024:.1f} KB/s")
print(f" 效率: {efficiency:.1f}%")
print(f"\n错误统计:")
print(f" 连接错误: {stats.connection_errors}")
print(f" 发送错误: {stats.send_errors}")
print(f" 接收错误: {stats.receive_errors}")
print(f"\n客户端详情:")
client_stats = []
for client in clients:
if len(client.send_times) > 1:
total_time = client.send_times[-1] - client.send_times[0]
avg_interval = total_time / (len(client.send_times) - 1)
send_rate = len(client.send_times) / total_time if total_time > 0 else 0
throughput = client.bytes_sent / total_time if total_time > 0 else 0
else:
avg_interval = 0
send_rate = 0
throughput = 0
client_stats.append({
'id': client.id,
'sent': client.sent_count,
'recv': client.recv_count,
'bytes_sent': client.bytes_sent,
'interval': client.message_interval if stats.test_mode in ['mixed_load', 'recovery', 'long_term'] else avg_interval,
'size': client.message_size if stats.test_mode in ['mixed_load', 'long_term'] else 0,
'rate': send_rate,
'throughput': throughput,
'reconnect_attempts': client.reconnect_attempts if stats.test_mode in ['recovery', 'long_term'] else 0
})
if stats.test_mode == 'long_term':
avg_size = client.bytes_sent / client.sent_count if client.sent_count > 0 else 0
print(f" 客户端{client.id:02d}: 运行{total_time:.0f}秒, 发送{client.sent_count:6d}, "
f"接收{client.recv_count:6d}, 平均间隔{avg_interval:.2f}s, 平均大小{avg_size:.0f}字节")
elif stats.test_mode == 'recovery':
status = "重连成功" if client.reconnection_successful else "正常"
print(f" 客户端{client.id:02d}: 发送{client.sent_count:4d}, 接收{client.recv_count:4d}, "
f"重连尝试: {client.reconnect_attempts}, 状态: {status}")
elif stats.test_mode == 'mixed_load':
print(f" 客户端{client.id:02d}: 间隔{client.message_interval}s, "
f"大小{client.message_size}字节, 发送{client.sent_count:4d}, "
f"接收{client.recv_count:4d}, 吞吐{throughput/1024:.1f}KB/s")
elif stats.test_mode == 'high_freq':
print(f" 客户端{client.id:02d}: 发送{client.sent_count:6d}, "
f"接收{client.recv_count:6d}, 速率{send_rate:.1f}条/秒")
elif stats.test_mode == 'large_packet':
print(f" 客户端{client.id:02d}: 发送{client.sent_count:4d}, "
f"接收{client.recv_count:4d}, 吞吐{throughput/1024:.1f}KB/s")
else:
print(f" 客户端{client.id:02d}: 发送{client.sent_count:4d}, "
f"接收{client.recv_count:4d}, 间隔{avg_interval:.2f}秒")
# 计算均匀性
if client_stats and stats.test_mode not in ['mixed_load', 'recovery', 'long_term']:
sent_counts = [s['sent'] for s in client_stats]
max_sent = max(sent_counts)
min_sent = min(sent_counts)
avg_sent = sum(sent_counts) / len(sent_counts)
if avg_sent > 0:
uniformity = (1 - (max_sent - min_sent) / avg_sent) * 100
print(f"\n消息分发均匀性: {uniformity:.1f}%")
print(f" 最多: {max_sent}, 最少: {min_sent}, 平均: {avg_sent:.1f}")
print("="*60)
# 保存测试结果
save_test_results_v35(clients, test_config, duration)
def save_test_results_v35(clients, test_config, duration):
"""保存测试结果到文件"""
try:
result = {
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'test_mode': test_config['test_name'],
'duration': duration,
'clients': len(clients),
'total_sent': sum(c.sent_count for c in clients),
'total_received': sum(c.recv_count for c in clients),
'total_bytes_sent': stats.total_bytes_sent,
'total_bytes_received': stats.total_bytes_received,
'connection_errors': stats.connection_errors,
'send_errors': stats.send_errors,
'receive_errors': stats.receive_errors,
'disconnections': stats.disconnections,
'reconnections': stats.reconnections,
'client_details': []
}
if CONFIG['test_mode'] == 'mixed_load':
result['clients_config'] = TEST_MODES['mixed_load']['clients_config']
elif CONFIG['test_mode'] == 'recovery':
result['test_phases'] = TEST_MODES['recovery']['test_phases']
elif CONFIG['test_mode'] == 'long_term':
result['monitoring'] = TEST_MODES['long_term']['monitoring']
result['interval_range'] = TEST_MODES['long_term']['interval_range']
result['size_range'] = TEST_MODES['long_term']['size_range']
result['long_term_stats'] = stats.long_term_stats
for client in clients:
if len(client.send_times) > 1:
total_time = client.send_times[-1] - client.send_times[0]
send_rate = len(client.send_times) / total_time if total_time > 0 else 0
throughput = client.bytes_sent / total_time if total_time > 0 else 0
else:
send_rate = 0
throughput = 0
client_detail = {
'id': client.id,
'sent': client.sent_count,
'received': client.recv_count,
'bytes_sent': client.bytes_sent,
'send_rate': send_rate,
'throughput': throughput
}
if CONFIG['test_mode'] == 'mixed_load':
client_detail['interval'] = client.message_interval
client_detail['message_size'] = client.message_size
elif CONFIG['test_mode'] in ['recovery', 'long_term']:
client_detail['reconnect_attempts'] = client.reconnect_attempts
client_detail['reconnection_successful'] = client.reconnection_successful
if CONFIG['test_mode'] == 'long_term':
client_detail['final_interval'] = client.message_interval
client_detail['final_message_size'] = client.message_size
result['client_details'].append(client_detail)
filename = f"w5500_{test_config['test_name']}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print(f"\n测试结果已保存到: {filename}")
except Exception as e:
print(f"保存测试结果失败: {e}")
if __name__ == "__main__":
main_v35()
W5500 TCP压力测试服务器 - V3.5 (增强socket回收版) socket回收仍有小问题需修改
"""
W5500 TCP压力测试服务器 - V3.5 (增强socket回收版)
基于V3.2成功经验,简化逻辑
"""
import time
import gc
import wiznet5k_socket as socket_module
try:
import uasyncio as asyncio
except ImportError:
import asyncio
# ==================== 服务器配置 ====================
SERVER_CONFIG = {
'port': 9050, # 服务器端口
'max_clients': 8, # 真正的8路并发
'test_duration': 120, # 测试时长
'backlog': 10, # 监听队列大小
'heartbeat_interval': 8.0,
'client_timeout': 10.0, # 10秒超时
'max_packet_size': 2048, # 支持最大2KB数据包
}
# ==================== 全局状态 ====================
class ServerStatsV35:
"""服务器统计信息"""
def __init__(self):
self.reset()
self._lock = asyncio.Lock()
self.start_time = 0
self.connections = 0
self.messages_received = 0
self.messages_sent = 0
self.bytes_received = 0
self.bytes_sent = 0
self.connection_errors = 0
self.socket_alloc_errors = 0
self.eighth_client_success = 0 # 第8路成功次数
self.listen_socket_consumed = 0 # 监听socket被使用次数
self.active_clients = 0
self.client_handlers = {}
self.bye_messages_received = 0 # 收到的BYE消息数
self.large_packets_received = 0 # 大数据包计数
self._lock = asyncio.Lock()
def reset(self):
"""重置所有统计信息(服务器重启时调用)"""
self.start_time = 0
self.connections = 0
self.messages_received = 0
self.messages_sent = 0
self.bytes_received = 0
self.bytes_sent = 0
self.connection_errors = 0
self.socket_alloc_errors = 0
self.eighth_client_success = 0 # 第8路成功次数
self.listen_socket_consumed = 0 # 监听socket被使用次数
self.active_clients = 0
self.client_handlers = {}
self.bye_messages_received = 0 # 收到的BYE消息数
self.large_packets_received = 0 # 大数据包计数
async def add_connection(self):
async with self._lock:
self.connections += 1
async def add_message_received(self, bytes_count):
async with self._lock:
self.messages_received += 1
self.bytes_received += bytes_count
# 统计大数据包
if bytes_count >= 512: # 512字节以上算大数据包
self.large_packets_received += 1
async def add_message_sent(self, bytes_count):
async with self._lock:
self.messages_sent += 1
self.bytes_sent += bytes_count
async def add_connection_error(self):
async with self._lock:
self.connection_errors += 1
async def add_socket_alloc_error(self):
async with self._lock:
self.socket_alloc_errors += 1
async def add_eighth_client_success(self):
async with self._lock:
self.eighth_client_success += 1
async def add_listen_socket_consumed(self):
async with self._lock:
self.listen_socket_consumed += 1
async def add_bye_message(self):
async with self._lock:
self.bye_messages_received += 1
async def client_connected(self, client_id, is_eighth_client=False):
async with self._lock:
self.active_clients += 1
self.client_handlers[client_id] = {
'connected_time': time.time(),
'message_count': 0,
'bytes_received': 0,
'last_activity': time.time(),
'is_eighth_client': is_eighth_client
}
if is_eighth_client:
self.eighth_client_success += 1
async def client_disconnected(self, client_id):
async with self._lock:
if client_id in self.client_handlers:
self.active_clients -= 1
# 保留记录用于统计,但标记为已断开
self.client_handlers[client_id]['disconnected'] = True
self.client_handlers[client_id]['disconnect_time'] = time.time()
async def update_client_activity(self, client_id, message_size=0):
async with self._lock:
if client_id in self.client_handlers:
self.client_handlers[client_id]['last_activity'] = time.time()
if message_size > 0:
self.client_handlers[client_id]['message_count'] += 1
self.client_handlers[client_id]['bytes_received'] += message_size
async def get_stats(self):
async with self._lock:
client_details = {}
for client_id, info in self.client_handlers.items():
client_details[client_id] = {
'uptime': time.time() - info['connected_time'],
'message_count': info['message_count'],
'bytes_received': info['bytes_received'],
'idle_time': time.time() - info['last_activity'],
'is_eighth_client': info.get('is_eighth_client', False),
'disconnected': info.get('disconnected', False)
}
return {
'uptime': time.time() - self.start_time if self.start_time > 0 else 0,
'connections': self.connections,
'messages_received': self.messages_received,
'messages_sent': self.messages_sent,
'bytes_received': self.bytes_received,
'bytes_sent': self.bytes_sent,
'connection_errors': self.connection_errors,
'socket_alloc_errors': self.socket_alloc_errors,
'eighth_client_success': self.eighth_client_success,
'listen_socket_consumed': self.listen_socket_consumed,
'bye_messages_received': self.bye_messages_received,
'large_packets_received': self.large_packets_received,
'active_clients': self.active_clients,
'client_details': client_details
}
# 全局统计实例
stats = ServerStatsV35()
# 服务器运行状态
server_running = False
# W5500硬件限制:只有8个socket
MAX_W5500_SOCKETS = 8
# ==================== 优化的服务器协程 ====================
async def tcp_server_v35():
"""
TCP服务器主协程(V3.5版 - 增强socket回收)
基于V3.2成功经验
"""
global server_running
print("🚀 TCP压力测试服务器启动 (V3.5 - 增强socket回收版)")
print("="*60)
print(f"⚠️ W5500硬件限制: 最多{MAX_W5500_SOCKETS}个socket")
print(f"🔥 核心功能: 第8路连接建立后,监听socket转为客户端socket")
print("="*60)
# 获取网络接口
try:
from main import global_vars
nic = global_vars.get('nic')
except:
print("❌ 无法获取网络接口")
return
if not nic:
print("❌ 网络接口不可用")
return
try:
# 设置socket接口
socket_module.set_interface(nic)
server_ip = nic.pretty_ip(nic.ip_address)
# 创建服务器socket
server = socket_module.socket()
server.settimeout(0.5)
# 绑定端口
port = SERVER_CONFIG['port']
server.bind((server_ip, port))
server.listen(SERVER_CONFIG['max_clients'])
# 初始化统计
stats.start_time = time.time()
server_running = True
print(f"✅ 服务器: {server_ip}:{port}")
print(f"📡 最大连接: {SERVER_CONFIG['max_clients']}路 (真正的8路并发)")
print(f"📦 最大包大小: {SERVER_CONFIG['max_packet_size']}字节")
print(f"⏱️ 测试时长: {SERVER_CONFIG['test_duration']}秒")
print(f"初始监听socket编号: {server.socknum}")
print("="*60)
print("等待客户端连接...\n")
# 启动监控协程
asyncio.create_task(stats_monitor_v35())
# 启动socket状态监控
async def socket_recycle_monitor():
"""监控socket回收状态"""
print("🔄 Socket回收监控启动")
while server_running:
await asyncio.sleep(10) # 每10秒检查一次
try:
# 打印当前socket使用情况
from main import global_vars
nic = global_vars.get('nic')
if nic and hasattr(nic, '_debug_sockets'):
print(f"🔄 Socket状态检查 ({time.time() - stats.start_time:.0f}秒):")
nic._debug_sockets()
except:
pass
asyncio.create_task(socket_recycle_monitor())
client_id = 1
listen_socket_active = True
listen_socket_used_as_client = False
# 主服务器循环
while server_running:
try:
# 检查当前连接数
stat_data = await stats.get_stats()
active_clients = stat_data['active_clients']
# 检查是否达到第8路情况
if active_clients >= MAX_W5500_SOCKETS - 1:
if listen_socket_active and not listen_socket_used_as_client:
print(f"🔥 第8路情况:当前连接 {active_clients}/{MAX_W5500_SOCKETS}")
# 只在需要监听时才监听
if not listen_socket_active:
await asyncio.sleep(1.0)
continue
try:
# 尝试接受新连接
result = server.accept()
if result is not None:
client_sock, addr = result
# 检查socket是否有效
if not hasattr(client_sock, 'connected') or not client_sock.connected:
print(f"[{client_id}] Socket无效,关闭")
try:
client_sock.close()
except:
pass
continue
# 检查并处理第8路客户端
is_eighth_client = False
if active_clients >= MAX_W5500_SOCKETS - 1:
print(f"🎯 第{client_id}路客户端激活!")
if active_clients == MAX_W5500_SOCKETS - 1:
is_eighth_client = True
listen_socket_active = False
listen_socket_used_as_client = True
await stats.add_listen_socket_consumed()
print(f" ⚠️ 监听socket已用于第{client_id}路客户端")
# 创建客户端处理协程
asyncio.create_task(
handle_client_v35(client_id, client_sock, addr, is_eighth_client)
)
print(f"[{client_id}] 已连接 (socket {client_sock.socknum}), " +
f"当前连接: {active_clients + 1}/{MAX_W5500_SOCKETS}")
client_id += 1
await asyncio.sleep(0.05)
except Exception as e:
err_str = str(e)
if "Failed to allocate socket" in err_str:
await stats.add_socket_alloc_error()
print(f"⛔ W5500 socket分配失败: 所有{MAX_W5500_SOCKETS}个socket都在使用中")
print(f" 当前连接数: {active_clients}/{MAX_W5500_SOCKETS}")
# 关键修复:等待并重试机制
print(f" ⏳ 等待socket资源释放...")
for retry_count in range(3): # 最多重试3次
try:
# 等待一段时间让socket完全释放
await asyncio.sleep(2.0) # 每次等待2秒
# 检查当前连接数
stat_data = await stats.get_stats()
current_active = stat_data['active_clients']
print(f" 重试 {retry_count+1}/3: 当前活动连接 {current_active}/{MAX_W5500_SOCKETS}")
# 如果有socket释放了,尝试重新接受连接
if current_active < MAX_W5500_SOCKETS:
print(f" ✅ 检测到socket资源已释放,继续接受连接")
break
except Exception as retry_err:
print(f" 重试过程中出错: {retry_err}")
try:
status = server.status
print(f" 监听socket状态: 0x{status:02X}")
if status == 0x17: # 已建立连接
print(f" ✅ 监听socket已建立连接(第{client_id}个客户端)")
try:
remote_ip = server.getpeername()
print(f" 客户端地址: {remote_ip}")
addr = remote_ip
except:
print(f" 无法获取客户端地址,使用默认地址")
addr = ("0.0.0.0", 0)
listen_socket_active = False
listen_socket_used_as_client = True
await stats.add_listen_socket_consumed()
is_eighth_client = (active_clients == MAX_W5500_SOCKETS - 1)
asyncio.create_task(
handle_client_v35(client_id, server, addr, is_eighth_client)
)
if is_eighth_client:
print(f"🎯 [{client_id}] 第8路客户端已连接 (使用监听socket)")
await stats.add_eighth_client_success()
else:
print(f"[{client_id}] 客户端已连接 (使用监听socket)")
print(f" ⚠️ 监听socket已转为客户端socket")
client_id += 1
elif status == 0x13: # 监听状态
print(f" ⏳ 监听socket仍在监听状态")
await asyncio.sleep(0.5)
else:
print(f" ❓ 监听socket状态异常: 0x{status:02X}")
await asyncio.sleep(1.0)
except Exception as status_e:
print(f" 状态检查失败: {status_e}")
await asyncio.sleep(1.0)
elif "timeout" not in err_str:
print(f"接受连接错误: {e}")
await stats.add_connection_error()
await asyncio.sleep(0.5)
await asyncio.sleep(0.01)
except asyncio.CancelledError:
break
except Exception as e:
print(f"服务器循环错误: {e}")
await asyncio.sleep(0.5)
print("服务器主循环结束")
except Exception as e:
print(f"❌ 服务器启动失败: {e}")
import sys
sys.print_exception(e)
finally:
if server_running:
await stop_server_v35()
# ==================== V3.5客户端处理器 ====================
async def handle_client_v35(client_id, client_sock, addr, is_eighth_client=False):
"""处理客户端连接(V3.5版)"""
client_ip, client_port = addr if isinstance(addr, tuple) and len(addr) == 2 else ("0.0.0.0", 0)
if is_eighth_client:
print(f"🎯 [{client_id}] 第8路客户端连接: {client_ip}:{client_port} (使用原监听socket)")
else:
print(f"[{client_id}] 客户端连接: {client_ip}:{client_port}")
await stats.client_connected(client_id, is_eighth_client)
await stats.add_connection()
try:
# 发送欢迎消息
if is_eighth_client:
welcome = f"服务器客户端 #{client_id} (第8路特殊连接)\r\n>\r\n"
else:
welcome = f"服务器客户端 #{client_id} (8路压力测试)\r\n>\r\n"
try:
client_sock.settimeout(1.0)
sent = client_sock.send(welcome.encode())
if sent > 0:
await stats.add_message_sent(sent)
except Exception as send_err:
print(f"[{client_id}] 发送欢迎消息失败: {send_err}")
buffer = bytearray()
last_heartbeat = time.time()
message_count = 0
client_active = True
last_successful_recv = time.time()
consecutive_timeouts = 0
bye_received = False # 标记是否收到BYE消息
while server_running and client_active:
current_time = time.time()
# 更新客户端活动时间
await stats.update_client_activity(client_id)
# 检查客户端超时
stat_data = await stats.get_stats()
client_info = stat_data['client_details'].get(client_id, {})
idle_time = client_info.get('idle_time', 0)
if idle_time > SERVER_CONFIG['client_timeout']:
print(f"[{client_id}] 客户端超时 (空闲{idle_time:.1f}秒)")
break
# 发送心跳
if current_time - last_heartbeat > SERVER_CONFIG['heartbeat_interval']:
try:
# 发送前检查socket状态
if hasattr(client_sock, 'status'):
heartbeat_status = client_sock.status
if heartbeat_status != 0x17:
print(f"[{client_id}] 发送心跳前socket状态异常: 0x{heartbeat_status:02X}")
client_active = False
break
heartbeat = f"PING {time.ticks_ms()}\r\n"
client_sock.settimeout(0.5)
sent = client_sock.send(heartbeat.encode())
if sent > 0:
await stats.add_message_sent(sent)
print(f"💓 [{client_id}] 发送心跳包")
last_heartbeat = current_time
except Exception as heartbeat_err:
print(f"[{client_id}] 发送心跳失败: {heartbeat_err}")
break
# 接收数据
try:
client_sock.settimeout(0.5)
# 新增:检查socket状态 BEFORE 尝试接收
if hasattr(client_sock, 'status'):
status = client_sock.status
if status != 0x17: # 不是ESTABLISHED状态
print(f"[{client_id}] ⚠️ 接收前检查: socket状态异常 0x{status:02X}")
# 如果是可恢复状态,尝试修复
if status in [0x1C, 0x22]: # UDP或CLOSE_WAIT状态
try:
print(f"[{client_id}] 尝试重置异常socket状态...")
# 先尝试优雅关闭
client_sock.close()
await asyncio.sleep(0.1)
# 然后立即重新创建socket(如果可能)
# 但这里我们先标记断开
client_active = False
break
except:
client_active = False
break
if hasattr(client_sock, 'available') and client_sock.available() > 0:
# 动态调整接收大小 - 增加限制
available_bytes = client_sock.available()
# 关键修复:限制单次接收大小,避免缓冲区溢出
max_safe_recv = min(
SERVER_CONFIG['max_packet_size'] * 2,
available_bytes,
1024 # 新增:硬性限制,避免过大读取
)
if max_safe_recv > 0:
data = client_sock.recv(max_safe_recv)
if data:
buffer.extend(data)
message_count += 1
consecutive_timeouts = 0
last_successful_recv = current_time
await stats.add_message_received(len(data))
await stats.update_client_activity(client_id, len(data))
# 处理完整的消息
while b'\r\n' in buffer:
idx = buffer.index(b'\r\n')
message = bytes(buffer[:idx])
buffer = buffer[idx+2:]
if message:
try:
msg_str = message.decode('utf-8', errors='ignore').strip()
# 检查是否是心跳响应
if msg_str.startswith("PONG") or msg_str.startswith("PING"):
print(f"💓 [{client_id}] 收到心跳响应: {msg_str[:30]}...")
continue
# 检查是否是BYE消息
bye_keywords = ["BYE", "bye", "Bye", "goodbye", "Goodbye"]
if any(keyword in msg_str for keyword in bye_keywords):
print(f"👋 [{client_id}] 收到断开通知: {msg_str}")
await stats.add_bye_message()
bye_received = True
client_active = False
break
# 检查是否是ACK响应
if msg_str.startswith("ACK"):
print(f"✓ [{client_id}] 收到ACK: {msg_str[:30]}...")
continue
# 普通消息显示
if len(message) >= 512:
print(f"[{client_id}] 收到#{message_count}: {len(message)}字节 (大数据包)")
elif message_count % 10 == 0:
if len(msg_str) > 30:
msg_str = msg_str[:30] + "..."
print(f"[{client_id}] 收到#{message_count}: {msg_str}")
except:
# 二进制数据,只显示大小
if len(message) >= 512:
print(f"[{client_id}] 收到#{message_count}: {len(message)}字节 (二进制大数据包)")
elif message_count % 10 == 0:
print(f"[{client_id}] 收到#{message_count}: {len(message)}字节 (二进制数据)")
# 发送ACK响应 - 增加错误处理
try:
response = f"ACK {len(message)} {time.ticks_ms()}\r\n".encode()
# 发送前检查socket状态
if hasattr(client_sock, 'status'):
send_status = client_sock.status
if send_status == 0x17:
client_sock.send(response)
await stats.add_message_sent(len(response))
else:
print(f"[{client_id}] 发送ACK时socket状态异常: 0x{send_status:02X}")
client_active = False
break
else:
client_sock.send(response)
await stats.add_message_sent(len(response))
except Exception as ack_err:
print(f"[{client_id}] 发送ACK失败: {ack_err}")
client_active = False
break
else:
# 没有数据可读
consecutive_timeouts += 1
# 新增:长时间无数据时检查socket健康状态
if consecutive_timeouts > 10:
try:
if hasattr(client_sock, 'status'):
status = client_sock.status
if status != 0x17:
print(f"[{client_id}] 长时间无数据且状态异常: 0x{status:02X}")
client_active = False
break
except:
pass
if consecutive_timeouts > 30: # 15秒没有数据
print(f"[{client_id}] 连续超时次数过多,断开连接")
client_active = False
break
await asyncio.sleep(0.1)
else:
# 没有可用数据
consecutive_timeouts += 1
if consecutive_timeouts > 10:
await asyncio.sleep(0.2) # 增加等待时间
else:
await asyncio.sleep(0.1)
except Exception as recv_err:
err_str = str(recv_err)
if "timeout" not in err_str:
print(f"[{client_id}] 接收错误: {recv_err}")
client_active = False
await asyncio.sleep(0.01)
# 客户端断开连接
if bye_received:
if is_eighth_client:
print(f"🎯 [{client_id}] 第8路客户端正常断开 (收到BYE),接收消息: {message_count}条")
else:
print(f"[{client_id}] 客户端正常断开 (收到BYE),接收消息: {message_count}条")
else:
if is_eighth_client:
print(f"🎯 [{client_id}] 第8路客户端异常断开,接收消息: {message_count}条")
else:
print(f"[{client_id}] 客户端异常断开,接收消息: {message_count}条")
except Exception as e:
print(f"[{client_id}] 客户端处理异常: {e}")
import sys
sys.print_exception(e)
finally:
# 清理资源 - 增强版socket回收
try:
if hasattr(client_sock, 'socknum'):
socknum = client_sock.socknum
print(f"[{client_id}] 开始深度清理socket {socknum}...")
# 第一步:获取当前状态
if hasattr(client_sock, 'status'):
status = client_sock.status
print(f"[{client_id}] socket {socknum} 当前状态: 0x{status:02X}")
# 第二步:尝试优雅关闭
try:
# 先尝试shutdown(如果支持)
if hasattr(client_sock, 'shutdown'):
client_sock.shutdown(2) # SHUT_RDWR
await asyncio.sleep(0.05)
except:
pass
# 第三步:正式关闭socket
for attempt in range(3): # 尝试3次确保关闭
try:
client_sock.close()
await asyncio.sleep(0.1) # 关键:给W5500硬件足够时间
# 检查是否真的关闭了
if hasattr(client_sock, 'connected'):
if not client_sock.connected:
print(f"[{client_id}] socket {socknum} 第{attempt+1}次关闭成功")
break
else:
print(f"[{client_id}] socket {socknum} 仍显示连接中,继续关闭...")
else:
print(f"[{client_id}] socket {socknum} 关闭完成")
break
except Exception as close_err:
print(f"[{client_id}] socket {socknum} 关闭出错: {close_err}")
# 第四步:强制释放资源(如果可用)
try:
# 如果socket有reset方法,调用它
if hasattr(client_sock, 'reset'):
client_sock.reset()
print(f"[{client_id}] socket {socknum} 已重置")
except:
pass
print(f"[{client_id}] socket {socknum} 深度清理完成")
except Exception as final_err:
print(f"[{client_id}] 清理过程中出错: {final_err}")
await stats.client_disconnected(client_id)
# ==================== 统计监控协程 ====================
async def stats_monitor_v35():
"""统计信息监控协程(V3.5版)"""
print("📊 统计监控启动 (V3.5)")
last_print_time = 0
while server_running:
current_time = time.time()
if current_time - last_print_time >= 15.0:
stat_data = await stats.get_stats()
uptime = stat_data['uptime']
print("\n" + "="*60)
print("服务器统计 (V3.5 - 增强socket回收版)")
print("="*60)
print(f"运行时间: {uptime:.1f}秒")
print(f"连接总数: {stat_data['connections']}")
print(f"活动连接: {stat_data['active_clients']}/{MAX_W5500_SOCKETS}")
print(f"接收消息: {stat_data['messages_received']}")
print(f"发送消息: {stat_data['messages_sent']}")
print(f"接收字节: {stat_data['bytes_received']:,}")
print(f"发送字节: {stat_data['bytes_sent']:,}")
if stat_data['bytes_received'] > 0 and uptime > 0:
throughput = stat_data['bytes_received'] / uptime
print(f"接收吞吐: {throughput/1024:.1f} KB/s")
print(f"大数据包: {stat_data['large_packets_received']}")
print(f"第8路成功: {stat_data['eighth_client_success']}次")
print(f"监听socket被用: {stat_data['listen_socket_consumed']}次")
print(f"BYE消息收到: {stat_data['bye_messages_received']}")
if stat_data['client_details']:
print("\n客户端详情:")
for client_id, details in sorted(stat_data['client_details'].items()):
eighth_mark = "🎯 " if details.get('is_eighth_client') else " "
status_mark = "❌" if details.get('disconnected') else "✅"
bytes_per_msg = details['bytes_received'] / details['message_count'] if details['message_count'] > 0 else 0
print(f" {eighth_mark}{status_mark}客户端{client_id}: " +
f"运行{details['uptime']:.0f}秒, " +
f"消息{details['message_count']}条, " +
f"{details['bytes_received']}字节, " +
f"平均{bytes_per_msg:.0f}字节/条")
try:
free_mem = gc.mem_free()
alloc_mem = gc.mem_alloc()
total_mem = free_mem + alloc_mem
print(f"\n内存使用: {alloc_mem/(1024):.1f}KB / {total_mem/(1024):.1f}KB " +
f"({alloc_mem/total_mem*100:.1f}%)")
except:
pass
print("="*60)
last_print_time = current_time
await asyncio.sleep(2)
# ==================== 服务器控制函数 ====================
async def stop_server_v35():
"""停止服务器(V3.5版)"""
global server_running
if not server_running:
return
print("\n🛑 停止服务器...")
print("开始清理资源...")
# 标记服务器停止
server_running = False
# 给客户端处理协程时间完成
await asyncio.sleep(3) # 增加到3秒,确保所有socket都关闭
# 强制垃圾回收
gc.collect()
# 关键修复:等待W5500硬件资源完全释放
print(f"⏳ 等待W5500硬件资源释放...")
await asyncio.sleep(2.0) # 额外等待2秒
print(f"✅ 内存清理完成: {gc.mem_free()/1024:.1f}KB 可用")
stat_data = await stats.get_stats()
print("\n" + "="*60)
print("最终统计 (V3.5测试)")
print("="*60)
print(f"运行时间: {stat_data['uptime']:.1f}秒")
print(f"连接总数: {stat_data['connections']}")
print(f"最大并发: {MAX_W5500_SOCKETS}路")
print(f"接收消息: {stat_data['messages_received']:,}")
print(f"发送消息: {stat_data['messages_sent']:,}")
print(f"接收字节: {stat_data['bytes_received']:,}")
print(f"发送字节: {stat_data['bytes_sent']:,}")
if stat_data['bytes_received'] > 0 and stat_data['uptime'] > 0:
throughput = stat_data['bytes_received'] / stat_data['uptime']
print(f"平均吞吐: {throughput/1024:.1f} KB/s")
print(f"大数据包: {stat_data['large_packets_received']}")
print(f"第8路成功: {stat_data['eighth_client_success']}次")
print(f"监听socket被用: {stat_data['listen_socket_consumed']}次")
print(f"BYE消息收到: {stat_data['bye_messages_received']}")
if stat_data['eighth_client_success'] > 0:
print(f"\n✅ 成功实现了真正的8路并发!")
print(f" 第8个客户端使用了监听socket")
else:
print(f"\n⚠️ 未触发第8路情况")
print("="*60)
print("\nV3.5测试完成!")
# ==================== 主入口函数 ====================
def main_v35():
"""主函数(V3.5版)"""
global server_running
print("TCP压力测试服务器 - V3.5 (增强socket回收版)")
print("="*60)
print("修复内容:")
print("1. 手动Ctrl+C退出(无时间限制)")
print("2. 修复BYE消息统计")
print("3. 增强socket关闭和回收机制")
print("4. 增加连接失败重试")
print("="*60)
# 重置所有状态
stats.reset()
server_running = False
# 强制垃圾回收
gc.collect()
# 关键修复:等待之前的socket完全释放
print(f"⏳ 等待任何未释放的socket资源...")
import time as sys_time
sys_time.sleep(1.0) # 同步等待1秒
print(f"✅ 统计信息已重置")
print(f"✅ 内存清理完成: {gc.mem_free()/1024:.1f}KB 可用")
print("="*60)
try:
loop = asyncio.get_event_loop()
# 启动服务器协程
loop.create_task(tcp_server_v35())
print("✅ 服务器已启动")
print("⏰ 运行时间: 无限制 (Ctrl+C退出)")
print("="*60)
# 移除自动停止功能,改为无限运行
loop.run_forever()
except KeyboardInterrupt:
print("\n👤 用户中断 (Ctrl+C)")
except Exception as e:
print(f"❌ 错误: {e}")
import sys
sys.print_exception(e)
finally:
if server_running:
try:
loop.run_until_complete(stop_server_v35())
except:
pass
# 关键修复:服务器完全停止后等待
print(f"\n⏳ 服务器停止完成,等待3秒确保资源释放...")
import time as sys_time
sys_time.sleep(3.0)
if __name__ == "__main__":
main_v35()
测试结果:
[Client07] 收到ACK #1700
[Client03] 配置变化: 间隔0.8s, 大小242字节
运行: 1:59 | 进度: 98.3% | 发送: 10848 | 接收: 14006 | 活动: 8/8 | 吞吐: 1.6KB/s[Client06] 已发送 1300 条, 运行3541秒, 速率: 0.4条/秒
[Client06] 收到ACK #1700
[Client01] 收到ACK #1750
运行: 1:59 | 进度: 98.5% | 发送: 10900 | 接收: 14065 | 活动: 8/8 | 吞吐: 1.6KB/s[Client05] 收到ACK #1650
[Client04] 配置变化: 间隔2.5s, 大小462字节
运行: 1:59 | 进度: 98.8% | 发送: 10949 | 接收: 14125 | 活动: 8/8 | 吞吐: 1.6KB/s[Client05] 配置变化: 间隔3.8s, 大
小335字节
[Client01] 配置变化: 间隔4.9s, 大小807字节
[Client02] 配置变化: 间隔1.7s, 大小256字节
[Client06] 配置变化: 间隔4.9s, 大小289字节
[Client07] 配置变化: 间隔0.6s, 大小262字节
[Client08] 配置变化: 间隔1.3s, 大小393字节
[Client03] 配置变化: 间隔0.8s, 大小344字节
运行: 1:60 | 进度: 99.4% | 发送: 11026 | 接收: 14216 | 活动: 8/8 | 吞吐: 1.6KB/s[Client03] 已发送 1400 条, 运行3583秒, 速率: 0.4条/秒
[Client03] 收到ACK #1800
[Client04] 配置变化: 间隔3.0s, 大小845字节
[Client04] 已发送 1400 条, 运行3584秒, 速率: 0.4条/秒
[Client04] 收到ACK #1800
运行: 1:60 | 进度: 99.7% | 发送: 11060 | 接收: 14260 | 活动: 8/8 | 吞吐: 1.6KB/s[Client05] 配置变化: 间隔3.4s, 大
小566字节
[Client02] 配置变化: 间隔4.9s, 大小981字节
[Client01] 配置变化: 间隔4.2s, 大小904字节
[Client06] 配置变化: 间隔0.7s, 大小817字节
[Client07] 配置变化: 间隔2.3s, 大小428字节
[Client08] 配置变化: 间隔2.5s, 大小653字节
[Client02] 收到ACK #1900
[Client03] 配置变化: 间隔2.2s, 大小560字节
运行: 1:60 | 进度: 99.9% | 发送: 11094 | 接收: 14302 | 活动: 8/8 | 吞吐: 1.6KB/s
测试完成!
停止客户端...
============================================================
测试结果 - 长时间稳定性测试
============================================================
测试模式: 长时间稳定性测试
测试时长: 3601.0秒 (1.00小时)
客户端数: 8
成功连接: 8
发送消息: 11100
接收消息: 14312
发送字节: 5,990,242
接收字节: 247,866
响应率: 128.9%
速率统计:
平均发送速率: 3.1 条/秒
平均接收速率: 4.0 条/秒
发送吞吐量: 1.6 KB/s
接收吞吐量: 0.1 KB/s
连接稳定性:
最小连接数: 8
最大连接数: 8
平均连接数: 8.0
吞吐量稳定性:
最小吞吐量: 1.6 KB/s
最大吞吐量: 2.1 KB/s
平均吞吐量: 1.7 KB/s
错误统计:
连接错误: 0
发送错误: 0
连接稳定性:
最小连接数: 8
最大连接数: 8
平均连接数: 8.0
吞吐量稳定性:
最小吞吐量: 1.6 KB/s
最大吞吐量: 2.1 KB/s
平均吞吐量: 1.7 KB/s
错误统计:
连接错误: 0
发送错误: 0
吞吐量稳定性:
最小吞吐量: 1.6 KB/s
最大吞吐量: 2.1 KB/s
平均吞吐量: 1.7 KB/s
错误统计:
连接错误: 0
发送错误: 0
平均吞吐量: 1.7 KB/s
错误统计:
连接错误: 0
发送错误: 0
错误统计:
连接错误: 0
发送错误: 0
发送错误: 0
接收错误: 0
客户端详情:
客户端01: 运行3597秒, 发送 1373, 接收 1775, 平均间隔2.62s, 平均大小554字节
客户端02: 运行3596秒, 发送 1499, 接收 1901, 平均间隔2.40s, 平均大小504字节
客户端03: 运行3598秒, 发送 1412, 接收 1814, 平均间隔2.55s, 平均大小535字节
客户端04: 运行3597秒, 发送 1404, 接收 1805, 平均间隔2.56s, 平均大小508字节
客户端05: 运行3597秒, 发送 1271, 接收 1672, 平均间隔2.83s, 平均大小549字节
客户端06: 运行3599秒, 发送 1327, 接收 1729, 平均间隔2.71s, 平均大小568字节
客户端07: 运行3599秒, 发送 1347, 接收 1748, 平均间隔2.67s, 平均大小547字节
客户端08: 运行3598秒, 发送 1467, 接收 1868, 平均间隔2.45s, 平均大小557字节
============================================================
测试结果已保存到: w5500_长时间稳定性测试_20260203_152849.json
(.venv) PS D:\python-ljs\.venv>
============================================================
服务器统计 (V3.5 - 增强socket回收版)
============================================================
运行时间: 4222.0秒
连接总数: 8
活动连接: 0/8
接收消息: 11100
发送消息: 14307
接收字节: 5,990,242
发送字节: 247,911
接收吞吐: 1.4 KB/s
大数据包: 5664
第8路成功: 2次
监听socket被用: 1次
BYE消息收到: 0
客户端详情:
❌客户端1: 运行4200秒, 消息1373条, 760840字节, 平均554字节/条
❌客户端2: 运行4200秒, 消息1499条, 755658字节, 平均504字节/条
❌客户端3: 运行4200秒, 消息1412条, 755826字节, 平均535字节/条
❌客户端4: 运行4200秒, 消息1271条, 697871字节, 平均549字节/条
❌客户端5: 运行4199秒, 消息1347条, 736391字节, 平均547字节/条
❌客户端6: 运行4199秒, 消息1404条, 712799字节, 平均508字节/条
❌客户端7: 运行4199秒, 消息1327条, 753507字节, 平均568字节/条
🎯 ❌客户端8: 运行4197秒, 消息1467条, 817350字节, 平均557字节/条
内存使用: 133.8KB / 8058.0KB (1.7%)
============================================================
============================================================
服务器统计 (V3.5 - 增强socket回收版)
============================================================
运行时间: 4238.0秒
连接总数: 8
活动连接: 0/8
接收消息: 11100
发送消息: 14307
接收字节: 5,990,242
发送字节: 247,911
接收吞吐: 1.4 KB/s
大数据包: 5664
第8路成功: 2次
监听socket被用: 1次
BYE消息收到: 0
客户端详情:
❌客户端1: 运行4216秒, 消息1373条, 760840字节, 平均554字节/条
❌客户端2: 运行4216秒, 消息1499条, 755658字节, 平均504字节/条
❌客户端3: 运行4216秒, 消息1412条, 755826字节, 平均535字节/条
❌客户端4: 运行4216秒, 消息1271条, 697871字节, 平均549字节/条
❌客户端5: 运行4215秒, 消息1347条, 736391字节, 平均547字节/条
❌客户端6: 运行4215秒, 消息1404条, 712799字节, 平均508字节/条
❌客户端7: 运行4215秒, 消息1327条, 753507字节, 平均568字节/条
🎯 ❌客户端8: 运行4213秒, 消息1467条, 817350字节, 平均557字节/条
内存使用: 133.8KB / 8058.0KB (1.7%)
============================================================
截图不及时导致显示的客户端都离线了。
第四部分:一些概念的反思
W5500硬件特殊性:
8个物理Socket硬件实现。W5500芯片内置8个独立的物理Socket(编号0-7) 每个Socket独立工作,有自己的寄存器组。硬件限制:最大并发连接数 = 8(包括监听Socket)。
Socket状态机严格 # W5500 Socket状态编码 SNSR_SOCK_CLOSED = 0x00 # 关闭 SNSR_SOCK_INIT = 0x13 # 初始化 SNSR_SOCK_LISTEN = 0x14 # 监听 SNSR_SOCK_ESTABLISHED = 0x17 # 已连接 SNSR_SOCK_UDP = 0x22 # UDP模式
监听与数据Socket分离 监听Socket(LISTEN状态)专门用于接受连接 数据Socket(ESTABLISHED状态)专门用于数据传输 accept()返回的是新的数据Socket
三次握手正确完成:CLOSED → INIT → LISTEN → SYNRECV → ESTABLISHED
Socket编号交换机制:监听socket和数据socket正确分离 多端口监听支持:可同时监听多个端口(受限于8个物理socket) Socket管理相关注意事项 Socket编号交换机制 accept() 方法会交换socket编号
监听socket使用新分配的编号继续监听 数据socket使用原来的编号进行通信
如果多次调用accept(),socket编号会不断变化
思考:后面一段的“Socket管理相关注意事项”与前面“监听与数据Socket分离”不同,到底哪种是正确的?
实际情况分析
1. W5500硬件限制(正确)
-
8个物理Socket硬件实现(Socket 0-7)
-
每个Socket在同一时刻只能处于一种状态
-
无法同时既是LISTEN又是ESTABLISHED
2. 代码中的实现机制
在您的代码中(accept()方法部分):
python
def accept(self):
# ... 等待连接建立 ...
# 关键代码:
new_listen_socknum, addr = _the_interface.socket_accept(self.socknum)
current_socknum = self.socknum
# 创建新socket对象
client_sock = socket()
client_sock._socknum = current_socknum # 原来的监听socket编号给客户端
self._socknum = new_listen_socknum # 新分配的socket编号给自己(继续监听)
# 重新绑定和监听
self.bind((None, self._listen_port))
self.listen()
3. 两种描述的对比
描述1("监听与数据Socket分离") ❌ 部分正确但表述不准确
-
正确点:确实有监听Socket(LISTEN状态)和数据Socket(ESTABLISHED状态)
-
不准确点:不是"分离",而是状态转换和编号交换
描述2("Socket编号交换机制") ✅ 更准确
-
精确描述了代码的实际行为
-
accept()确实执行了socket编号交换
-
原来的监听socket变成了数据socket给客户端使用
-
分配的新socket用于继续监听
4. 实际的工作流程
text
初始状态:
服务器: Socket 2 (状态: LISTEN, 端口: 8080)
客户端连接 ->
W5500硬件: 分配新的Socket 3来处理连接
Socket 3进入SYNRECV/ESTABLISHED状态
accept()执行:
1. 获取新Socket编号(3)和客户端地址
2. 创建client_sock对象,使用原来的Socket 2编号
3. 服务器socket对象使用新的Socket 3编号
4. Socket 3重新绑定并进入LISTEN状态
5. 客户端通过Socket 2进行通信
结果:
client_sock: Socket 2 (状态: ESTABLISHED) -> 与客户端通信
服务器socket: Socket 3 (状态: LISTEN) -> 继续接受新连接
5. 总结
-
硬件层面:W5500每个Socket在同一时刻只能有一种状态
-
代码层面:通过编号交换实现了"一个socket对象支持持续监听"
-
实际效果:用户感知上是同一个socket在持续监听,但底层硬件socket编号已经发生了变化
最准确的说法应该是:
"W5500的监听机制通过socket编号交换实现,accept()方法会将当前监听socket转为客户端连接socket,并分配一个新的socket用于继续监听。"
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)