TCP & UDP 协议 + Python Socket 编程 全面总结
·
“TCP 是流协议,就像打电话:你说一句,我说一句,但没人规定‘一句’有多长。”
而 UDP 就像发短信:每条短信是独立的,边界清晰。




































服务器端代码
# -*- coding: utf-8 -*-
# ==============================================================================
# 文件编码声明说明:
# # -*- coding: utf-8 -*- 是 Python 的编码声明,告诉解释器:
# - 这个 .py 文件是以 UTF-8 编码保存的
# - 允许文件中包含中文字符(如注释或字符串)
# - Python 3.7+ 默认使用 UTF-8,但保留此行可提高兼容性和可读性
# ==============================================================================
import socket
# 设置服务器监听的 IP 地址
# HOST = '127.0.0.1'
# 所有合法 host 取值:
# - '127.0.0.1' : 本地回环地址(loopback),仅本机可访问
# - '0.0.0.0' : 监听所有网络接口(所有网卡),外部可访问
# - '' : 等价于 '0.0.0.0'
# - '192.168.1.100' : 本机局域网 IP(需实际存在)
# - '::1' : IPv6 回环地址(配合 AF_INET6 使用)
# - '::' : IPv6 版的 '0.0.0.0',监听所有 IPv6 接口
# 注意:
# - 绑定 '0.0.0.0' 后,客户端仍需用具体 IP(如 192.168.1.100)连接
# - 绑定 '127.0.0.1' 则只能本机连接
HOST = '127.0.0.1'
# 设置服务器监听的端口号
# PORT = 65432
# 端口号范围:0 ~ 65535(16 位无符号整数)
# - 0 : 特殊值,系统自动分配可用端口(调用 bind 后可用 getsockname() 查看)
# - 1~1023 : 系统保留端口(Well-known Ports),需管理员权限才能绑定
# 常见服务:
# 21: FTP, 22: SSH, 23: Telnet, 25: SMTP,
# 53: DNS, 80: HTTP, 110: POP3, 443: HTTPS
# - 1024~49151 : 注册端口(Registered Ports),建议避开知名服务
# - 49152~65535 : 动态/私有端口(Dynamic/Private Ports),推荐用于测试
#
# 注意:
# - 同一时间一个 (IP, PORT) 只能被一个程序绑定
# - 若报错 "Address already in use",可用 netstat 或 lsof 查看占用进程
PORT = 65432
# 定义每次接收数据的最大字节数
# BUFFER_SIZE = 1024
# recv(bufsize) 参数 bufsize 的合法值:
# - 通常为 2 的幂次:512, 1024, 2048, 4096, 8192
# - 最小值:1 字节(不推荐)
# - 最大值:受系统限制,通常不超过 64KB(65536)
# - 实际建议:1024 ~ 8192 之间平衡性能与内存
#
# 注意:
# - bufsize 只是“最多读取”字节数,实际可能返回更少
# - TCP 是流协议,recv 可能分多次返回完整消息
BUFFER_SIZE = 1024
# 创建服务器 socket 对象
# socket.socket(family, type, proto, fileno)
# 参数详解(完整取值范围):
# 1. family: 地址族(Address Family)
# 常见取值:
# - socket.AF_INET : IPv4 协议(最常用)
# - socket.AF_INET6 : IPv6 协议(如 ::1, 支持更大地址空间)
# - socket.AF_UNIX : Unix 域套接字(Linux/Unix 本地进程通信,文件路径)
# - socket.AF_UNSPEC : 未指定,用于 getaddrinfo() 等函数
# - socket.AF_PACKET : 原始套接字(链路层,如抓包工具)
#
# 非法组合:
# - AF_INET + 连接 IPv6 地址 → 报错
# - AF_UNIX + host:port 元组 → 报错(需用路径字符串)
# 2. type: 套接字类型
# 常见取值:
# - socket.SOCK_STREAM : 流式套接字 → TCP(面向连接、可靠)
# - socket.SOCK_DGRAM : 数据报套接字 → UDP(无连接、不可靠)
# - socket.SOCK_RAW : 原始套接字 → 可自定义 IP 头(如 ping)
# - socket.SOCK_RDM : 可靠数据报(极少使用)
# - socket.SOCK_SEQPACKET : 有序分组流(如 SCTP)
# 3. proto: 传输层协议号
# 常见取值(IANA 定义):
# - socket.IPPROTO_TCP = 6 : TCP 协议
# - socket.IPPROTO_UDP = 17 : UDP 协议
# - socket.IPPROTO_ICMP = 1 : ICMP 协议(ping 使用)
# - socket.IPPROTO_RAW = 255 : 原始 IP 包
# - 0 : 自动推断(默认)
#
# 注意:
# - proto 必须与 family 和 type 兼容
# - 例如:SOCK_STREAM 通常配 IPPROTO_TCP
# 4. fileno: 文件描述符
# 取值:
# - None: 新建 socket(最常见)
# - 整数(如 3, 4): 从已有文件描述符创建 socket 对象
# 用于进程间传递 socket 或底层系统调用
# 错误用法:
# - 传入已关闭的 fd → 报错
# - 传入非 socket 的 fd(如普通文件)→ 行为未定义
server_socket = socket.socket(
family=socket.AF_INET, # 使用 IPv4
type=socket.SOCK_STREAM, # 使用 TCP(可靠传输)
proto=socket.IPPROTO_TCP, # 显式指定 TCP 协议(IP 协议号 6)
fileno=None
#新建 socket,不复用已有描述符 (如果你传入一个已有的文件描述符(如 3、4、5...),Python 会用它来创建一个新的 socket 对象,而不创建新的 socket)
)
# 设置 socket 选项:允许重用本地地址
# setsockopt(level, optname, value)
# level: 选项所属协议层
# 常见 level:
# - socket.SOL_SOCKET : Socket 层通用选项
# - socket.IPPROTO_IP : IPv4 层选项
# - socket.IPPROTO_IPV6 : IPv6 层选项
# - socket.IPPROTO_TCP : TCP 层选项
# - socket.SOL_RAW : 原始套接字层
# =============================================================================
# socket.setsockopt(level, optname, value) 各 level 详细说明
#
# 功能:设置 socket 的底层协议栈行为
# 参数:
# level -> 选项所属的协议层(决定该选项作用于哪一层网络协议)
# optname -> 具体选项名称
# value -> 选项值(类型根据选项而定)
#
# 每个 level 对应不同的协议层级,控制不同层次的网络行为
# =============================================================================
# =============================================================================
# 1. socket.SOL_SOCKET - Socket 层通用选项(最常用)
# 说明:这是 socket 的抽象层,适用于所有类型的 socket(TCP、UDP、RAW 等)
# 控制 socket 本身的通用行为,与具体传输协议无关
# 常见选项:
# - SO_REUSEADDR : 允许重用本地地址(避免 "Address already in use" 错误)
# - SO_REUSEPORT : 允许多个 socket 绑定同一端口(Linux/macOS 多进程场景)
# - SO_RCVBUF : 设置接收缓冲区大小(字节),影响吞吐量
# - SO_SNDBUF : 设置发送缓冲区大小(字节)
# - SO_LINGER : 控制 close() 时未发送数据的处理方式(优雅关闭)
# - SO_KEEPALIVE : 启用 TCP 保活机制,检测断线连接
# - SO_BROADCAST : 允许发送 UDP 广播包
# - SO_ERROR : 获取 socket 最近错误状态(只读)
# - SO_TYPE : 获取 socket 类型(如 SOCK_STREAM,只读)
# =============================================================================
# =============================================================================
# 2. socket.IPPROTO_IP - IPv4 协议层选项
# 说明:作用于 IPv4 协议层,仅适用于 AF_INET(IPv4)socket
# 控制 IP 包的封装行为,如服务类型、多播、自定义 IP 头等
# 常见选项:
# - IP_TOS : 设置 IP 包的服务类型(Type of Service),用于 QoS(服务质量)
# - IP_TTL : 设置 IP 包的生存时间(Time To Live)
# - IP_HDRINCL : 在原始套接字中包含 IP 头(Windows 特有)
# - IP_MULTICAST_TTL : 设置多播包的 TTL(生存时间)
# - IP_MULTICAST_LOOP: 是否允许多播包在本地回环(loopback)
# - IP_MULTICAST_IF : 指定多播包的出站网卡接口
# - IP_ADD_MEMBERSHIP : 加入一个 IPv4 多播组(需传入 struct 地址)
# - IP_DROP_MEMBERSHIP: 离开一个 IPv4 多播组
# =============================================================================
# =============================================================================
# 3. socket.IPPROTO_IPV6 - IPv6 协议层选项
# 说明:作用于 IPv6 协议层,仅适用于 AF_INET6(IPv6)socket
# 功能与 IPPROTO_IP 类似,但针对 IPv6 协议
# 常见选项:
# - IPV6_V6ONLY : 强制 socket 只使用 IPv6(禁用 IPv4 映射地址)
# - IPV6_TCLASS : 设置 IPv6 流量类别(Traffic Class),类似 IPv4 的 TOS
# - IPV6_UNICAST_HOPS: 设置单播包的跳数限制
# - IPV6_MULTICAST_HOPS: 设置多播包的跳数
# - IPV6_MULTICAST_LOOP: 是否允许多播包本地回环
# - IPV6_ADD_MEMBERSHIP: 加入一个 IPv6 多播组
# - IPV6_DROP_MEMBERSHIP: 离开一个 IPv6 多播组
# =============================================================================
# =============================================================================
# 4. socket.IPPROTO_TCP - TCP 协议层选项
# 说明:作用于 TCP 协议层,仅适用于 SOCK_STREAM(TCP)socket
# 控制 TCP 连接的行为,如 Nagle 算法、保活机制、拥塞控制等
# 常见选项:
# - TCP_NODELAY : 禁用 Nagle 算法,立即发送小数据包(低延迟场景)
# - TCP_KEEPIDLE : TCP 保活开始前的空闲时间(秒,Linux)
# - TCP_KEEPINTVL : 保活探测包的发送间隔(秒)
# - TCP_KEEPCNT : 保活探测失败的最大次数
# - TCP_CORK : 暂时阻止发送小包(Linux,用于拼包优化)
# - TCP_QUICKACK : 立即发送 ACK(禁用延迟确认)
# =============================================================================
# =============================================================================
# 5. socket.SOL_RAW - 原始套接字层(Raw Socket)
# 说明:用于原始套接字(SOCK_RAW),允许手动构造 IP/TCP/UDP 包头
# 需管理员权限(root / Administrator),用于网络扫描、自定义协议等
# 大部分系统不常用,主要用于高级网络工具
# 常见选项:
# - IP_HDRINCL : 在原始 socket 中包含 IP 头(Windows)
# - SO_ATTACH_FILTER : 附加 BPF(Berkeley Packet Filter)过滤器字节码
# - SO_DETACH_FILTER : 移除已附加的 BPF 过滤器
# - ICMP_FILTER : 设置 ICMP 消息过滤规则(如屏蔽 ping)
# =============================================================================
# =============================================================================
# 使用原则总结:
# - SO_* 开头的选项 → 使用 level = socket.SOL_SOCKET
# - IP_* 开头的选项 → 使用 level = socket.IPPROTO_IP
# - IPV6_* 开头的选项 → 使用 level = socket.IPPROTO_IPV6
# - TCP_* 开头的选项 → 使用 level = socket.IPPROTO_TCP
# - 原始套接字相关 → 使用 level = socket.SOL_RAW 或 socket.SOL_SOCKET
#
# 错误的 level 会导致:
# - OSError: Invalid argument
# - 设置无效(静默失败)
# - 行为不符合预期
#
# 建议:查文档或使用 help(socket.SO_REUSEADDR) 确认选项所属层级
# optname: 选项名(以 SOL_SOCKET 为例)
# 常见选项:
# - socket.SO_REUSEADDR : 允许重用本地地址(TIME_WAIT 时可用)
# - socket.SO_REUSEPORT : 允许多个 socket 绑定同一端口(Linux 3.9+)
# - socket.SO_KEEPALIVE : 启用 TCP 心跳检测(保活)
# - socket.SO_RCVBUF : 设置接收缓冲区大小
# - socket.SO_SNDBUF : 设置发送缓冲区大小
# - socket.SO_LINGER : 延迟关闭连接(struct linger)
# - socket.SO_BROADCAST : 允许发送广播消息(UDP)
# - socket.SO_OOBINLINE : 将带外数据(OOB)放入正常流
# - socket.SO_ERROR : 获取 socket 错误状态(只读)
#
# value: 选项值类型
# - int: 如 SO_REUSEADDR, SO_RCVBUF
# - tuple: 如 SO_LINGER → (onoff, linger_seconds)
# - bytes: 低层选项(极少用)
# value 的不同类型是为了精确控制 TCP/IP 协议栈的底层行为:
# • int:控制开关和数值(最常用)
# • tuple:设置复合参数(如 linger)
# • bytes:传递原始二进制结构(高级用途)
# 它们让你能:
# • 避免端口冲突(SO_REUSEADDR)
# • 提升性能(TCP_NODELAY)
# • 优雅关闭连接(SO_LINGER)
# • 实现高级功能(BPF 过滤、自定义 IP 头)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 设置接收缓冲区大小为 65536 字节(64KB)
# SO_RCVBUF 取值范围:
# - 最小值:系统限制(通常 1024)
# - 最大值:受系统配置限制(/proc/sys/net/core/rmem_max)
# - 实际值可能被系统翻倍(如设 65536,实际为 131072)
# - 可通过 getsockopt 验证实际大小
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536)
# 设置发送缓冲区大小为 65536 字节
# SO_SNDBUF 取值范围:同 SO_RCVBUF
# 注意:缓冲区大小影响性能,但不能无限增大
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)
# 将 socket 绑定到指定的 IP 和端口
# bind(address)
# address 元组格式:
# - AF_INET: (host, port)
# - host: str(IP 字符串)
# - port: int(0~65535)
# - AF_INET6: (host, port, flowinfo, scopeid)
# - flowinfo: 流量标签(IPv6)
# - scopeid: 作用域 ID(如 link-local 地址)
# - AF_UNIX: path_str(如 '/tmp/mysocket.sock')
#
# 绑定失败常见原因:
# - 端口被占用 → 换端口或 kill 进程
# - 权限不足(<1024 端口)→ 用 sudo
# - 地址非法 → 检查 IP 格式
ADDR = (HOST, PORT)
server_socket.bind(ADDR)
# 开始监听客户端连接
# listen(backlog)
# backlog 参数说明:
# - 类型:int
# - 取值:
# - <=0: 系统使用默认值(通常 5)
# - 1~5: 小型服务
# - 10~128: 中大型服务
# - 过大:浪费资源,无意义
# - 实际队列长度受系统限制(/proc/sys/net/core/somaxconn)
# - Linux 5.4+:backlog=0 表示最大值
# backlog 是“等待被 accept() 的连接队列长度
server_socket.listen(5)
# 打印提示信息
print(f"服务器已启动,正在监听 {ADDR}...")
# 设置 accept 超时时间为 30 秒
# settimeout(seconds)
# seconds 取值:
# - None: 阻塞模式(永久等待)
# - 0.0: 非阻塞模式(立即返回异常)
# - >0.0: 超时秒数(浮点数,支持小数如 0.5)
# 注意:超时后抛出 socket.timeout 异常,需 try-except 捕获
server_socket.settimeout(30.0)
# =============================================================================
# accept() - 接受一个传入的客户端连接请求
# 原型:client_socket, client_address = server_socket.accept()
# 返回:一个元组 (client_socket, client_address)
#
# 详细说明:
# - 该函数是 **阻塞式** 的(默认行为),会一直等待,直到有客户端连接到达
# - 每次调用 accept() 只接受 **一个** 客户端连接
# - 成功后:
# 1. 完成 TCP 三次握手(如果使用 SOCK_STREAM)
# 2. 创建一个新的 socket 对象(client_socket)用于与该客户端通信
# 3. 原始的 server_socket 仍可用于接受其他连接
# - 这是实现“并发服务器”的基础:每个客户端由一个独立的 client_socket 处理
# =============================================================================
# =============================================================================
# 1. 返回值详解
# =============================================================================
# (1) client_socket: 新的 socket 对象
# - 类型:socket.socket
# - 用途:专门用于与 **这个特定客户端** 通信
# - 特点:
# • 与 server_socket 是不同的对象
# • 绑定到相同的本地端口
# • 连接到客户端的 IP 和端口
# • 可以 recv()/send()/close()
# • 服务器可以同时持有多个 client_socket(每个客户端一个)
# (2) client_address: 客户端地址信息(元组)
# 格式根据 socket 的地址族(address family)不同而变化:
# - AF_INET(IPv4):
# 格式: (ip: str, port: int)
# 示例: ('192.168.1.100', 54321)
# 说明: 最常见,ip 是字符串,port 是整数
# - AF_INET6(IPv6):
# 格式: (ip: str, port: int, flowinfo: int, scopeid: int)
# 示例: ('fe80::1%eth0', 80, 0, 2)
# 说明:
# • flowinfo: 流量类别(Traffic Class)和流标签(Flow Label),通常为 0
# • scopeid: 范围标识符(如链路本地地址需要),表示网卡接口索引
# • 在实际编程中,flowinfo 和 scopeid 通常可以忽略(但必须接收)
# - AF_UNIX(Unix 域套接字):
# 格式: path_str: str
# 示例: '/tmp/my_socket'
# 说明: 用于同一台机器上的进程间通信(IPC),不走网络协议栈
# =============================================================================
# =============================================================================
# 2. accept() 的阻塞行为
# =============================================================================
# - 默认情况下,accept() 是 **阻塞调用**:
# 程序会停在 accept() 这一行,直到有客户端连接。
#
# - 如何控制阻塞行为?
# (1) 设置超时:
# server_socket.settimeout(5.0) # 5秒后抛出 socket.timeout
#
# (2) 设置为非阻塞模式:
# server_socket.setblocking(False)
# try:
# client_socket, addr = server_socket.accept()
# except BlockingIOError:
# # 没有连接到达,继续做其他事
# pass
#
# - 在并发服务器中:
# • 多线程:每个线程调用 accept(),竞争新连接
# • 多进程:通常由主进程 accept() 后分发给子进程
# • 异步(asyncio):使用事件循环非阻塞等待连接
# =============================================================================
# =============================================================================
# 3. accept() 可能抛出的异常
# =============================================================================
# - socket.timeout:
# • 原因:server_socket 设置了超时(settimeout()),且在超时时间内无连接
# • 处理:可以重试或退出
#
# - BlockingIOError:
# • 原因:socket 设置为非阻塞(setblocking(False)),但无连接到达
# • 处理:跳过,继续轮询或进入事件循环
#
# - OSError:
# • 原因:系统级错误,例如:
# - EINTR: 系统调用被信号中断(如 Ctrl+C)
# - EMFILE: 进程打开的文件描述符已达上限
# - ENFILE: 系统打开的文件总数已达上限
# - ENOMEM: 内存不足
# • 处理:通常需要记录日志并优雅退出
#
# - 其他可能:
# • KeyboardInterrupt: 用户按 Ctrl+C(Python 中通常转为 KeyboardInterrupt)
# • SystemExit: 系统退出信号
# =============================================================================
# =============================================================================
# 4. 实际使用注意事项
# =============================================================================
# - 每次 accept() 只处理一个连接:
# 必须在循环中调用,才能持续接受新连接:
#
# while True:
# client_socket, addr = server_socket.accept()
# print(f"新连接: {addr}")
# # 处理客户端(可开启线程/进程/协程)
# client_socket.close()
#
# - 资源管理:
# • 必须在通信结束后调用 client_socket.close()
# • 否则会导致文件描述符泄漏,最终无法接受新连接
#
# - 安全建议:
# • 记录客户端 IP 和端口(用于日志、限流、黑名单)
# • 验证客户端身份(如 TLS 证书、认证协议)
#
# - 性能提示:
# • 高并发场景下,避免在主线程直接处理客户端(会阻塞 accept())
# • 应使用多线程、多进程、或异步 I/O 将连接分发出去
# =============================================================================
# =============================================================================
# 5. 总结:accept() 的核心作用
# =============================================================================
# accept() 是服务器的“门卫”:
# - 等待客户端“敲门”(连接请求)
# - 完成“握手”(TCP 三次握手)
# - 分配一个独立的“接待员”(client_socket)
# - 将客户端交给接待员处理,自己继续等待下一个
#
# 它是实现网络服务的基础,理解其行为是编写健壮服务器的关键。
# =============================================================================
try:
# accept() 的作用是:从等待队列中取出一个已完成连接的客户端,返回一个专属的 client_socket 和其地址信息,以便服务器与之通信
client_conn, client_addr = server_socket.accept()
print(f"客户端 {client_addr} 已连接")
# 设置客户端连接的超时时间,防止 recv() 永久阻塞
client_conn.settimeout(10.0)
# 获取客户端连接的接收缓冲区大小(用于调试)
# getsockopt(level, optname) -> value
# 可用于验证 setsockopt 是否生效
rcvbuf_size = client_conn.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
print(f"客户端连接接收缓冲区大小: {rcvbuf_size} 字节")
# 服务端主动发送第一条消息
# send(bytes_data) -> int
# - 参数必须是 bytes 类型
# - 返回值:实际发送的字节数(可能 < len(data))
# - 若未发送完,需循环调用 send(TCP 通常一次完成)
# - send() 不保证立即发出,数据在发送缓冲区中
init_message = b"Hello World"
message_length = len(init_message) # 计算字节数
try:
sent = client_conn.send(init_message)
print(f"已发送初始消息: {init_message.decode()} (共 {sent} 字节 / 原始长度 {message_length} 字节)")
except Exception as e:
print(f"发送初始消息失败: {e}")
client_conn.close()
server_socket.close()
exit()
# 进入主循环:接收并回显消息
while True:
try:
# 接收数据
# recv(bufsize) -> bytes
# - bufsize: 最大接收字节数
# - 返回值:
# - bytes: 接收到的数据
# - b'': 对方已关闭连接(正常关闭)
# - 可能抛出异常:
# - socket.timeout: 超时
# - ConnectionResetError: 对方强制关闭(RST)
data = client_conn.recv(BUFFER_SIZE)
# 检查连接是否关闭
if not data:
print("客户端已断开连接")
break # 退出循环
# 计算接收到的数据字节数
received_bytes = len(data)
print(f"收到 {received_bytes} 字节数据")
# 解码数据(尝试 UTF-8)
try:
received_text = data.decode('utf-8')
print(f"收到消息: {received_text}")
except UnicodeDecodeError:
received_text = data.hex() # 非文本数据显示为十六进制
print(f"收到非文本数据(十六进制): {received_text}")
# 构造响应并发送
response = b"Echo: " + data
response_length = len(response) # 计算响应总字节数
try:
sent = client_conn.send(response)
print(f"已回显 {sent} 字节数据 (响应总长度: {response_length} 字节)")
except Exception as e:
print(f"回显失败: {e}")
break # 发送失败,退出
# 异常处理
except socket.timeout:
print("接收数据超时,关闭连接")
break
except ConnectionResetError:
print("客户端强制关闭连接(RST)")
break
except Exception as e:
print(f"通信过程中发生错误: {e}")
break
except socket.timeout:
print("等待客户端连接超时(30秒)")
except Exception as e:
print(f"接受连接失败: {e}")
# 资源释放(必须)
# shutdown() 参数说明:
# - socket.SHUT_RD : 关闭读(不再 recv)
# - socket.SHUT_WR : 关闭写(不再 send)
# - socket.SHUT_RDWR : 同时关闭读写
# close() 释放 socket 资源
try:
if 'client_conn' in locals() and client_conn:
client_conn.shutdown(socket.SHUT_RDWR)
client_conn.close()
print("客户端连接已关闭")
except Exception as e:
print(f"关闭客户端连接时出错: {e}")
try:
server_socket.close()
print("服务器 socket 已关闭")
except Exception as e:
print(f"关闭服务器 socket 时出错: {e}")
print("服务器程序结束")
客户端代码和解释
# -*- coding: utf-8 -*-
# ==============================================================================
# 文件编码声明说明:
# # -*- coding: utf-8 -*- 是 Python 的编码声明,告诉解释器:
# - 这个 .py 文件是以 UTF-8 编码保存的
# - 允许文件中包含中文字符(如注释或字符串)
# - Python 3.7+ 默认使用 UTF-8,但保留此行可提高兼容性和可读性
# - 如果不写,且文件含中文,可能在旧版 Python 中报错
# ==============================================================================
# 导入 socket 模块
# socket 是 Python 内置的标准库模块,提供了对操作系统底层网络通信接口(BSD Socket API)的封装。
# 通过该模块可以实现 TCP、UDP 等网络协议的编程。
# 所有网络通信操作,如创建套接字、连接、发送、接收、关闭等,都依赖此模块。
import socket
# 服务器的 IP 地址(本机)
# HOST = '127.0.0.1'
# - '127.0.0.1' 是本地回环地址(loopback address),用于本机内部通信。
# - 只有本机上的程序可以连接此地址。
# - 如果服务器运行在其他机器上,需替换为对应机器的局域网 IP 或公网 IP。
# - 不可使用 '0.0.0.0',因为这是服务器监听时使用的通配地址,客户端不能连接它。
HOST = '127.0.0.1'
# 服务器的端口号(必须与 server.py 一致)
# PORT = 65432
# - 端口号用于标识服务器上具体的服务进程。
# - 取值范围:0 ~ 65535
# - 0 ~ 1023:系统保留端口(如 HTTP 80,HTTPS 443),通常需要管理员权限才能绑定。
# - 1024 ~ 49151:注册端口,用于常规服务。
# - 49152 ~ 65535:动态或私有端口,推荐用于测试和开发。
# - 客户端必须使用与服务器绑定(bind)的端口号一致,否则连接将被拒绝。
PORT = 65432
# 接收缓冲区大小
# BUFFER_SIZE = 1024
# - 定义每次调用 recv() 方法时最多从接收缓冲区中读取的字节数。
# - 单位:字节(byte)
# - 常见取值:512、1024(1KB)、2048、4096
# - 太小会导致频繁调用 recv(),增加系统调用开销。
# - 太大会浪费内存,且可能增加延迟。
# - 1024 字节是通用的平衡选择,适用于大多数文本通信场景。
BUFFER_SIZE = 1024
# 构造服务器地址元组
# ADDR = (HOST, PORT)
# - connect() 方法的参数必须是一个元组,格式为 (host, port)。
# - host:字符串类型,可以是 IP 地址(如 '192.168.1.100')或主机名(如 'localhost')。
# - port:整数类型,表示目标端口号。
# - 对于 IPv6,格式为 (host, port, flowinfo, scopeid)。
ADDR = (HOST, PORT)
# 创建客户端 socket
# socket.socket(family, type, proto, fileno)
# 参数详解:
# 1. family=socket.AF_INET
# - 指定地址族(Address Family)。
# - AF_INET 表示使用 IPv4 协议。
# - 若需连接 IPv6 服务器,应使用 socket.AF_INET6。
#
# 2. type=socket.SOCK_STREAM
# - 指定套接字类型(Socket Type)。
# - SOCK_STREAM 表示使用 TCP 协议,提供面向连接、可靠、有序的字节流传输。
# - 若使用 UDP 协议,应使用 socket.SOCK_DGRAM(数据报套接字)。
#
# 3. proto=socket.IPPROTO_TCP
# - 指定传输层协议号。
# - IPPROTO_TCP 表示使用 TCP 协议(IP 协议号为 6)。
# - 通常可以省略,系统会根据 type 参数自动推断协议。
# - 显式指定有助于代码清晰,尤其在教学或复杂场景中。
#
# 4. fileno=None
# - 指定文件描述符(File Descriptor)。
# - None 表示创建一个新的 socket 对象。
# - 若传入一个整数(如 3),表示从已有的文件描述符重建 socket 对象。
# - 该功能用于高级场景,如进程间通信、热重启等。
client_socket = socket.socket(
family=socket.AF_INET, # 使用 IPv4 地址族
type=socket.SOCK_STREAM, # 使用 TCP 流式套接字
proto=socket.IPPROTO_TCP, # 显式指定 TCP 传输协议
fileno=None # 创建新的 socket,不基于现有文件描述符
)
# 设置接收和发送缓冲区大小
# setsockopt(level, optname, value)
# - level:选项所属的协议层,如 socket.SOL_SOCKET(通用套接字层)。
# - optname:具体选项名称。
# - socket.SO_RCVBUF:设置接收缓冲区大小(单位:字节)。
# - socket.SO_SNDBUF:设置发送缓冲区大小(单位:字节)。
# - value:选项值,此处为 32768 字节(32KB)。
# - 注意:
# - 实际缓冲区大小可能大于设置值,因为操作系统可能自动翻倍以提高性能。
# - 可通过 getsockopt() 获取实际值进行验证。
# - 缓冲区越大,吞吐量越高,但占用内存也越多。
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 32768)
# 设置连接和接收超时时间为 10 秒
# settimeout(seconds)
# - seconds:超时时间,单位为秒,可以是整数或浮点数(如 0.5 表示 500 毫秒)。
# - 功能:
# - connect():若在指定时间内未完成连接,则抛出 socket.timeout 异常。
# - recv():若在指定时间内未收到数据,则抛出 socket.timeout 异常。
# - 特殊值:
# - None:阻塞模式,调用将一直等待,直到操作完成或出错。
# - 0.0:非阻塞模式,调用立即返回,若无法完成则抛出 BlockingIOError。
# - >0.0:超时模式,等待指定时间后超时。
# - 设置超时可防止程序因网络问题无限期挂起。
client_socket.settimeout(10.0)
# 尝试连接服务器
# connect(address)
# - address:目标服务器地址,格式为 (host, port) 的元组。
# - 该方法是阻塞的,直到连接成功或失败。
# - 成功:完成 TCP 三次握手,进入 ESTABLISHED 状态。
# - 失败:抛出异常。
try:
client_socket.connect(ADDR)
print(f"成功连接到服务器 {ADDR}")
except ConnectionRefusedError:
# 常见原因:
# - 服务器程序未启动。
# - 服务器未在指定端口监听。
# - 防火墙或安全组阻止了连接。
print("连接失败:服务器未启动或地址错误")
client_socket.close()
exit()
except socket.timeout:
# 可能原因:
# - 网络延迟过高。
# - 服务器响应缓慢。
# - 网络中断或丢包严重。
print("连接超时(10秒内未响应)")
client_socket.close()
exit()
except Exception as e:
# 捕获其他可能的异常,如网络不可达(Network is unreachable)、地址格式错误等。
print(f"连接失败: {e}")
client_socket.close()
exit()
# 获取当前 socket 的发送和接收缓冲区大小
# getsockopt(level, optname) -> int
# - level:选项层,此处为 socket.SOL_SOCKET。
# - optname:选项名,SO_SNDBUF 和 SO_RCVBUF 分别获取发送和接收缓冲区大小。
# - 返回值:整数,表示缓冲区大小(字节)。
# - 注意:
# - 实际值可能大于 setsockopt 设置的值(系统自动调整)。
# - 不同操作系统默认值不同(如 Linux 通常为 8KB ~ 128KB)。
try:
sndbuf = client_socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
rcvbuf = client_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
print(f"客户端缓冲区 - 发送: {sndbuf} 字节, 接收: {rcvbuf} 字节")
except Exception as e:
print(f"获取 socket 选项失败: {e}")
# 【关键】接收服务端发送的第一条消息 "Hello World"
# 许多服务器在客户端连接成功后会立即发送一条欢迎消息或状态信息。
# 客户端应主动接收,否则数据将堆积在接收缓冲区,可能导致后续接收混乱。
try:
init_data = client_socket.recv(BUFFER_SIZE)
if init_data:
# recv() 返回非空字节串,表示收到数据
try:
msg = init_data.decode('utf-8')
print(f"收到服务端消息: {msg}")
except UnicodeDecodeError:
# 如果数据不是有效的 UTF-8 文本(如二进制数据),则以十六进制格式显示
print(f"收到非文本数据(十六进制): {init_data.hex()}")
else:
# recv() 返回 b''(空字节串)表示对端已关闭连接(正常关闭)
print("收到空数据,服务端已关闭连接")
client_socket.close()
exit()
except socket.timeout:
# 在 settimeout 设置的 10 秒内未收到任何数据
print("接收初始消息超时(10秒)")
client_socket.close()
exit()
except Exception as e:
print(f"接收消息失败: {e}")
client_socket.close()
exit()
# 定义要发送的测试消息列表(bytes 类型)
# TCP 是字节流协议,只能传输 bytes 类型的数据,不能直接传输 str。
# 因此所有消息必须以字节字符串(b"")形式定义。
test_messages = [b"Hello", b"TCP", b"Test", b"Message"]
# 循环发送每条消息并接收回显
# 模拟客户端与服务器的典型交互流程:
# 发送 → 等待回显 → 处理响应 → 下一条
for msg in test_messages:
try:
# 发送消息
# send(data) -> int
# - data:要发送的字节数据(bytes 类型)。
# - 返回值:实际成功发送的字节数(int 类型)。
# - 通常等于 len(data),表示全部发送成功。
# - 可能小于 len(data)(如发送缓冲区满),此时需循环调用 send 直到全部发送。
# - 注意:send() 成功返回仅表示数据已拷贝到发送缓冲区,并不代表对方已收到。
client_socket.send(msg)
print(f"已发送: {msg.decode()}")
except Exception as e:
# 发送失败的可能原因:
# - 网络中断或连接断开。
# - 对端已关闭连接。
# - 发送缓冲区满且超时(在阻塞/超时模式下)。
print(f"发送失败: {e}")
break # 发送失败,退出循环,避免后续操作无效
try:
# 接收回显
# recv(bufsize) -> bytes
# - bufsize:建议的最大接收字节数,实际返回可能更少。
# - 返回值:接收到的字节数据(bytes)。
# - 若返回非空,表示收到数据。
# - 若返回 b'',表示对端已关闭连接。
# - 行为:阻塞直到有数据到达、连接关闭或超时。
response = client_socket.recv(BUFFER_SIZE)
if response:
try:
reply = response.decode('utf-8')
print(f"收到回显: {reply}")
except UnicodeDecodeError:
print(f"收到非文本回显(十六进制): {response.hex()}")
else:
# 对端关闭连接
print("服务端关闭连接")
break
except socket.timeout:
# 超过 10 秒未收到服务器响应
print("接收响应超时(10秒)")
except Exception as e:
print(f"接收失败: {e}")
break
# 每次发送后暂停 0.5 秒,便于观察程序执行流程
# 避免消息发送过快导致日志输出混乱。
# 模拟真实用户或系统操作的合理间隔。
import time
time.sleep(0.5)
# 演示 fileno 高级用法(可选)
# 文件描述符(File Descriptor)是操作系统用于标识打开文件或 I/O 资源的非负整数。
# socket 在操作系统中被视为一种特殊的“文件”,因此也有文件描述符。
# 此功能主要用于高级场景,如:
# - 进程间传递 socket(通过 Unix 域套接字)。
# - 使用 os.dup() 复制文件描述符。
# - 在底层系统调用中直接操作 socket。
try:
fd = client_socket.fileno()
print(f"当前 socket 文件描述符: {fd}")
# 复制文件描述符
# os.dup(fd) 创建一个新的文件描述符,指向同一个 socket 资源。
# 原始和复制的 fd 都可以操作同一 socket。
# 常用于 fork() 后父子进程共享 socket。
import os
new_fd = os.dup(fd)
print(f"复制的文件描述符: {new_fd}")
# 关闭原 socket
# 注意:close() 并不立即释放 socket 资源,而是减少引用计数。
# 只有当所有指向该 socket 的文件描述符都被关闭后,资源才会真正释放。
client_socket.close()
print("原 socket 已关闭(但 new_fd 仍有效)")
# 从文件描述符重建 socket
# socket.socket(fileno=new_fd) 从已有的文件描述符创建一个新的 socket 对象。
# 常用于进程恢复、热重启、资源迁移等高级场景。
restored_socket = socket.socket(fileno=new_fd)
print("从文件描述符重建 socket 成功")
# 尝试发送和接收(可能失败,仅用于演示)
# 注意:如果服务器端已关闭连接,send 和 recv 将失败。
try:
restored_socket.send(b"Final test")
reply = restored_socket.recv(BUFFER_SIZE)
print(f"重建后收到: {reply.decode('utf-8', errors='ignore')}")
except Exception as e:
print(f"重建后操作失败: {e}")
finally:
restored_socket.close()
except Exception as e:
print(f"fileno 操作失败: {e}")
# 最终关闭
# 确保所有 socket 资源都被正确释放。
# 即使前面已调用 close(),也在此处再次确保,防止文件描述符泄漏。
# 文件描述符是有限资源,泄漏可能导致程序无法创建新连接。
print("客户端测试完成")

附带小工具一个
# -*- coding: utf-8 -*- """ 互联工具:基于 UDP 广播发现设备,TCP 连接通信(安全加固版) ✅ 已修复: - 路径穿越(Path Traversal) - 内存 DoS(大文件攻击) - 消息解析逻辑错误 - 无输入长度限制 - UDP 源地址未过滤 - TCP 端口硬编码 - 状态竞争条件 """
""" 互联工具:基于 UDP 广播发现设备,TCP 连接通信 功能: - 自动发现局域网内运行本程序的设备(通过 UDP 广播) - 支持 TCP 双向文本聊天 - 支持文件发送与接收(带进度提示) - 图形化界面(Tkinter) - 实时显示本机信息、发现的设备、UDP 广播消息 - 跨平台兼容(Windows/Linux/macOS) 使用说明见文末。 """
import tkinter as tk
from tkinter import filedialog, messagebox, scrolledtext
import socket
import subprocess
import time
import threading
import random
import os
import re
from typing import Optional
# ==================== 日志开关 ====================
ENABLE_LOGGING = False
COLOR_RESET = '\033[0m'
COLOR_FUNC = '\033[1;36m'
COLOR_INPUT = '\033[32m'
COLOR_RETURN = '\033[34m'
try:
from colorama import init
init(autoreset=True)
except ImportError:
pass
def log_call(func):
if not ENABLE_LOGGING:
return func
def wrapper(*args, **kwargs):
print(f"{COLOR_FUNC}调用函数: {func.__name__}{COLOR_RESET}")
print(f"{COLOR_INPUT} 输入: args={args}, kwargs={kwargs}{COLOR_RESET}")
result = func(*args, **kwargs)
print(f"{COLOR_RETURN} 返回: {result!r}{COLOR_RESET}")
return result
wrapper.__name__ = func.__name__
return wrapper
# ==================== 安全辅助函数 ====================
def sanitize_filename(name: str) -> str:
"""安全过滤文件名,防止路径穿越"""
if not name:
return "unnamed_file"
# 只保留安全字符
clean = re.sub(r'[^a-zA-Z0-9._\-]', '_', name)
# 移除开头的点(防 .bashrc 覆盖)
clean = clean.lstrip('.')
# 限制长度
return clean[:255] or "unnamed_file"
def is_private_ip(ip: str) -> bool:
"""检查是否为私有 IP 地址"""
try:
parts = list(map(int, ip.split('.')))
if len(parts) != 4:
return False
a, b, c, d = parts
return (
(a == 10) or
(a == 172 and 16 <= b <= 31) or
(a == 192 and b == 168) or
(a == 127) # 允许本地回环(用于测试)
)
except:
return False
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB
MAX_MESSAGE_LEN = 1024 # 单条消息最大长度
# ======================= 主窗口类 ================================
class WindowTk:
def __init__(self):
self.root = tk.Tk()
self.root.title('互联 - 局域网通信工具(安全版)')
self.root.configure(background='white')
self.root.geometry("1200x800+200+100")
self.root.attributes('-alpha', 0.95)
self.running = True
self.tcp_server_running = False
self.connected = False
self.current_conn = None
self.target_ip = None
self.target_port = None
self.udp_listener_socket: Optional[socket.socket] = None
self.discovery_socket: Optional[socket.socket] = None
self.listener_thread: Optional[threading.Thread] = None
self.discovery_thread: Optional[threading.Thread] = None
self.tcp_server_thread: Optional[threading.Thread] = None
self.devices = {}
self.selected_device = tk.StringVar(value="")
self.menu_widgets = []
self.local_info = {
"username": "未知用户",
"ip": "127.0.0.1",
"port": 65001
}
# 文件接收状态(增加锁防止并发)
self.file_receiving = False
self.file_expected_size = 0
self.file_received_size = 0
self.file_name = ""
self.file_buffer = b''
self.file_receiving_lock = threading.Lock() # 新增锁
self._init_local_info()
self.setup_ui()
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
self.start_tcp_server()
self.start_udp_listener()
def _init_local_info(self):
info_text = self.get_system_info()
lines = info_text.split('\n')
for line in lines:
if line.startswith("用户名:"):
self.local_info["username"] = line.split(":", 1)[1].strip()
elif line.startswith("IP:"):
self.local_info["ip"] = line.split(":", 1)[1].strip()
self.local_info["port"] = self.get_free_port()
@log_call
def setup_ui(self):
self.tishi()
self.show_wenben()
self.create_load_button()
self.show_wenben2()
self.find_load_button()
self.show_wenben3()
self.create_listen_button()
self.create_menu()
self.sectle_button()
self.create_chat_area()
# === UI 方法(保持不变,省略以节省空间)===
@log_call
def tishi(self):
self.tip_label = tk.Label(
self.root,
text="这是一个局域网聊天室(安全加固版)",
font=("微软雅黑", 14, "bold"),
fg="#FFD700",
bg="#1E1E1E",
anchor="center",
relief="flat",
highlightbackground='pink',
highlightthickness=2
)
self.tip_label.place(relx=0.14, rely=0.0, relwidth=0.86, relheight=0.05)
return self.tip_label
@log_call
def show_wenben(self):
self.label = tk.Label(
self.root,
font=('Consolas', 12, 'bold'),
fg='lime', bg='black', anchor='nw',
padx=10, pady=5,
highlightbackground='blue', highlightthickness=2, bd=0,
justify='left'
)
self.label.place(relx=0, rely=0.05, relwidth=0.14, relheight=0.20)
self.update_info_display()
return '左上方文本已创建'
@log_call
def get_system_info(self):
username = '未知用户'
try:
result = subprocess.run('echo %USERNAME%', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
text=True, encoding='gbk', timeout=3)
if result.returncode == 0:
name = result.stdout.strip()
if name and name != '%USERNAME%':
username = name
except Exception as e:
print(f"echo %USERNAME% 失败: {e}")
if username in ['未知用户', '%USERNAME%']:
try:
result = subprocess.run('whoami', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
encoding='gbk', timeout=3)
if result.returncode == 0:
username = result.stdout.strip().split('\\')[-1]
except Exception as e:
print(f"whoami 失败: {e}")
if username in ['未知用户', '%USERNAME%']:
try:
import getpass
username = getpass.getuser()
except:
pass
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
except Exception:
ip = "无法获取IP"
ports = [65001, 65002, 65003]
status = "\n".join(
f"端口 {port}: {'空闲' if self.is_port_free(port) else '占用'}"
for port in ports
)
info = f"本机信息:\n用户名: {username}\nIP: {ip}\n\n{status}"
return info
def is_port_free(self, port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(1)
try:
sock.bind(("", port))
return True
except OSError:
return False
@log_call
def update_info_display(self):
info_text = self.get_system_info()
if hasattr(self, 'label') and self.label.winfo_exists():
self.label.config(text=info_text)
lines = info_text.split('\n')
for line in lines:
if line.startswith("用户名:"):
self.local_info["username"] = line.split(":", 1)[1].strip()
elif line.startswith("IP:"):
self.local_info["ip"] = line.split(":", 1)[1].strip()
self.local_info["port"] = self.get_free_port()
@log_call
def create_load_button(self):
btn = tk.Button(
self.root,
text="获取本机信息",
command=self.update_info_display,
font=('Consolas', 10, 'bold'),
bg='blue', fg='white', relief='flat'
)
btn.place(relx=0, rely=0.25, relwidth=0.14, relheight=0.04)
return btn
@log_call
def show_wenben2(self):
self.label1 = tk.Label(
self.root,
font=('Consolas', 11, 'bold'),
fg='lime', bg='black', anchor='nw',
padx=10, pady=5,
highlightbackground='pink', highlightthickness=2, bd=0,
justify='left'
)
self.label1.place(relx=0, rely=0.29, relwidth=0.14, relheight=0.18)
return '左中文本框已创建'
@log_call
def find_load_button(self):
btn = tk.Button(
self.root,
text="发现设备",
command=self.start_discovery,
font=('Consolas', 10, 'bold'),
bg='orange', fg='white', relief='flat'
)
btn.place(relx=0, rely=0.47, relwidth=0.14, relheight=0.04)
return btn
@log_call
def get_free_port(self) -> int:
for port in [65001, 65002, 65003]:
if self.is_port_free(port):
return port
return 65001
@log_call
def start_discovery(self):
def discovery_task():
self.update_info_display()
local_ip = self.local_info["ip"]
local_port = self.local_info["port"]
username = self.local_info["username"]
broad_ip_str = ".".join(local_ip.split('.')[:3]) + ".255"
if self.discovery_socket:
try:
self.discovery_socket.close()
except:
pass
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.settimeout(1)
try:
sock.bind((local_ip, 65000))
except Exception as e:
print(f"绑定 65000 失败: {e}")
return
self.discovery_socket = sock
message = f"HELLO:{username}:{local_ip}:{local_port}"
self.devices.clear()
attempts = 0
max_retries = 3
self.root.after(0, lambda: self.label1.config(text="正在搜索..."))
while attempts < max_retries and self.running:
found = False
for _ in range(3):
if not self.running:
break
try:
if len(message.encode('utf-8')) > MAX_MESSAGE_LEN:
print("广播消息过长,跳过")
continue
sock.sendto(message.encode('utf-8', errors='ignore'), (broad_ip_str, 65004))
except Exception as e:
print(f"广播发送失败: {e}")
time.sleep(random.uniform(0.5, 1.5))
start_time = time.time()
while self.running and time.time() - start_time < 5:
try:
data, addr = sock.recvfrom(MAX_MESSAGE_LEN)
if not is_private_ip(addr[0]):
continue # 忽略非私有 IP
msg = data.decode('utf-8', errors='ignore').strip()
if len(msg) > MAX_MESSAGE_LEN:
continue
if msg.startswith("HELLO:"):
parts = msg.split(":", 3)
if len(parts) != 4:
continue
_, r_name, r_ip, r_port = parts
if r_ip == local_ip and r_port == str(local_port):
continue
if not is_private_ip(r_ip):
continue
self.devices[r_ip] = f"{r_name}:{r_ip}:{r_port}"
found = True
except socket.timeout:
continue
except Exception as e:
print(f"接收错误: {e}")
if found:
break
attempts += 1
time.sleep(1)
self.discovery_socket = None
try:
sock.close()
except:
pass
final_text = "发现设备:\n" + "\n".join(self.devices.values()) if self.devices else "未发现设备"
self.root.after(0, lambda: self.label1.config(text=final_text))
self.root.after(0, self.create_menu)
if self.discovery_thread and self.discovery_thread.is_alive():
print("发现任务已在运行")
return
self.discovery_thread = threading.Thread(target=discovery_task, daemon=True)
self.discovery_thread.start()
@log_call
def show_wenben3(self):
self.label2 = tk.Label(
self.root,
font=('Consolas', 10),
fg='yellow', bg='black', anchor='nw',
padx=10, pady=5,
highlightbackground='cyan', highlightthickness=2, bd=0,
wraplength=150, justify='left'
)
self.label2.place(relx=0, rely=0.51, relwidth=0.14, relheight=0.18)
self.label2.config(text="等待UDP广播...\n")
return '左下文本框(label2)已创建'
@log_call
def start_udp_listener(self):
def listen_task():
if self.udp_listener_socket:
try:
self.udp_listener_socket.close()
except:
pass
LISTEN_PORT = 65004
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
try:
sock.bind(('', LISTEN_PORT))
except Exception as e:
print(f"绑定 {LISTEN_PORT} 失败: {e}")
return
sock.settimeout(1)
self.udp_listener_socket = sock
local_ip = self.local_info["ip"]
local_port = self.local_info["port"]
while self.running:
try:
data, addr = sock.recvfrom(MAX_MESSAGE_LEN)
if not is_private_ip(addr[0]):
continue
msg = data.decode('utf-8', errors='ignore').strip()
if len(msg) > MAX_MESSAGE_LEN:
continue
if msg.startswith("HELLO:"):
parts = msg.split(":", 3)
if len(parts) != 4:
continue
_, name, ip, port = parts
if ip == local_ip and port == str(local_port):
continue
if not is_private_ip(ip):
continue
display_msg = f"{name}@{ip}:{port}"
current = self.label2.cget("text").split('\n')[-9:]
current.append(f"← {display_msg}")
self.root.after(0, lambda t='\n'.join(current): self.label2.config(text=t))
self.devices[ip] = f"{name}:{ip}:{port}"
self.root.after(0, self.create_menu)
except socket.timeout:
continue
except Exception as e:
if self.running:
print(f"UDP监听错误: {e}")
continue
try:
sock.close()
except:
pass
self.udp_listener_socket = None
if self.listener_thread and self.listener_thread.is_alive():
print("UDP监听已在运行")
return
self.listener_thread = threading.Thread(target=listen_task, daemon=True)
self.listener_thread.start()
@log_call
def create_listen_button(self):
btn = tk.Button(
self.root,
text="开始监听UDP",
command=self.start_udp_listener,
font=('Consolas', 10, 'bold'),
bg='purple', fg='white', relief='flat'
)
btn.place(relx=0, rely=0.69, relwidth=0.14, relheight=0.04)
return btn
@log_call
def create_menu(self):
for widget in self.menu_widgets:
widget.destroy()
self.menu_widgets.clear()
devices = list(self.devices.values()) if self.devices else []
title_label = tk.Label(
self.root, text="选择目标设备:",
font=('Consolas', 10, 'bold'), fg='cyan', bg='gray10', anchor='w'
)
title_label.place(relx=0, rely=0.73, relwidth=0.14, relheight=0.04)
self.menu_widgets.append(title_label)
canvas = tk.Canvas(self.root, bg='gray12', highlightthickness=1, highlightbackground='gray20')
canvas.place(relx=0, rely=0.77, relwidth=0.14, relheight=0.15)
self.menu_widgets.append(canvas)
scrollbar = tk.Scrollbar(self.root, orient="vertical", command=canvas.yview)
scrollbar.place(relx=0.119, rely=0.77, relwidth=0.021, relheight=0.15)
self.menu_widgets.append(scrollbar)
canvas.configure(yscrollcommand=scrollbar.set)
inner_frame = tk.Frame(canvas, bg='gray10')
canvas.create_window((0, 0), window=inner_frame, anchor='nw')
if not devices:
no_label = tk.Label(inner_frame, text="无设备可选", font=('Consolas', 9), fg='red', bg='gray10')
no_label.pack(fill='x')
self.selected_device.set("")
else:
for dev in devices:
rb = tk.Radiobutton(
inner_frame, text=dev, font=('Consolas', 9), variable=self.selected_device,
value=dev, fg='lime', bg='gray10', selectcolor='darkgreen',
anchor='w', padx=10, pady=4
)
rb.pack(fill='x')
def update_scroll(event=None):
canvas.configure(scrollregion=canvas.bbox("all"))
inner_frame.bind("<Configure>", update_scroll)
update_scroll()
@log_call
def start_tcp_server(self):
if self.tcp_server_running:
return
def server_task():
port = self.local_info["port"]
if not self.is_port_free(port):
print(f"端口 {port} 已被占用,无法启动 TCP 服务")
return
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server_sock.bind(("", port))
server_sock.listen(1)
print(f"本地 TCP 服务监听端口 {port}")
self.tcp_server_running = True
except Exception as e:
print(f"TCP服务绑定失败: {e}")
self.tcp_server_running = False
return
while self.tcp_server_running:
try:
conn, addr = server_sock.accept()
if not is_private_ip(addr[0]):
conn.close()
continue
print(f"来自 {addr} 的连接")
self.current_conn = conn
self.connected = True
threading.Thread(target=self.receive_messages, args=(conn,), daemon=True).start()
except Exception as e:
if self.tcp_server_running:
print(f"服务器 accept 错误: {e}")
break
try:
server_sock.close()
except:
pass
self.tcp_server_running = False
self.tcp_server_thread = threading.Thread(target=server_task, daemon=True)
self.tcp_server_thread.start()
# =============== 核心:消息与文件接收逻辑(已加固)==============
@log_call
def receive_messages(self, conn):
buffer = b''
while self.connected:
try:
data = conn.recv(1024)
if not data:
break
buffer += data
# 文件接收模式(加锁)
if self.file_receiving:
with self.file_receiving_lock:
self.file_buffer += data
self.file_received_size = len(self.file_buffer)
if self.file_received_size >= self.file_expected_size:
full_file_data = self.file_buffer[:self.file_expected_size]
remainder = self.file_buffer[self.file_expected_size:]
buffer = remainder
self.root.after(0, lambda: self._save_received_file(full_file_data))
# 重置状态
self.file_receiving = False
self.file_buffer = b''
self.file_name = ""
self.file_expected_size = 0
self.file_received_size = 0
continue
# 处理文本消息
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
if len(line) > MAX_MESSAGE_LEN:
self.add_message_to_chat("消息过长,已丢弃", "system")
continue
try:
msg = line.decode('utf-8', errors='ignore').strip()
if msg.startswith("FILE:"):
parts = msg.split(':', 2) # 正确拆分为3部分
if len(parts) == 3:
filename = sanitize_filename(parts[1])
try:
filesize = int(parts[2])
except ValueError:
self.add_message_to_chat("文件大小无效", "system")
continue
if filesize <= 0 or filesize > MAX_FILE_SIZE:
self.add_message_to_chat(f"文件大小非法或过大(>{MAX_FILE_SIZE//1024//1024}MB)", "system")
continue
with self.file_receiving_lock:
self.file_name = filename
self.file_expected_size = filesize
self.file_receiving = True
self.file_buffer = b''
self.file_received_size = 0
self.add_message_to_chat(f"正在接收文件: {filename} ({filesize} 字节)", "system")
else:
self.add_message_to_chat("文件头格式错误", "system")
else:
self.add_message_to_chat(f"对方: {msg}", "received")
except Exception as e:
self.add_message_to_chat(f"解析消息失败: {str(e)}", "system")
except Exception as e:
print(f"接收消息错误: {e}")
break
self.connected = False
self.current_conn = None
def _save_received_file(self, file_data):
def do_save():
save_path = filedialog.asksaveasfilename(
title="保存接收到的文件",
initialfile=self.file_name,
defaultextension=""
)
if save_path:
try:
with open(save_path, 'wb') as f:
f.write(file_data)
self.add_message_to_chat(f"文件已保存至: {save_path}", "system")
except Exception as e:
self.add_message_to_chat(f"保存文件失败: {str(e)}", "system")
else:
self.add_message_to_chat("文件接收已取消", "system")
# 重置状态(双重保险)
self.file_receiving = False
self.file_buffer = b''
self.file_name = ""
self.file_expected_size = 0
self.file_received_size = 0
self.root.after(0, do_save)
# =============== 发送消息和文件 ===============
@log_call
def send_message(self):
if not self.connected or not self.current_conn:
messagebox.showwarning("警告", "尚未连接到任何设备")
return
message = self.input_text.get("1.0", "end-1c").strip()
if not message:
return
if len(message) > MAX_MESSAGE_LEN:
messagebox.showwarning("警告", f"消息过长(最大 {MAX_MESSAGE_LEN} 字符)")
return
try:
full_message = f"{message}\n"
self.current_conn.send(full_message.encode('utf-8'))
self.add_message_to_chat(f"我: {message}", "sent")
self.input_text.delete("1.0", "end")
except Exception as e:
messagebox.showerror("错误", f"发送消息失败: {e}")
self.connected = False
self.current_conn = None
@log_call
def send_file(self):
if not self.connected or not self.current_conn:
messagebox.showwarning("警告", "尚未连接到任何设备")
return
file_path = filedialog.askopenfilename(title="选择要发送的文件")
if not file_path:
return
try:
filename = sanitize_filename(os.path.basename(file_path))
filesize = os.path.getsize(file_path)
if filesize > MAX_FILE_SIZE:
messagebox.showerror("错误", f"文件过大(最大 {MAX_FILE_SIZE//1024//1024} MB)")
return
header = f"FILE:{filename}:{filesize}\n"
if len(header.encode('utf-8')) > MAX_MESSAGE_LEN:
messagebox.showerror("错误", "文件名过长")
return
self.current_conn.send(header.encode('utf-8'))
with open(file_path, 'rb') as f:
while True:
chunk = f.read(2048)
if not chunk:
break
self.current_conn.send(chunk)
self.add_message_to_chat(f"已发送文件: {filename}", "system")
except Exception as e:
messagebox.showerror("错误", f"发送文件失败: {e}")
@log_call
def add_message_to_chat(self, message, msg_type):
self.chat_text.config(state='normal')
tag = msg_type if msg_type in ["sent", "received", "system"] else "system"
self.chat_text.insert("end", message + "\n", tag)
self.chat_text.see("end")
self.chat_text.config(state='disabled')
@log_call
def create_chat_area(self):
self.chat_frame = tk.Frame(self.root, highlightbackground='pink', highlightthickness=2)
self.chat_frame.place(relx=0.14, rely=0.05, relwidth=0.86, relheight=0.6)
self.chat_text = scrolledtext.ScrolledText(
self.chat_frame,
font=('Consolas', 10),
bg='black', fg='white',
wrap=tk.WORD,
state='disabled',
insertbackground='white',
padx=5, pady=5
)
self.chat_text.pack(expand=True, fill='both')
self.chat_text.tag_configure("sent", foreground="blue")
self.chat_text.tag_configure("received", foreground="green")
self.chat_text.tag_configure("system", foreground="orange")
self.input_text = tk.Text(
self.root,
font=('Consolas', 10),
bg='gray10', fg='white',
height=3,
wrap=tk.WORD
)
self.input_text.place(relx=0.14, rely=0.66, relwidth=0.76, relheight=0.12)
self.send_btn = tk.Button(
self.root,
text="发送",
command=self.send_message,
font=('Consolas', 10, 'bold'),
bg='green', fg='white', relief='flat'
)
self.send_btn.place(relx=0.9, rely=0.66, relwidth=0.1, relheight=0.06)
self.file_btn = tk.Button(
self.root,
text="选择文件",
command=self.send_file,
font=('Consolas', 10, 'bold'),
bg='purple', fg='white', relief='flat'
)
self.file_btn.place(relx=0.9, rely=0.72, relheight=0.06, relwidth=0.1)
self.add_message_to_chat("欢迎使用局域网聊天室(安全加固版)!", "system")
self.add_message_to_chat("已启用:文件名过滤、100MB 限制、私有 IP 验证", "system")
@log_call
def sectle_button(self):
btn = tk.Button(
self.root,
text="连接选中设备",
command=self.connect_to_device,
font=('Consolas', 10, 'bold'),
bg='green', fg='white', relief='flat'
)
btn.place(relx=0, rely=0.92, relwidth=0.14, relheight=0.04)
return btn
@log_call
def connect_to_device(self):
if self.connected:
messagebox.showinfo("提示", "已连接到设备")
return
selected = self.selected_device.get()
if not selected:
self.root.after(0, lambda: self.label1.config(text="未选择设备"))
return
try:
username, ip, port_str = selected.split(":")
port = int(port_str)
except Exception as e:
self.root.after(0, lambda: self.label1.config(text="格式错误"))
return
if ip == self.local_info["ip"] and port == self.local_info["port"]:
self.root.after(0, lambda: self.label1.config(text="不能连接自己!"))
return
if self.current_conn:
try:
self.current_conn.close()
except:
pass
self.current_conn = None
max_retries = 3
connected = False
for i in range(max_retries):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
print(f"尝试连接到 {ip}:{port}") # ✅ 使用对方声明的 port,不是硬编码 65001
sock.connect((ip, port)) # ← 关键修复!
message = f"Hello from {self.local_info['username']}\n"
sock.send(message.encode('utf-8', errors='ignore'))
self.current_conn = sock
self.connected = True
self.target_ip = ip
self.target_port = port
threading.Thread(target=self.receive_messages, args=(sock,), daemon=True).start()
success_text = f"已连接\n{username}@{ip}:{port}"
self.root.after(0, lambda t=success_text: self.label1.config(text=t))
self.root.after(0, lambda: self.add_message_to_chat(f"已连接到 {username}", "system"))
connected = True
break
except Exception as e:
print(f"连接失败 {i + 1}/3: {e}")
time.sleep(1)
if not connected:
fail_msg = f"连接 {ip}:{port} 失败"
self.root.after(0, lambda: self.label1.config(text=fail_msg))
self.root.after(0, lambda: self.add_message_to_chat("连接失败,请重试", "system"))
@log_call
def on_closing(self):
print("开始清理...")
self.running = False
self.tcp_server_running = False
self.connected = False
if self.current_conn:
try:
self.current_conn.close()
except:
pass
self.current_conn = None
for sock in [self.udp_listener_socket, self.discovery_socket]:
if sock:
try:
sock.close()
except:
pass
for t in [self.listener_thread, self.discovery_thread, self.tcp_server_thread]:
if t and t.is_alive():
t.join(timeout=2)
self.root.destroy()
@log_call
def tk_end(self):
self.root.mainloop()
return "程序退出"
# ==================== 使用说明 ====================
"""
🌟 安全加固说明:
1. 文件名过滤:只允许字母、数字、点、下划线、连字符,防止路径穿越
2. 文件大小限制:最大 100 MB,防内存 DoS
3. 消息长度限制:所有网络消息 ≤ 1024 字节
4. IP 地址验证:只接受私有 IP(192.168.x.x, 10.x.x.x, 172.16-31.x.x)
5. TCP 端口修复:使用对方广播中的端口,而非硬编码 65001
6. 状态锁:防止文件接收状态并发冲突
7. 资源清理:确保所有 socket 和线程安全退出
适用于家庭/可信局域网环境。
"""
if __name__ == "__main__":
app = WindowTk()
app.tk_end()
openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构
更多推荐
所有评论(0)