mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-02-17 11:45:25 +00:00
simple-server: Add support for multi-party rooms
Also add a new room-client.py to test the protocol which is documented in Protocol.md
This commit is contained in:
parent
2db85c41cc
commit
d687ff3d91
3 changed files with 237 additions and 25 deletions
|
@ -20,7 +20,7 @@ Basic websockets server implemented in Python that manages the peers list and sh
|
||||||
|
|
||||||
This is a basic protocol for doing 1-1 audio+video calls between a gstreamer app and a JS app in a browser.
|
This is a basic protocol for doing 1-1 audio+video calls between a gstreamer app and a JS app in a browser.
|
||||||
|
|
||||||
### Peer registration and calling
|
### Peer registration
|
||||||
|
|
||||||
Peers must register with the signalling server before a call can be initiated. The server connection should stay open as long as the peer is available or in a call.
|
Peers must register with the signalling server before a call can be initiated. The server connection should stay open as long as the peer is available or in a call.
|
||||||
|
|
||||||
|
@ -31,13 +31,29 @@ This protocol builds upon https://github.com/shanet/WebRTC-Example/
|
||||||
* Receive `HELLO`
|
* Receive `HELLO`
|
||||||
* Any other message starting with `ERROR` is an error.
|
* Any other message starting with `ERROR` is an error.
|
||||||
|
|
||||||
* To connect to a peer, send `SESSION <uid>` where `<uid>` identifies the peer to connect to, and receive `SESSION_OK`
|
### 1-1 calls with a 'session'
|
||||||
|
|
||||||
|
* To connect to a single peer, send `SESSION <uid>` where `<uid>` identifies the peer to connect to, and receive `SESSION_OK`
|
||||||
* All further messages will be forwarded to the peer
|
* All further messages will be forwarded to the peer
|
||||||
* The call negotiation with the peer can be started by sending JSON encoded SDP and ICE
|
* The call negotiation with the peer can be started by sending JSON encoded SDP and ICE
|
||||||
|
|
||||||
* Closure of the server connection means the call has ended; either because the other peer ended it or went away
|
* Closure of the server connection means the call has ended; either because the other peer ended it or went away
|
||||||
* To end the call, disconnect from the server. You may reconnect again whenever you wish.
|
* To end the call, disconnect from the server. You may reconnect again whenever you wish.
|
||||||
|
|
||||||
|
### Multi-party calls with a 'room'
|
||||||
|
|
||||||
|
* To create a multi-party call, you must first register (or join) a room. Send `ROOM <room_id>` where `<room_id>` is a unique room name
|
||||||
|
* Receive `ROOM_OK ` from the server if this is a new room, or `ROOM_OK <peer1_id> <peer2_id> ...` where `<peerN_id>` are unique identifiers for the peers already in the room
|
||||||
|
* To send messages to a specific peer within the room for call negotiation (or any other purpose, use `ROOM_PEER_MSG <peer_id> <msg>`
|
||||||
|
* When a new peer joins the room, you will receive a `ROOM_PEER_JOINED <peer_id>` message
|
||||||
|
- For the purposes of convention and to avoid overwhelming newly-joined peers, offers must only be sent by the newly-joined peer
|
||||||
|
* When a peer leaves the room, you will receive a `ROOM_PEER_LEFT <peer_id>` message
|
||||||
|
- You should stop sending/receiving media from/to this peer
|
||||||
|
* To get a list of all peers currently in the room, send `ROOM_PEER_LIST` and receive `ROOM_PEER_LIST <peer1_id> ...`
|
||||||
|
- This list will never contain your own `<uid>`
|
||||||
|
- In theory you should never need to use this since you are guaranteed to receive JOINED and LEFT messages for all peers in a room
|
||||||
|
* You may stay connected to a room for as long as you like
|
||||||
|
|
||||||
### Negotiation
|
### Negotiation
|
||||||
|
|
||||||
Once a call has been setup with the signalling server, the peers must negotiate SDP and ICE candidates with each other.
|
Once a call has been setup with the signalling server, the peers must negotiate SDP and ICE candidates with each other.
|
||||||
|
|
107
webrtc/signalling/room-client.py
Executable file
107
webrtc/signalling/room-client.py
Executable file
|
@ -0,0 +1,107 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
#
|
||||||
|
# Test client for simple room-based multi-peer p2p calling
|
||||||
|
#
|
||||||
|
# Copyright (C) 2017 Centricular Ltd.
|
||||||
|
#
|
||||||
|
# Author: Nirbheek Chauhan <nirbheek@centricular.com>
|
||||||
|
#
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import ssl
|
||||||
|
import json
|
||||||
|
import uuid
|
||||||
|
import asyncio
|
||||||
|
import websockets
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||||
|
parser.add_argument('--url', default='wss://localhost:8443', help='URL to connect to')
|
||||||
|
parser.add_argument('--room', default=None, help='the room to join')
|
||||||
|
|
||||||
|
options = parser.parse_args(sys.argv[1:])
|
||||||
|
|
||||||
|
SERVER_ADDR = options.url
|
||||||
|
PEER_ID = 'ws-test-client-' + str(uuid.uuid4())[:6]
|
||||||
|
ROOM_ID = options.room
|
||||||
|
if ROOM_ID is None:
|
||||||
|
print('--room argument is required')
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
sslctx = False
|
||||||
|
if SERVER_ADDR.startswith(('wss://', 'https://')):
|
||||||
|
sslctx = ssl.create_default_context()
|
||||||
|
# FIXME
|
||||||
|
sslctx.check_hostname = False
|
||||||
|
sslctx.verify_mode = ssl.CERT_NONE
|
||||||
|
|
||||||
|
def get_answer_sdp(offer, peer_id):
|
||||||
|
# Here we'd parse the incoming JSON message for ICE and SDP candidates
|
||||||
|
print("Got: " + offer)
|
||||||
|
sdp = json.dumps({'sdp': 'reply sdp'})
|
||||||
|
answer = 'ROOM_PEER_MSG {} {}'.format(peer_id, sdp)
|
||||||
|
print("Sent: " + answer)
|
||||||
|
return answer
|
||||||
|
|
||||||
|
def get_offer_sdp(peer_id):
|
||||||
|
sdp = json.dumps({'sdp': 'initial sdp'})
|
||||||
|
offer = 'ROOM_PEER_MSG {} {}'.format(peer_id, sdp)
|
||||||
|
print("Sent: " + offer)
|
||||||
|
return offer
|
||||||
|
|
||||||
|
async def hello():
|
||||||
|
async with websockets.connect(SERVER_ADDR, ssl=sslctx) as ws:
|
||||||
|
await ws.send('HELLO ' + PEER_ID)
|
||||||
|
assert(await ws.recv() == 'HELLO')
|
||||||
|
|
||||||
|
await ws.send('ROOM {}'.format(ROOM_ID))
|
||||||
|
|
||||||
|
sent_offers = set()
|
||||||
|
# Receive messages
|
||||||
|
while True:
|
||||||
|
msg = await ws.recv()
|
||||||
|
if msg.startswith('ERROR'):
|
||||||
|
# On error, we bring down the webrtc pipeline, etc
|
||||||
|
print('{!r}, exiting'.format(msg))
|
||||||
|
return
|
||||||
|
if msg.startswith('ROOM_OK'):
|
||||||
|
print('Got ROOM_OK for room {!r}'.format(ROOM_ID))
|
||||||
|
_, *room_peers = msg.split()
|
||||||
|
for peer_id in room_peers:
|
||||||
|
print('Sending offer to {!r}'.format(peer_id))
|
||||||
|
# Create a peer connection for each peer and start
|
||||||
|
# exchanging SDP and ICE candidates
|
||||||
|
await ws.send(get_offer_sdp(peer_id))
|
||||||
|
sent_offers.add(peer_id)
|
||||||
|
continue
|
||||||
|
elif msg.startswith('ROOM_PEER'):
|
||||||
|
if msg.startswith('ROOM_PEER_JOINED'):
|
||||||
|
_, peer_id = msg.split(maxsplit=1)
|
||||||
|
print('Peer {!r} joined the room'.format(peer_id))
|
||||||
|
# Peer will send us an offer
|
||||||
|
continue
|
||||||
|
if msg.startswith('ROOM_PEER_LEFT'):
|
||||||
|
_, peer_id = msg.split(maxsplit=1)
|
||||||
|
print('Peer {!r} left the room'.format(peer_id))
|
||||||
|
continue
|
||||||
|
elif msg.startswith('ROOM_PEER_MSG'):
|
||||||
|
_, peer_id, msg = msg.split(maxsplit=2)
|
||||||
|
if peer_id in sent_offers:
|
||||||
|
print('Got answer from {!r}: {}'.format(peer_id, msg))
|
||||||
|
continue
|
||||||
|
print('Got offer from {!r}, replying'.format(peer_id))
|
||||||
|
await ws.send(get_answer_sdp(msg, peer_id))
|
||||||
|
continue
|
||||||
|
print('Unknown msg: {!r}, exiting'.format(msg))
|
||||||
|
return
|
||||||
|
|
||||||
|
print('Our uid is {!r}'.format(PEER_ID))
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.get_event_loop().run_until_complete(hello())
|
||||||
|
except websockets.exceptions.InvalidHandshake:
|
||||||
|
print('Invalid handshake: are you sure this is a websockets server?\n')
|
||||||
|
raise
|
||||||
|
except ssl.SSLError:
|
||||||
|
print('SSL Error: are you sure the server is using TLS?\n')
|
||||||
|
raise
|
|
@ -28,12 +28,19 @@ options = parser.parse_args(sys.argv[1:])
|
||||||
ADDR_PORT = (options.addr, options.port)
|
ADDR_PORT = (options.addr, options.port)
|
||||||
KEEPALIVE_TIMEOUT = options.keepalive_timeout
|
KEEPALIVE_TIMEOUT = options.keepalive_timeout
|
||||||
|
|
||||||
# Format: {uid: (Peer WebSocketServerProtocol, remote_address)}
|
############### Global data ###############
|
||||||
|
|
||||||
|
# Format: {uid: (Peer WebSocketServerProtocol,
|
||||||
|
# remote_address,
|
||||||
|
# <'session'|room_id|None>)}
|
||||||
peers = dict()
|
peers = dict()
|
||||||
# Format: {caller_uid: callee_uid,
|
# Format: {caller_uid: callee_uid,
|
||||||
# callee_uid: caller_uid}
|
# callee_uid: caller_uid}
|
||||||
# Bidirectional mapping between the two peers
|
# Bidirectional mapping between the two peers
|
||||||
sessions = dict()
|
sessions = dict()
|
||||||
|
# Format: {room_id: {peer1_id, peer2_id, peer3_id, ...}}
|
||||||
|
# Room dict with a set of peers in each room
|
||||||
|
rooms = dict()
|
||||||
|
|
||||||
############### Helper functions ###############
|
############### Helper functions ###############
|
||||||
|
|
||||||
|
@ -65,7 +72,7 @@ async def disconnect(ws, peer_id):
|
||||||
# Don't care about errors
|
# Don't care about errors
|
||||||
asyncio.ensure_future(ws.close(reason='hangup'))
|
asyncio.ensure_future(ws.close(reason='hangup'))
|
||||||
|
|
||||||
async def remove_peer(uid):
|
async def cleanup_session(uid):
|
||||||
if uid in sessions:
|
if uid in sessions:
|
||||||
other_id = sessions[uid]
|
other_id = sessions[uid]
|
||||||
del sessions[uid]
|
del sessions[uid]
|
||||||
|
@ -77,47 +84,129 @@ async def remove_peer(uid):
|
||||||
# close the connection to reset its state.
|
# close the connection to reset its state.
|
||||||
if other_id in peers:
|
if other_id in peers:
|
||||||
print("Closing connection to {}".format(other_id))
|
print("Closing connection to {}".format(other_id))
|
||||||
wso, oaddr = peers[other_id]
|
wso, oaddr, _ = peers[other_id]
|
||||||
del peers[other_id]
|
del peers[other_id]
|
||||||
await wso.close()
|
await wso.close()
|
||||||
|
|
||||||
|
async def cleanup_room(uid, room_id):
|
||||||
|
room_peers = rooms[room_id]
|
||||||
|
if uid not in room_peers:
|
||||||
|
return
|
||||||
|
room_peers.remove(uid)
|
||||||
|
for pid in room_peers:
|
||||||
|
wsp, paddr, _ = peers[pid]
|
||||||
|
msg = 'ROOM_PEER_LEFT {}'.format(uid)
|
||||||
|
print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
|
||||||
|
await wsp.send(msg)
|
||||||
|
|
||||||
|
async def remove_peer(uid):
|
||||||
|
await cleanup_session(uid)
|
||||||
if uid in peers:
|
if uid in peers:
|
||||||
ws, raddr = peers[uid]
|
ws, raddr, status = peers[uid]
|
||||||
|
if status and status != 'session':
|
||||||
|
await cleanup_room(uid, status)
|
||||||
del peers[uid]
|
del peers[uid]
|
||||||
await ws.close()
|
await ws.close()
|
||||||
print("Disconnected from peer {!r} at {!r}".format(uid, raddr))
|
print("Disconnected from peer {!r} at {!r}".format(uid, raddr))
|
||||||
|
|
||||||
############### Handler functions ###############
|
############### Handler functions ###############
|
||||||
|
|
||||||
async def connection_handler(ws, peer_id):
|
async def connection_handler(ws, uid):
|
||||||
global peers, sessions
|
global peers, sessions, rooms
|
||||||
raddr = ws.remote_address
|
raddr = ws.remote_address
|
||||||
peers[peer_id] = (ws, raddr)
|
peer_status = None
|
||||||
print("Registered peer {!r} at {!r}".format(peer_id, raddr))
|
peers[uid] = [ws, raddr, peer_status]
|
||||||
|
print("Registered peer {!r} at {!r}".format(uid, raddr))
|
||||||
while True:
|
while True:
|
||||||
# Receive command, wait forever if necessary
|
# Receive command, wait forever if necessary
|
||||||
msg = await recv_msg_ping(ws, raddr)
|
msg = await recv_msg_ping(ws, raddr)
|
||||||
if msg.startswith('SESSION'):
|
# Update current status
|
||||||
print("{!r} command {!r}".format(peer_id, msg))
|
peer_status = peers[uid][2]
|
||||||
|
# We are in a session or a room, messages must be relayed
|
||||||
|
if peer_status is not None:
|
||||||
|
# We're in a session, route message to connected peer
|
||||||
|
if peer_status == 'session':
|
||||||
|
other_id = sessions[uid]
|
||||||
|
wso, oaddr, status = peers[other_id]
|
||||||
|
assert(status == 'session')
|
||||||
|
print("{} -> {}: {}".format(uid, other_id, msg))
|
||||||
|
await wso.send(msg)
|
||||||
|
# We're in a room, accept room-specific commands
|
||||||
|
elif peer_status:
|
||||||
|
# ROOM_PEER_MSG peer_id MSG
|
||||||
|
if msg.startswith('ROOM_PEER_MSG'):
|
||||||
|
_, other_id, msg = msg.split(maxsplit=2)
|
||||||
|
if other_id not in peers:
|
||||||
|
await ws.send('ERROR peer {!r} not found'
|
||||||
|
''.format(other_id))
|
||||||
|
continue
|
||||||
|
wso, oaddr, status = peers[other_id]
|
||||||
|
if status != room_id:
|
||||||
|
await ws.send('ERROR peer {!r} is not in the room'
|
||||||
|
''.format(other_id))
|
||||||
|
continue
|
||||||
|
msg = 'ROOM_PEER_MSG {} {}'.format(uid, msg)
|
||||||
|
print('room {}: {} -> {}: {}'.format(room_id, uid, other_id, msg))
|
||||||
|
await wso.send(msg)
|
||||||
|
elif msg == 'ROOM_PEER_LIST':
|
||||||
|
room_id = peers[peer_id][2]
|
||||||
|
room_peers = ' '.join([pid for pid in rooms[room_id] if pid != peer_id])
|
||||||
|
msg = 'ROOM_PEER_LIST {}'.format(room_peers)
|
||||||
|
print('room {}: -> {}: {}'.format(room_id, uid, msg))
|
||||||
|
await ws.send(msg)
|
||||||
|
else:
|
||||||
|
await ws.send('ERROR invalid msg, already in room')
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise AssertionError('Unknown peer status {!r}'.format(peer_status))
|
||||||
|
# Requested a session with a specific peer
|
||||||
|
elif msg.startswith('SESSION'):
|
||||||
|
print("{!r} command {!r}".format(uid, msg))
|
||||||
_, callee_id = msg.split(maxsplit=1)
|
_, callee_id = msg.split(maxsplit=1)
|
||||||
if callee_id not in peers:
|
if callee_id not in peers:
|
||||||
await ws.send('ERROR peer {!r} not found'.format(callee_id))
|
await ws.send('ERROR peer {!r} not found'.format(callee_id))
|
||||||
continue
|
continue
|
||||||
if callee_id in sessions:
|
if peer_status is not None:
|
||||||
await ws.send('ERROR peer {!r} busy'.format(callee_id))
|
await ws.send('ERROR peer {!r} busy'.format(callee_id))
|
||||||
continue
|
continue
|
||||||
await ws.send('SESSION_OK')
|
await ws.send('SESSION_OK')
|
||||||
wsc = peers[callee_id][0]
|
wsc = peers[callee_id][0]
|
||||||
print("Session from {!r} ({!r}) to {!r} ({!r})".format(peer_id, raddr, callee_id,
|
print('Session from {!r} ({!r}) to {!r} ({!r})'
|
||||||
wsc.remote_address))
|
''.format(uid, raddr, callee_id, wsc.remote_address))
|
||||||
# Register call
|
# Register session
|
||||||
sessions[peer_id] = callee_id
|
peers[uid][2] = peer_status = 'session'
|
||||||
sessions[callee_id] = peer_id
|
sessions[uid] = callee_id
|
||||||
# We're in a session, route message to connected peer
|
peers[callee_id][2] = 'session'
|
||||||
elif peer_id in sessions:
|
sessions[callee_id] = uid
|
||||||
other_id = sessions[peer_id]
|
# Requested joining or creation of a room
|
||||||
wso, oaddr = peers[other_id]
|
elif msg.startswith('ROOM'):
|
||||||
print("{} -> {}: {}".format(peer_id, other_id, msg))
|
print('{!r} command {!r}'.format(uid, msg))
|
||||||
await wso.send(msg)
|
_, room_id = msg.split(maxsplit=1)
|
||||||
|
# Room name cannot be 'session', empty, or contain whitespace
|
||||||
|
if room_id == 'session' or room_id.split() != [room_id]:
|
||||||
|
await ws.send('ERROR invalid room id {!r}'.format(room_id))
|
||||||
|
continue
|
||||||
|
if room_id in rooms:
|
||||||
|
if uid in rooms[room_id]:
|
||||||
|
raise AssertionError('How did we accept a ROOM command '
|
||||||
|
'despite already being in a room?')
|
||||||
|
else:
|
||||||
|
# Create room if required
|
||||||
|
rooms[room_id] = set()
|
||||||
|
room_peers = ' '.join([pid for pid in rooms[room_id]])
|
||||||
|
await ws.send('ROOM_OK {}'.format(room_peers))
|
||||||
|
# Enter room
|
||||||
|
peers[uid][2] = peer_status = room_id
|
||||||
|
rooms[room_id].add(uid)
|
||||||
|
for pid in rooms[room_id]:
|
||||||
|
if pid == uid:
|
||||||
|
continue
|
||||||
|
wsp, paddr, _ = peers[pid]
|
||||||
|
msg = 'ROOM_PEER_JOINED {}'.format(uid)
|
||||||
|
print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
|
||||||
|
await wsp.send(msg)
|
||||||
|
else:
|
||||||
|
print('Ignoring unknown message {!r} from {!r}'.format(msg, uid))
|
||||||
|
|
||||||
async def hello_peer(ws):
|
async def hello_peer(ws):
|
||||||
'''
|
'''
|
||||||
|
@ -129,7 +218,7 @@ async def hello_peer(ws):
|
||||||
if hello != 'HELLO':
|
if hello != 'HELLO':
|
||||||
await ws.close(code=1002, reason='invalid protocol')
|
await ws.close(code=1002, reason='invalid protocol')
|
||||||
raise Exception("Invalid hello from {!r}".format(raddr))
|
raise Exception("Invalid hello from {!r}".format(raddr))
|
||||||
if not uid or uid in peers:
|
if not uid or uid in peers or uid.split() != [uid]: # no whitespace
|
||||||
await ws.close(code=1002, reason='invalid peer uid')
|
await ws.close(code=1002, reason='invalid peer uid')
|
||||||
raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr))
|
raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr))
|
||||||
# Send back a HELLO
|
# Send back a HELLO
|
||||||
|
|
Loading…
Reference in a new issue