当前位置: 首页 > news >正文

Python网络编程与Socket通信

Python网络编程与Socket通信

一、Socket基础概念

Socket(套接字)是网络通信的端点,提供了进程间通信的机制。Python的socket模块提供了对底层网络接口的访问。

import socket

# 创建TCP Socket
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 创建UDP Socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 地址族
# AF_INET - IPv4
# AF_INET6 - IPv6
# AF_UNIX - Unix域套接字

# Socket类型
# SOCK_STREAM - TCP(面向连接,可靠)
# SOCK_DGRAM - UDP(无连接,不可靠但快速)


二、TCP服务器与客户端

2.1 简单TCP服务器

import socket
import threading

class TCPServer:
def __init__(self, host='0.0.0.0', port=8080):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

def start(self):
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"服务器启动在 {self.host}:{self.port}")

try:
while True:
client_socket, address = self.server_socket.accept()
print(f"新连接来自 {address}")
thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address)
)
thread.daemon = True
thread.start()
except KeyboardInterrupt:
print("服务器关闭")
finally:
self.server_socket.close()

def handle_client(self, client_socket, address):
try:
while True:
data = client_socket.recv(4096)
if not data:
break
message = data.decode('utf-8')
print(f"收到来自 {address}: {message}")
response = f"服务器收到: {message}"
client_socket.send(response.encode('utf-8'))
except ConnectionResetError:
print(f"客户端 {address} 断开连接")
finally:
client_socket.close()

2.2 TCP客户端

class TCPClient:
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

def connect(self):
self.socket.connect((self.host, self.port))
print(f"已连接到 {self.host}:{self.port}")

def send(self, message):
self.socket.send(message.encode('utf-8'))
response = self.socket.recv(4096)
return response.decode('utf-8')

def close(self):
self.socket.close()

# 使用
client = TCPClient()
client.connect()
response = client.send("Hello, Server!")
print(response)
client.close()


三、带协议的通信

网络通信需要定义消息边界,常见方案:

3.1 固定长度头部协议

import struct
import json

class MessageProtocol:
"""消息格式:4字节长度头 + JSON消息体"""
HEADER_SIZE = 4

@staticmethod
def encode(data: dict) -> bytes:
body = json.dumps(data).encode('utf-8')
header = struct.pack('!I', len(body)) # 大端序4字节无符号整数
return header + body

@staticmethod
def decode(socket_conn) -> dict:
# 读取头部
header = MessageProtocol._recv_exact(socket_conn, MessageProtocol.HEADER_SIZE)
if not header:
return None
body_length = struct.unpack('!I', header)[0]

# 读取消息体
body = MessageProtocol._recv_exact(socket_conn, body_length)
return json.loads(body.decode('utf-8'))

@staticmethod
def _recv_exact(conn, length):
"""确保接收指定长度的数据"""
data = b''
while len(data) < length:
chunk = conn.recv(length - len(data))
if not chunk:
return None
data += chunk
return data

3.2 使用协议的服务器

class ProtocolServer:
def __init__(self, host='0.0.0.0', port=9000):
self.host = host
self.port = port
self.handlers = {}

def register_handler(self, action, handler):
self.handlers[action] = handler

def start(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((self.host, self.port))
server.listen(5)
print(f"协议服务器启动在 {self.host}:{self.port}")

while True:
conn, addr = server.accept()
threading.Thread(target=self._handle, args=(conn, addr)).start()

def _handle(self, conn, addr):
try:
while True:
message = MessageProtocol.decode(conn)
if message is None:
break

action = message.get('action')
handler = self.handlers.get(action)

if handler:
response = handler(message.get('data'))
else:
response = {'error': f'未知操作: {action}'}

conn.send(MessageProtocol.encode(response))
finally:
conn.close()

# 注册处理器
server = ProtocolServer()
server.register_handler('echo', lambda data: {'echo': data})
server.register_handler('time', lambda data: {'time': str(datetime.now())})


四、UDP通信

import socket

class UDPServer:
def __init__(self, host='0.0.0.0', port=8888):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind((host, port))
print(f"UDP服务器启动在 {host}:{port}")

def run(self):
while True:
data, addr = self.socket.recvfrom(65535)
message = data.decode('utf-8')
print(f"收到来自 {addr}: {message}")
response = f"ACK: {message}"
self.socket.sendto(response.encode('utf-8'), addr)

class UDPClient:
def __init__(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(5.0) # 设置超时

def send(self, message, host='localhost', port=8888):
self.socket.sendto(message.encode('utf-8'), (host, port))
try:
data, addr = self.socket.recvfrom(65535)
return data.decode('utf-8')
except socket.timeout:
return None


五、select/poll多路复用

import select

class SelectServer:
"""使用select实现非阻塞IO多路复用"""
def __init__(self, host='0.0.0.0', port=8080):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.setblocking(False)
self.server.bind((host, port))
self.server.listen(100)

self.inputs = [self.server]
self.outputs = []
self.message_queues = {}

def run(self):
print("Select服务器启动")
while self.inputs:
readable, writable, exceptional = select.select(
self.inputs, self.outputs, self.inputs, 1.0
)

for sock in readable:
if sock is self.server:
# 新连接
conn, addr = sock.accept()
conn.setblocking(False)
self.inputs.append(conn)
self.message_queues[conn] = []
else:
# 已有连接的数据
data = sock.recv(4096)
if data:
self.message_queues[sock].append(data)
if sock not in self.outputs:
self.outputs.append(sock)
else:
self._remove_connection(sock)

for sock in writable:
if self.message_queues.get(sock):
msg = self.message_queues[sock].pop(0)
sock.send(msg)
else:
self.outputs.remove(sock)

for sock in exceptional:
self._remove_connection(sock)

def _remove_connection(self, sock):
if sock in self.inputs:
self.inputs.remove(sock)
if sock in self.outputs:
self.outputs.remove(sock)
del self.message_queues[sock]
sock.close()


六、selectors高级接口

import selectors

class SelectorServer:
"""使用selectors模块(推荐方式)"""
def __init__(self, host='0.0.0.0', port=8080):
self.sel = selectors.DefaultSelector()
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.setblocking(False)
self.server.bind((host, port))
self.server.listen(100)
self.sel.register(self.server, selectors.EVENT_READ, self._accept)

def _accept(self, sock, mask):
conn, addr = sock.accept()
conn.setblocking(False)
self.sel.register(conn, selectors.EVENT_READ, self._read)

def _read(self, conn, mask):
data = conn.recv(4096)
if data:
conn.send(data) # echo
else:
self.sel.unregister(conn)
conn.close()

def run(self):
print("Selector服务器启动")
while True:
events = self.sel.select(timeout=1)
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)


七、HTTP客户端实现

class SimpleHTTPClient:
"""手动实现HTTP/1.1客户端"""
def __init__(self):
self.socket = None

def request(self, method, url, headers=None, body=None):
from urllib.parse import urlparse

parsed = urlparse(url)
host = parsed.hostname
port = parsed.port or 80
path = parsed.path or '/'

# 建立连接
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((host, port))

# 构建请求
request_line = f"{method} {path} HTTP/1.1\r\n"
default_headers = {
'Host': host,
'Connection': 'close',
'User-Agent': 'PythonSocket/1.0',
}
if headers:
default_headers.update(headers)
if body:
default_headers['Content-Length'] = str(len(body))

header_str = ''.join(f"{k}: {v}\r\n" for k, v in default_headers.items())
request = f"{request_line}{header_str}\r\n"

if body:
request += body

self.socket.send(request.encode('utf-8'))

# 接收响应
response = b''
while True:
chunk = self.socket.recv(4096)
if not chunk:
break
response += chunk

self.socket.close()
return self._parse_response(response)

def _parse_response(self, raw):
header_end = raw.find(b'\r\n\r\n')
header_part = raw[:header_end].decode('utf-8')
body = raw[header_end + 4:]

lines = header_part.split('\r\n')
status_line = lines[0]
status_code = int(status_line.split(' ')[1])

headers = {}
for line in lines[1:]:
key, value = line.split(': ', 1)
headers[key.lower()] = value

return {'status': status_code, 'headers': headers, 'body': body}


八、WebSocket简易实现

import hashlib
import base64

class WebSocketServer:
MAGIC_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def __init__(self, host='0.0.0.0', port=8765):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((host, port))
self.server.listen(5)

def _handshake(self, conn):
"""WebSocket握手"""
data = conn.recv(4096).decode('utf-8')
key = None
for line in data.split('\r\n'):
if 'Sec-WebSocket-Key' in line:
key = line.split(': ')[1].strip()
break

if not key:
return False

accept_key = base64.b64encode(
hashlib.sha1((key + self.MAGIC_STRING).encode()).digest()
).decode()

response = (
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
f"Sec-WebSocket-Accept: {accept_key}\r\n\r\n"
)
conn.send(response.encode())
return True

def _decode_frame(self, data):
"""解码WebSocket帧"""
if len(data) < 2:
return None

opcode = data[0] & 0x0F
masked = data[1] & 0x80
payload_length = data[1] & 0x7F

offset = 2
if payload_length == 126:
payload_length = struct.unpack('!H', data[2:4])[0]
offset = 4
elif payload_length == 127:
payload_length = struct.unpack('!Q', data[2:10])[0]
offset = 10

if masked:
mask = data[offset:offset+4]
offset += 4
payload = bytearray(data[offset:offset+payload_length])
for i in range(len(payload)):
payload[i] ^= mask[i % 4]
else:
payload = data[offset:offset+payload_length]

return {'opcode': opcode, 'payload': bytes(payload)}

def _encode_frame(self, message):
"""编码WebSocket帧"""
payload = message.encode('utf-8')
frame = bytearray()
frame.append(0x81) # FIN + text opcode

length = len(payload)
if length < 126:
frame.append(length)
elif length < 65536:
frame.append(126)
frame.extend(struct.pack('!H', length))
else:
frame.append(127)
frame.extend(struct.pack('!Q', length))

frame.extend(payload)
return bytes(frame)


九、实用工具函数

def get_local_ip():
"""获取本机IP地址"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80))
return s.getsockname()[0]
finally:
s.close()

def port_scan(host, start_port, end_port, timeout=0.5):
"""端口扫描"""
open_ports = []
for port in range(start_port, end_port + 1):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
if result == 0:
open_ports.append(port)
sock.close()
return open_ports

def resolve_dns(hostname):
"""DNS解析"""
try:
ip = socket.gethostbyname(hostname)
all_ips = socket.gethostbyname_ex(hostname)
return {'ip': ip, 'all': all_ips}
except socket.gaierror as e:
return {'error': str(e)}


十、总结

网络编程要点:
1. TCP适合需要可靠传输的场景,UDP适合实时性要求高的场景
2. 必须处理消息边界问题(TCP是字节流,不保证消息完整性)
3. 生产环境使用selectors或asyncio处理并发连接
4. 注意设置超时,避免永久阻塞
5. 实际项目中优先使用高层库(requests、aiohttp、websockets)

http://www.gsyq.cn/news/1499275.html

相关文章:

  • 想用 Claude Fable 5?AWS Bedrock 用户得把数据交给 Anthropic 30 天,我看完蚌埠住了
  • 如果有一副眼镜,你打手语,它帮你“说”出来,有人需要吗?
  • 出海企业如何高效匹配全球市场调研供应商?
  • 拼多多代运营电话_拼多多代运营公司联系方式_杭州百推官方热线 13968060425 - 品牌榜中榜
  • 被忽略的“生命附属品”:脐带胎盘干细胞,解锁再生医学新可能
  • 眉山全屋定制衣柜品牌排行:实测维度对比解析 - 起跑123
  • 2026贵港防水补漏哪家靠谱?正规公司排名及避坑价格指南 - 苏易修缮
  • 2026年6月技术好的景区游乐设施直销厂家哪家权威,健身器材/景区游乐设施/游乐设备/篮球架,景区游乐设施生产厂家选哪家 - 品牌推荐师
  • Java Swing学生信息管理系统(带MySQL连接与完整CRUD功能)
  • 纯前端二维码 / 条码生成器:从协议拼装到批量 ZIP 下载完整拆解
  • DeepLocals v3.3.0 发布:打通知识库、微信与多模态文档处理的关键一步
  • 一个工业级无锁的C++队列
  • AI赋能学术提质:百考通AI助力高校课程论文高效合规创作
  • 广州黄金名表钻石一站式回收靠谱机构推荐(1) - 奢侈品回收
  • 【MATLAB+word】ZVS全桥移相控制系统设计
  • 原厂官方授权|北京和远科技获德国 fleXstructures IPS 全系列软件中国区代理商
  • 河北年产能领先铸钢厂排行:5家实力企业盘点 - 起跑123
  • 2026年海口GEO优化深度解析:权威内容构建的破局之道 - 环岛AI智推GEO系统
  • 北京闲置黄金首饰回收实测:奢二网大盘价减 3 元报价透明无扣费 - 讯息早知道
  • 如何零成本解锁Wand游戏修改器的全部高级功能?✨
  • 孩子夏天总过敏?别乱涂药膏!这几款中成药对症用,安全又管用
  • UniApp跨端开发实战:从核心语法到性能优化的工程化闭环
  • iOS 26.4越狱完整教程:安全解锁iPhone隐藏功能的终极指南
  • 2026防城港防水补漏哪家靠谱?正规公司排名及避坑价格指南 - 苏易修缮
  • MQTT服务器搭建(windows环境)
  • 2026年上海雨水PP模块工厂:海绵城市、雨水收集系统与蓄水模块制造商实力解析 - 品牌发掘
  • 海口黄金回收避坑实测篇|本地卖金正规辨别技巧与机构实测 - 薛定谔的梨花猫
  • 精工铸标杆 引领中国厨房水槽品质升级 - 玖叁鹿
  • 免费通达信数据接口:Python金融分析的终极解决方案
  • 2026钦州防水补漏哪家靠谱?正规公司排名及避坑价格指南 - 苏易修缮