Add a simple python3 webrtc signalling server

+ client for testing + protocol documentation
This commit is contained in:
Nirbheek Chauhan 2017-10-21 19:56:52 +05:30
parent 8d782e4460
commit 663ad7ba98
5 changed files with 381 additions and 0 deletions

View file

@ -0,0 +1,80 @@
# Terminology
### Client
A GStreamer-based application
### Browser
A JS application that runs in the browser and uses built-in browser webrtc APIs
### Peer
Any webrtc-using application that can participate in a call
### Signalling server
Basic websockets server implemented in Python that manages the peers list and shovels data between peers
# Overview
This is a basic protocol for doing 1-1 audio+video calls between a gstreamer app and a JS app in a browser.
# Peer registration and calling
Peers must register with the signalling server before a call can be initiated. The server connection should stay open as long as the peer is available or in a call.
This protocol builds upon https://github.com/shanet/WebRTC-Example/
* Connect to the websocket server
* Send `HELLO <uid>` where `<uid>` is a string which will uniquely identify this peer
* Receive `HELLO`
* Any other message starting with `ERROR` is an error.
* To connect to a peer, send `SESSION <uid>` where `<uid>` identifies the peer to connect to, and receive `SESSION_OK`
* All further messages will be forwarded to the peer
* The call negotiation with the peer can be started by sending JSON encoded SDP and ICE
* Closure of the server connection means the call has ended; either because the other peer ended it or went away
* To end the call, disconnect from the server. You may reconnect again whenever you wish.
# Negotiation
Once a call has been setup with the signalling server, the peers must negotiate SDP and ICE candidates with each other.
The calling side must create an SDP offer and send it to the peer as a JSON object:
```json
{
"sdp": {
"sdp": "o=- [....]",
"type": "offer"
}
}
```
The callee must then reply with an answer:
```json
{
"sdp": {
"sdp": "o=- [....]",
"type": "answer"
}
}
```
ICE candidates must be exchanged similarly by exchanging JSON objects:
```json
{
"ice": {
"candidate": ...,
"sdpMLineIndex": ...,
...
}
}
```
Note that the structure of these is the same as that specified by the WebRTC spec.

View file

@ -0,0 +1,26 @@
## Overview
Read Protocol.md
## Dependencies
* Python 3
* pip3 install --user websockets
## Example usage
In three separate tabs, run consecutively:
```console
$ ./generate_certs.sh
$ ./simple-server.py
```
```console
$ ./client.py
Our uid is 'ws-test-client-8f63b9'
```
```console
$ ./client.py --call ws-test-client-8f63b9
```

85
webrtc/signalling/client.py Executable file
View file

@ -0,0 +1,85 @@
#!/usr/bin/env python3
#
# Test client for simple 1-1 call signalling server
#
# Copyright (C) 2017 Centricular Ltd.
#
# Author: Nirbheek Chauhan <nirbheek@centricular.com>
#
import sys
import ssl
import json
import uuid
import asyncio
import websockets
import argparse
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--url', default='wss://localhost:8443', help='URL to connect to')
parser.add_argument('--call', default=None, help='uid of peer to call')
options = parser.parse_args(sys.argv[1:])
SERVER_ADDR = options.url
CALLEE_ID = options.call
PEER_ID = 'ws-test-client-' + str(uuid.uuid4())[:6]
sslctx = False
if SERVER_ADDR.startswith(('wss://', 'https://')):
sslctx = ssl.create_default_context()
# FIXME
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
def reply_sdp_ice(msg):
# Here we'd parse the incoming JSON message for ICE and SDP candidates
print("Got: " + msg)
reply = json.dumps({'sdp': 'reply sdp'})
print("Sent: " + reply)
return reply
def send_sdp_ice():
reply = json.dumps({'sdp': 'initial sdp'})
print("Sent: " + reply)
return reply
async def hello():
async with websockets.connect(SERVER_ADDR, ssl=sslctx) as ws:
await ws.send('HELLO ' + PEER_ID)
assert(await ws.recv() == 'HELLO')
# Initiate call if requested
if CALLEE_ID:
await ws.send('CALL {}'.format(CALLEE_ID))
# Receive messages
while True:
msg = await ws.recv()
if msg.startswith('ERROR'):
# On error, we bring down the webrtc pipeline, etc
print('{!r}, exiting'.format(msg))
return
if CALLEE_ID:
if msg == 'CALL_OK':
await ws.send(send_sdp_ice())
# Return so we don't have an infinite loop
return
else:
print('Unknown reply: {!r}, exiting'.format(msg))
return
else:
await ws.send(reply_sdp_ice(msg))
# Return so we don't have an infinite loop
return
print('Our uid is {!r}'.format(PEER_ID))
try:
asyncio.get_event_loop().run_until_complete(hello())
except websockets.exceptions.InvalidHandshake:
print('Invalid handshake: are you sure this is a websockets server?\n')
raise
except ssl.SSLError:
print('SSL Error: are you sure the server is using TLS?\n')
raise

View file

@ -0,0 +1,3 @@
#! /bin/bash
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes

View file

@ -0,0 +1,187 @@
#!/usr/bin/env python3
#
# Example 1-1 call signalling server
#
# Copyright (C) 2017 Centricular Ltd.
#
# Author: Nirbheek Chauhan <nirbheek@centricular.com>
#
import os
import sys
import ssl
import logging
import asyncio
import websockets
import argparse
from concurrent.futures._base import TimeoutError
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--addr', default='0.0.0.0', help='Address to listen on')
parser.add_argument('--port', default=8443, type=int, help='Port to listen on')
parser.add_argument('--keepalive-timeout', dest='keepalive_timeout', default=30, type=int, help='Timeout for keepalive (in seconds)')
parser.add_argument('--cert-path', default=os.path.dirname(__file__))
options = parser.parse_args(sys.argv[1:])
ADDR_PORT = (options.addr, options.port)
KEEPALIVE_TIMEOUT = options.keepalive_timeout
# Format: {uid: (Peer WebSocketServerProtocol, remote_address)}
peers = dict()
# Format: {caller_uid: callee_uid,
# callee_uid: caller_uid}
# Bidirectional mapping between the two peers
sessions = dict()
############### Helper functions ###############
async def recv_msg_ping(ws, raddr):
'''
Wait for a message forever, and send a regular ping to prevent bad routers
from closing the connection.
'''
msg = None
while msg is None:
try:
msg = await asyncio.wait_for(ws.recv(), KEEPALIVE_TIMEOUT)
except TimeoutError:
print('Sending keepalive ping to {!r} in recv'.format(raddr))
await ws.ping()
return msg
async def disconnect(ws, peer_id):
'''
Remove @peer_id from the list of sessions and close our connection to it.
This informs the peer that the session and all calls have ended, and it
must reconnect.
'''
global sessions
if peer_id in sessions:
del sessions[peer_id]
# Close connection
if ws and ws.open:
# Don't care about errors
asyncio.ensure_future(ws.close(reason='hangup'))
async def remove_peer(uid):
if uid in sessions:
other_id = sessions[uid]
del sessions[uid]
print("Cleaned up {} session".format(uid))
if other_id in sessions:
del sessions[other_id]
print("Also cleaned up {} session".format(other_id))
# If there was a session with this peer, also
# close the connection to reset its state.
if other_id in peers:
print("Closing connection to {}".format(other_id))
wso, oaddr = peers[other_id]
del peers[other_id]
await wso.close()
if uid in peers:
ws, raddr = peers[uid]
del peers[uid]
await ws.close()
print("Disconnected from peer {!r} at {!r}".format(uid, raddr))
############### Handler functions ###############
async def connection_handler(ws, peer_id):
global peers, sessions
raddr = ws.remote_address
peers[peer_id] = (ws, raddr)
print("Registered peer {!r} at {!r}".format(peer_id, raddr))
while True:
# Receive command, wait forever if necessary
msg = await recv_msg_ping(ws, raddr)
if msg.startswith('SESSION'):
print("{!r} command {!r}".format(peer_id, msg))
_, callee_id = msg.split(maxsplit=1)
if callee_id not in peers:
await ws.send('ERROR peer {!r} not found'.format(callee_id))
continue
if callee_id in sessions:
await ws.send('ERROR peer {!r} busy'.format(callee_id))
continue
await ws.send('SESSION_OK')
wsc = peers[callee_id][0]
print("Session from {!r} ({!r}) to {!r} ({!r})".format(peer_id, raddr, callee_id,
wsc.remote_address))
# Register call
sessions[peer_id] = callee_id
sessions[callee_id] = peer_id
# We're in a session, route message to connected peer
elif peer_id in sessions:
other_id = sessions[peer_id]
wso, oaddr = peers[other_id]
print("{} -> {}: {}".format(peer_id, other_id, msg))
await wso.send(msg)
async def hello_peer(ws):
'''
Exchange hello, register peer
'''
raddr = ws.remote_address
hello = await ws.recv()
hello, uid = hello.split(maxsplit=1)
if hello != 'HELLO':
await ws.close(code=1002, reason='invalid protocol')
raise Exception("Invalid hello from {!r}".format(raddr))
if not uid or uid in peers:
await ws.close(code=1002, reason='invalid peer uid')
raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr))
# Send back a HELLO
await ws.send('HELLO')
return uid
async def handler(ws, path):
'''
All incoming messages are handled here. @path is unused.
'''
raddr = ws.remote_address
print("Connected to {!r}".format(raddr))
peer_id = await hello_peer(ws)
try:
await connection_handler(ws, peer_id)
except websockets.ConnectionClosed:
print("Connection to peer {!r} closed, exiting handler".format(raddr))
finally:
await remove_peer(peer_id)
# Create an SSL context to be used by the websocket server
certpath = options.cert_path
print('Using TLS with keys in {!r}'.format(certpath))
if 'letsencrypt' in certpath:
chain_pem = os.path.join(certpath, 'fullchain.pem')
key_pem = os.path.join(certpath, 'privkey.pem')
else:
chain_pem = os.path.join(certpath, 'cert.pem')
key_pem = os.path.join(certpath, 'key.pem')
sslctx = ssl.create_default_context()
try:
sslctx.load_cert_chain(chain_pem, keyfile=key_pem)
except FileNotFoundError:
print("Certificates not found, did you run generate_cert.sh?")
sys.exit(1)
# FIXME
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
print("Listening on https://{}:{}".format(*ADDR_PORT))
# Websocket server
wsd = websockets.serve(handler, *ADDR_PORT, ssl=sslctx,
# Maximum number of messages that websockets will pop
# off the asyncio and OS buffers per connection. See:
# https://websockets.readthedocs.io/en/stable/api.html#websockets.protocol.WebSocketCommonProtocol
max_queue=16)
logger = logging.getLogger('websockets.server')
logger.setLevel(logging.ERROR)
logger.addHandler(logging.StreamHandler())
asyncio.get_event_loop().run_until_complete(wsd)
asyncio.get_event_loop().run_forever()