【无标题】关于 WebRTC P2P 音视频通话(前端 Flutter,后端 Go)
Go 简单信令服务
// Command signal is a minimal WebSocket signaling relay for WebRTC peers.
//
// Clients connect to /ws, announce themselves with
// {"type":"login","from":"<id>"}, and afterwards every message carrying a
// "to" field is forwarded verbatim to the connection registered under that ID.
package main

import (
	"encoding/json"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

// SignalMessage is a loosely typed signaling envelope decoded from JSON.
type SignalMessage map[string]interface{}

// Client is one WebSocket connection and its outbound queue.
//
// Send is never closed. Shutdown is signalled through the done channel so
// that any goroutine may enqueue or request shutdown without risking a
// "send on closed channel" or "close of closed channel" panic. Create clients
// with newClient so that done is initialised.
type Client struct {
	UserID string
	Conn   *websocket.Conn
	Send   chan []byte

	done      chan struct{} // closed exactly once when the client must stop
	closeOnce sync.Once
}

var (
	clients   = make(map[string]*Client) // user ID -> client
	clientsMu sync.RWMutex

	upgrader = websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		// DEMO: every origin is accepted; production code must validate Origin.
		CheckOrigin: func(r *http.Request) bool { return true },
	}
)

const (
	writeWait      = 10 * time.Second
	pongWait       = 60 * time.Second
	pingPeriod     = (pongWait * 9) / 10
	maxMessageSize = 1024 * 64
)

func main() {
	http.HandleFunc("/ws", wsHandler)
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok")) // best-effort health reply
	})

	addr := ":8080"
	// Only the header-read timeout is set: it bounds slow handshakes without
	// imposing deadlines on the long-lived WebSocket connections.
	srv := &http.Server{
		Addr:              addr,
		ReadHeaderTimeout: 10 * time.Second,
	}
	log.Printf("signal server started at ws://0.0.0.0%s/ws", addr)
	log.Fatal(srv.ListenAndServe())
}

// newClient wraps an upgraded connection with its queue and shutdown signal.
func newClient(conn *websocket.Conn) *Client {
	return &Client{
		Conn: conn,
		Send: make(chan []byte, 256),
		done: make(chan struct{}),
	}
}

// wsHandler upgrades the request and runs the connection until it ends.
// The write pump runs in its own goroutine; the read pump runs on the
// handler goroutine and performs cleanup when it returns.
func wsHandler(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("upgrade error:", err)
		return
	}
	client := newClient(conn)
	go client.writePump()
	client.readPump()
}

// readPump reads client messages, handles "login", and relays everything else
// to the user named in the "to" field.
//
// NOTE(review): the "from" field is not checked against the logged-in user and
// relaying does not require a prior login; a production server should enforce
// both.
func (c *Client) readPump() {
	defer c.cleanup()

	c.Conn.SetReadLimit(maxMessageSize)
	_ = c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	c.Conn.SetPongHandler(func(string) error {
		return c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	})

	for {
		_, data, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("read error user=%s err=%v", c.UserID, err)
			}
			return
		}

		var msg SignalMessage
		if err := json.Unmarshal(data, &msg); err != nil {
			log.Printf("invalid json: %v", err)
			continue
		}

		msgType, _ := msg["type"].(string)
		switch msgType {
		case "login":
			// {type:"login", from:"alice"}
			from, _ := msg["from"].(string)
			if from == "" {
				c.sendJSON(SignalMessage{
					"type":  "error",
					"error": "login.from required",
				})
				continue
			}
			c.bindUser(from)
			c.sendJSON(SignalMessage{
				"type": "login_ok",
				"from": "server",
				"to":   from,
			})
		default:
			// Pass-through: a destination is mandatory.
			to, _ := msg["to"].(string)
			if to == "" {
				c.sendJSON(SignalMessage{
					"type":  "error",
					"error": "field 'to' required",
				})
				continue
			}
			forwardToUser(to, data)
		}
	}
}

// writePump is the only goroutine that writes to the connection. It drains
// Send, emits periodic pings, and on shutdown sends a close frame before
// closing the socket.
func (c *Client) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		_ = c.Conn.Close()
	}()

	for {
		select {
		case <-c.done:
			_ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			// Best effort: the peer may already be gone.
			_ = c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
			return
		case message := <-c.Send:
			_ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
				return
			}
		case <-ticker.C:
			_ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

// shutdown asks the write pump to stop. It is safe to call from any goroutine
// and any number of times.
func (c *Client) shutdown() {
	c.closeOnce.Do(func() { close(c.done) })
}

// enqueue queues b for delivery without blocking. A client whose queue is
// full is considered too slow and is shut down.
func (c *Client) enqueue(b []byte) {
	select {
	case <-c.done:
		return // already shutting down; drop the message
	default:
	}
	select {
	case c.Send <- b:
	default:
		c.shutdown()
	}
}

// bindUser registers c under userID. A different connection already
// registered under that ID is kicked (last login wins), and any earlier ID
// this connection was registered under is released.
func (c *Client) bindUser(userID string) {
	clientsMu.Lock()
	defer clientsMu.Unlock()

	// Re-login under a new ID: drop the previous mapping so it does not
	// linger in the registry after this connection goes away.
	if prev := c.UserID; prev != "" && prev != userID {
		if cur, ok := clients[prev]; ok && cur == c {
			delete(clients, prev)
			log.Printf("user offline: %s", prev)
		}
	}
	c.UserID = userID

	if old, ok := clients[userID]; ok && old != c {
		// The old connection's write pump sends the close frame and closes
		// its socket; its read pump then unblocks and cleans up.
		old.shutdown()
	}
	clients[userID] = c
	log.Printf("user online: %s", userID)
}

// cleanup unregisters c (only if it is still the registered connection for
// its user ID) and releases the connection.
func (c *Client) cleanup() {
	if c.UserID != "" {
		clientsMu.Lock()
		if cur, ok := clients[c.UserID]; ok && cur == c {
			delete(clients, c.UserID)
			log.Printf("user offline: %s", c.UserID)
		}
		clientsMu.Unlock()
	}
	c.shutdown()
	_ = c.Conn.Close()
}

// sendJSON marshals v and queues it for this client.
func (c *Client) sendJSON(v interface{}) {
	b, err := json.Marshal(v)
	if err != nil {
		log.Printf("marshal error user=%s err=%v", c.UserID, err)
		return
	}
	c.enqueue(b)
}

// forwardToUser relays raw to the connection registered under to. Messages
// for users that are not online are dropped silently.
func forwardToUser(to string, raw []byte) {
	clientsMu.RLock()
	target, ok := clients[to]
	clientsMu.RUnlock()
	if !ok {
		// The peer is offline; ignore, or add an offline notice if needed.
		return
	}
	target.enqueue(raw)
}

Flutter 客户端调试
import 'dart:convert';

import 'package:flutter/material.dart';
import 'package:flutter_webrtc/flutter_webrtc.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

// Source: https://28xin.com/

void main() {
  runApp(const MyApp());
}

/// Address of the signaling server.
const String kSignalingUrl = 'ws://YOUR_SIGNAL_SERVER:8080/ws';

/// ID of the current user; a real app assigns this after login.
const String kSelfId = 'alice';

/// ID of the default callee; a real app picks it from contacts or a session.
const String kPeerId = 'bob';

/// Root widget of the demo application.
class MyApp extends StatelessWidget {
  const MyApp({super.key});

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Flutter WebRTC P2P Demo',
      theme: ThemeData(useMaterial3: true, colorSchemeSeed: Colors.blue),
      home: const CallPage(),
    );
  }
}

/// Phases of a call as shown in the UI.
enum CallState {
  idle,
  calling,
  ringing,
  connecting,
  connected,
  ended,
}

/// Single-page call UI: remote video full screen, local preview on top.
class CallPage extends StatefulWidget {
  const CallPage({super.key});

  @override
  State<CallPage> createState() => _CallPageState();
}

class _CallPageState extends State<CallPage> {
  final RTCVideoRenderer _localRenderer = RTCVideoRenderer();
  final RTCVideoRenderer _remoteRenderer = RTCVideoRenderer();

  WebSocketChannel? _ws;
  RTCPeerConnection? _pc;
  MediaStream? _localStream;

  CallState _state = CallState.idle;
  bool _micEnabled = true;
  bool _camEnabled = true;

  /// Peer of the current call: [kPeerId] when we place the call, otherwise
  /// the sender of the incoming offer.
  String _activePeerId = kPeerId;

  /// True once setRemoteDescription has completed on the current [_pc].
  /// Remote ICE candidates that arrive before that are buffered.
  bool _remoteDescriptionSet = false;
  final List<RTCIceCandidate> _remoteCandidatesBuffer = [];

  // Use your own STUN/TURN servers in production.
  final Map<String, dynamic> _pcConfig = {
    'iceServers': [
      {'urls': 'stun:stun.l.google.com:19302'},
      // {
      //   'urls': 'turn:your.turn.server:3478?transport=udp',
      //   'username': 'user',
      //   'credential': 'pass',
      // },
      // {
      //   'urls': 'turns:your.turn.server:5349?transport=tcp',
      //   'username': 'user',
      //   'credential': 'pass',
      // }
    ],
    'sdpSemantics': 'unified-plan',
  };

  @override
  void initState() {
    super.initState();
    _initAll();
  }

  /// Initializes renderers, captures local media, then goes online.
  Future<void> _initAll() async {
    await _localRenderer.initialize();
    await _remoteRenderer.initialize();
    // Capture media before logging in: an offer that arrives while there is
    // no local stream would be answered without any local tracks.
    await _createLocalStream();
    if (!mounted) return;
    await _connectSignal();
  }

  /// Opens the signaling socket, starts listening and logs in as [kSelfId].
  Future<void> _connectSignal() async {
    _ws = WebSocketChannel.connect(Uri.parse(kSignalingUrl));
    _ws!.stream.listen((event) async {
      // Errors thrown inside an async listener are otherwise unhandled.
      try {
        final decoded = jsonDecode(event as String);
        if (decoded is! Map<String, dynamic>) {
          debugPrint('signal ignored: not a JSON object');
          return;
        }
        await _onSignal(decoded);
      } catch (e) {
        debugPrint('signal handling error: $e');
      }
    }, onDone: () {
      debugPrint('signal closed');
    }, onError: (e) {
      debugPrint('signal error: $e');
    });

    _send({
      'type': 'login',
      'from': kSelfId,
    });
  }

  /// Captures camera and microphone and shows the local preview.
  Future<void> _createLocalStream() async {
    final mediaConstraints = <String, dynamic>{
      'audio': true,
      'video': {
        'facingMode': 'user',
        'width': {'ideal': 1280},
        'height': {'ideal': 720},
        'frameRate': {'ideal': 24},
      }
    };

    try {
      final stream =
          await navigator.mediaDevices.getUserMedia(mediaConstraints);
      if (!mounted) {
        // The page was closed while the permission prompt was open.
        await stream.dispose();
        return;
      }
      _localStream = stream;
      _localRenderer.srcObject = stream;
      setState(() {});
    } catch (e) {
      // For example the user denied camera or microphone permission.
      debugPrint('getUserMedia failed: $e');
    }
  }

  /// Creates the peer connection for the current call if there is none.
  Future<void> _createPeerConnectionIfNeeded() async {
    if (_pc != null) return;

    final pc = await createPeerConnection(_pcConfig);
    _pc = pc;
    _remoteDescriptionSet = false;

    // Add the local tracks.
    final stream = _localStream;
    if (stream != null) {
      for (final track in stream.getTracks()) {
        await pc.addTrack(track, stream);
      }
    }

    pc.onIceCandidate = (candidate) {
      // Skip notifications that carry no candidate string.
      if (candidate.candidate == null) return;
      _send({
        'type': 'webrtc_candidate',
        'from': kSelfId,
        'to': _activePeerId,
        'candidate': candidate.toMap(),
      });
    };

    pc.onTrack = (event) {
      if (event.streams.isNotEmpty && mounted) {
        _remoteRenderer.srcObject = event.streams.first;
        setState(() {});
      }
    };

    pc.onConnectionState = (state) {
      debugPrint('pc connection state: $state');
      // Ignore events from a connection that has already been replaced.
      if (!mounted || !identical(_pc, pc)) return;

      if (state == RTCPeerConnectionState.RTCPeerConnectionStateConnected) {
        setState(() => _state = CallState.connected);
      }
      if (state == RTCPeerConnectionState.RTCPeerConnectionStateFailed ||
          state == RTCPeerConnectionState.RTCPeerConnectionStateDisconnected ||
          state == RTCPeerConnectionState.RTCPeerConnectionStateClosed) {
        // Tear the connection down so the next call starts from a fresh one
        // instead of reusing a dead connection.
        _endCallLocal();
      }
    };
  }

  /// Places a call to [kPeerId]: sends the invite, then the SDP offer.
  Future<void> startCall() async {
    _activePeerId = kPeerId;
    await _createPeerConnectionIfNeeded();
    final pc = _pc;
    if (pc == null || !mounted) return;

    setState(() => _state = CallState.calling);

    _send({
      'type': 'call_invite',
      'from': kSelfId,
      'to': _activePeerId,
      'media': 'video',
    });

    final offer = await pc.createOffer({
      'offerToReceiveAudio': 1,
      'offerToReceiveVideo': 1,
    });
    await pc.setLocalDescription(offer);

    _send({
      'type': 'webrtc_offer',
      'from': kSelfId,
      'to': _activePeerId,
      'sdp': offer.sdp,
    });

    if (mounted) setState(() => _state = CallState.connecting);
  }

  /// Applies the remote offer from [from] and replies with an SDP answer.
  Future<void> _acceptCallAndAnswer(String offerSdp, String from) async {
    // Candidates and hangup must go to whoever actually called us.
    _activePeerId = from;
    await _createPeerConnectionIfNeeded();
    final pc = _pc;
    if (pc == null) return;

    await pc.setRemoteDescription(
      RTCSessionDescription(offerSdp, 'offer'),
    );
    _remoteDescriptionSet = true;
    await _flushRemoteCandidates();

    final answer = await pc.createAnswer({
      'offerToReceiveAudio': 1,
      'offerToReceiveVideo': 1,
    });
    await pc.setLocalDescription(answer);

    _send({
      'type': 'webrtc_answer',
      'from': kSelfId,
      'to': from,
      'sdp': answer.sdp,
    });

    if (mounted) setState(() => _state = CallState.connecting);
  }

  /// Adds every buffered remote candidate to the current peer connection.
  Future<void> _flushRemoteCandidates() async {
    // Work on a copy: the buffer may be cleared by a hangup while we await.
    final pending = List<RTCIceCandidate>.of(_remoteCandidatesBuffer);
    _remoteCandidatesBuffer.clear();
    for (final candidate in pending) {
      await _pc?.addCandidate(candidate);
    }
  }

  /// Handles one decoded signaling message addressed to this user.
  Future<void> _onSignal(Map<String, dynamic> msg) async {
    final type = msg['type'];

    switch (type) {
      case 'call_invite':
        // Incoming call.
        if (msg['to'] == kSelfId) {
          setState(() => _state = CallState.ringing);
          _showIncomingDialog(msg['from'] as String);
        }
        break;

      case 'call_reject':
        if (msg['to'] == kSelfId) {
          _showToast('对方已拒绝');
          await _endCallLocal();
        }
        break;

      case 'call_hangup':
        if (msg['to'] == kSelfId) {
          _showToast('对方已挂断');
          await _endCallLocal();
        }
        break;

      case 'webrtc_offer':
        if (msg['to'] == kSelfId) {
          final from = msg['from'] as String;
          final sdp = msg['sdp'] as String;

          // Auto-answer (this can be changed to wait for the user to accept).
          await _acceptCallAndAnswer(sdp, from);
        }
        break;

      case 'webrtc_answer':
        if (msg['to'] == kSelfId) {
          final pc = _pc;
          if (pc == null) break;
          final sdp = msg['sdp'] as String;
          await pc.setRemoteDescription(
            RTCSessionDescription(sdp, 'answer'),
          );
          _remoteDescriptionSet = true;
          // Candidates that beat the answer were buffered; apply them now.
          await _flushRemoteCandidates();
        }
        break;

      case 'webrtc_candidate':
        if (msg['to'] == kSelfId) {
          final c = msg['candidate'];
          if (c is! Map) break;
          final candidate = RTCIceCandidate(
            c['candidate'] as String?,
            c['sdpMid'] as String?,
            c['sdpMLineIndex'] as int?,
          );

          // getRemoteDescription() returns a Future, so comparing its result
          // with null without awaiting is always true; track readiness with
          // an explicit flag instead.
          final pc = _pc;
          if (pc != null && _remoteDescriptionSet) {
            await pc.addCandidate(candidate);
          } else {
            _remoteCandidatesBuffer.add(candidate);
          }
        }
        break;
    }
  }

  /// Sends [data] as JSON over the signaling socket, if it is open.
  void _send(Map<String, dynamic> data) {
    _ws?.sink.add(jsonEncode(data));
  }

  /// Notifies the peer and ends the call locally.
  Future<void> hangup() async {
    _send({
      'type': 'call_hangup',
      'from': kSelfId,
      'to': _activePeerId,
    });
    await _endCallLocal();
  }

  /// Closes the peer connection and resets the call state to idle.
  Future<void> _endCallLocal() async {
    final pc = _pc;
    _pc = null;
    _remoteDescriptionSet = false;
    _remoteCandidatesBuffer.clear();

    if (pc != null) {
      // Detach handlers first so that close() cannot re-enter this method
      // through the connection-state callback.
      pc.onConnectionState = null;
      pc.onIceCandidate = null;
      pc.onTrack = null;
      await pc.close();
    }

    if (!mounted) return;
    _remoteRenderer.srcObject = null;
    setState(() => _state = CallState.ended);

    await Future.delayed(const Duration(milliseconds: 300));
    // Do not clobber the state of a call that started in the meantime.
    if (mounted && _state == CallState.ended) {
      setState(() => _state = CallState.idle);
    }
  }

  /// Mutes or unmutes the local audio tracks.
  void toggleMic() {
    if (_localStream == null) return;
    _micEnabled = !_micEnabled;
    for (final t in _localStream!.getAudioTracks()) {
      t.enabled = _micEnabled;
    }
    setState(() {});
  }

  /// Enables or disables the local video tracks.
  void toggleCamera() {
    if (_localStream == null) return;
    _camEnabled = !_camEnabled;
    for (final t in _localStream!.getVideoTracks()) {
      t.enabled = _camEnabled;
    }
    setState(() {});
  }

  /// Switches between the front and back camera.
  Future<void> switchCamera() async {
    final videoTracks = _localStream?.getVideoTracks();
    if (videoTracks != null && videoTracks.isNotEmpty) {
      await Helper.switchCamera(videoTracks.first);
    }
  }

  /// Shows the incoming-call dialog for a call from [from].
  void _showIncomingDialog(String from) {
    showDialog(
      context: context,
      barrierDismissible: false,
      builder: (dialogContext) => AlertDialog(
        title: const Text('来电'),
        content: Text('$from 邀请你视频通话'),
        actions: [
          TextButton(
            onPressed: () {
              Navigator.pop(dialogContext);
              _send({
                'type': 'call_reject',
                'from': kSelfId,
                'to': from,
              });
              // The offer is answered automatically, so a connection may
              // already exist; tear it down so no media flows after a reject.
              _endCallLocal();
            },
            child: const Text('拒绝'),
          ),
          FilledButton(
            onPressed: () {
              Navigator.pop(dialogContext);
              // The offer itself is handled in webrtc_offer; only update the
              // state here, unless the call has already connected.
              if (_state == CallState.ringing) {
                setState(() => _state = CallState.connecting);
              }
            },
            child: const Text('接听'),
          ),
        ],
      ),
    );
  }

  /// Shows [text] in a snack bar.
  void _showToast(String text) {
    if (!mounted) return;
    ScaffoldMessenger.of(context).showSnackBar(SnackBar(content: Text(text)));
  }

  @override
  void dispose() {
    _ws?.sink.close();

    final pc = _pc;
    _pc = null;
    if (pc != null) {
      pc.onConnectionState = null;
      pc.onIceCandidate = null;
      pc.onTrack = null;
      pc.close();
    }

    // Stop capture explicitly so the camera and microphone are released.
    final stream = _localStream;
    if (stream != null) {
      for (final track in stream.getTracks()) {
        track.stop();
      }
      stream.dispose();
    }

    _localRenderer.dispose();
    _remoteRenderer.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    final remoteReady = _remoteRenderer.srcObject != null;

    return Scaffold(
      appBar: AppBar(
        title: Text('P2P视频通话 - ${_state.name}'),
      ),
      body: Stack(
        children: [
          Positioned.fill(
            child: Container(
              color: Colors.black,
              child: remoteReady
                  ? RTCVideoView(
                      _remoteRenderer,
                      objectFit:
                          RTCVideoViewObjectFit.RTCVideoViewObjectFitCover,
                    )
                  : const Center(
                      child: Text(
                        '等待远端视频…',
                        style: TextStyle(color: Colors.white70),
                      ),
                    ),
            ),
          ),
          Positioned(
            right: 12,
            top: 12,
            width: 120,
            height: 180,
            child: Container(
              decoration: BoxDecoration(
                border: Border.all(color: Colors.white24),
                borderRadius: BorderRadius.circular(8),
              ),
              clipBehavior: Clip.antiAlias,
              child: RTCVideoView(
                _localRenderer,
                mirror: true,
                objectFit: RTCVideoViewObjectFit.RTCVideoViewObjectFitCover,
              ),
            ),
          ),
        ],
      ),
      bottomNavigationBar: SafeArea(
        child: Padding(
          padding: const EdgeInsets.fromLTRB(12, 8, 12, 12),
          child: Wrap(
            alignment: WrapAlignment.center,
            spacing: 12,
            runSpacing: 8,
            children: [
              FilledButton.icon(
                onPressed:
                    (_state == CallState.idle || _state == CallState.ended)
                        ? startCall
                        : null,
                icon: const Icon(Icons.call),
                label: const Text('发起'),
              ),
              FilledButton.tonalIcon(
                onPressed: (_state == CallState.connecting ||
                        _state == CallState.connected ||
                        _state == CallState.calling)
                    ? hangup
                    : null,
                icon: const Icon(Icons.call_end),
                label: const Text('挂断'),
              ),
              OutlinedButton.icon(
                onPressed: toggleMic,
                icon: Icon(_micEnabled ? Icons.mic : Icons.mic_off),
                label: Text(_micEnabled ? '麦克风开' : '麦克风关'),
              ),
              OutlinedButton.icon(
                onPressed: toggleCamera,
                icon: Icon(_camEnabled ? Icons.videocam : Icons.videocam_off),
                label: Text(_camEnabled ? '摄像头开' : '摄像头关'),
              ),
              OutlinedButton.icon(
                onPressed: switchCamera,
                icon: const Icon(Icons.cameraswitch),
                label: const Text('切换前后摄'),
              ),
            ],
          ),
        ),
      ),
    );
  }
}

最近在研究 P2P 音视频通话,发现打洞成功率极低,综合成功率只有 15-20%。
有什么能节省服务器中继带宽的其他方案吗?
详细测试环境报告:关于 WebRTC P2P 音视频通话问题测试 - 藏宝库 IT 社区
