mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-27 18:50:48 +00:00
tests: first pass at some basic browser tests
This commit is contained in:
parent
37cf0dffb5
commit
bc821a85d4
9 changed files with 505 additions and 293 deletions
|
@ -1,5 +1,6 @@
|
|||
project('gstwebrtc-demo', 'c',
|
||||
meson_version : '>= 0.48',
|
||||
license: 'BSD-2-Clause',
|
||||
default_options : [ 'warning_level=1',
|
||||
'buildtype=debug' ])
|
||||
|
||||
|
@ -24,5 +25,15 @@ libsoup_dep = dependency('libsoup-2.4', version : '>=2.48',
|
|||
json_glib_dep = dependency('json-glib-1.0',
|
||||
fallback : ['json-glib', 'json_glib_dep'])
|
||||
|
||||
|
||||
py3_mod = import('python3')
|
||||
py3 = py3_mod.find_python()
|
||||
|
||||
py3_version = py3_mod.language_version()
|
||||
if py3_version.version_compare('< 3.6')
|
||||
error('Could not find a sufficient python version required: 3.6, found {}'.format(py3_version))
|
||||
endif
|
||||
|
||||
subdir('multiparty-sendrecv')
|
||||
subdir('signalling')
|
||||
subdir('sendrecv')
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
executable('webrtc-sendrecv',
|
||||
'webrtc-sendrecv.c',
|
||||
dependencies : [gst_dep, gstsdp_dep, gstwebrtc_dep, libsoup_dep, json_glib_dep ])
|
||||
|
||||
webrtc_py = files('webrtc_sendrecv.py')
|
||||
|
||||
subdir('tests')
|
||||
|
|
144
webrtc/sendrecv/gst/tests/basic.py
Normal file
144
webrtc/sendrecv/gst/tests/basic.py
Normal file
|
@ -0,0 +1,144 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import unittest
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.support.wait import WebDriverWait
|
||||
from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
|
||||
from selenium.webdriver.chrome.options import Options as COptions
|
||||
import webrtc_sendrecv as webrtc
|
||||
import simple_server as sserver
|
||||
import asyncio
|
||||
import threading
|
||||
import signal
|
||||
|
||||
import gi
|
||||
gi.require_version('Gst', '1.0')
|
||||
from gi.repository import Gst
|
||||
|
||||
thread = None
|
||||
stop = None
|
||||
server = None
|
||||
|
||||
class AsyncIOThread(threading.Thread):
|
||||
def __init__ (self, loop):
|
||||
threading.Thread.__init__(self)
|
||||
self.loop = loop
|
||||
|
||||
def run(self):
|
||||
asyncio.set_event_loop(self.loop)
|
||||
self.loop.run_forever()
|
||||
self.loop.close()
|
||||
print ("closed loop")
|
||||
|
||||
def stop_thread(self):
|
||||
self.loop.call_soon_threadsafe(self.loop.stop)
|
||||
|
||||
async def run_until(server, stop_token):
|
||||
async with server:
|
||||
await stop_token
|
||||
print ("run_until done")
|
||||
|
||||
def setUpModule():
|
||||
global thread, server
|
||||
Gst.init(None)
|
||||
cacerts_path = os.environ.get('TEST_CA_CERT_PATH')
|
||||
loop = asyncio.new_event_loop()
|
||||
|
||||
thread = AsyncIOThread(loop)
|
||||
thread.start()
|
||||
server = sserver.WebRTCSimpleServer('127.0.0.1', 8443, 20, False, cacerts_path)
|
||||
def f():
|
||||
global stop
|
||||
stop = asyncio.ensure_future(server.run())
|
||||
loop.call_soon_threadsafe(f)
|
||||
|
||||
def tearDownModule():
|
||||
global thread, stop
|
||||
stop.cancel()
|
||||
thread.stop_thread()
|
||||
thread.join()
|
||||
print("thread joined")
|
||||
|
||||
def valid_int(n):
|
||||
if isinstance(n, int):
|
||||
return True
|
||||
if isinstance(n, str):
|
||||
try:
|
||||
num = int(n)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
return False
|
||||
|
||||
def create_firefox_driver():
|
||||
capabilities = webdriver.DesiredCapabilities().FIREFOX.copy()
|
||||
capabilities['acceptSslCerts'] = True
|
||||
capabilities['acceptInsecureCerts'] = True
|
||||
profile = FirefoxProfile()
|
||||
profile.set_preference ('media.navigator.streams.fake', True)
|
||||
profile.set_preference ('media.navigator.permission.disabled', True)
|
||||
|
||||
return webdriver.Firefox(firefox_profile=profile, capabilities=capabilities)
|
||||
|
||||
def create_chrome_driver():
|
||||
capabilities = webdriver.DesiredCapabilities().CHROME.copy()
|
||||
capabilities['acceptSslCerts'] = True
|
||||
capabilities['acceptInsecureCerts'] = True
|
||||
copts = COptions()
|
||||
copts.add_argument('--allow-file-access-from-files')
|
||||
copts.add_argument('--use-fake-ui-for-media-stream')
|
||||
copts.add_argument('--use-fake-device-for-media-stream')
|
||||
copts.add_argument('--enable-blink-features=RTCUnifiedPlanByDefault')
|
||||
|
||||
return webdriver.Chrome(options=copts, desired_capabilities=capabilities)
|
||||
|
||||
class ServerConnectionTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.browser = create_firefox_driver()
|
||||
# self.browser = create_chrome_driver()
|
||||
self.addCleanup(self.browser.quit)
|
||||
self.html_source = os.environ.get('TEST_HTML_SOURCE')
|
||||
self.assertIsNot(self.html_source, None)
|
||||
self.assertNotEqual(self.html_source, '')
|
||||
self.html_source = 'file://' + self.html_source + '/index.html'
|
||||
|
||||
def get_peer_id(self):
|
||||
self.browser.get(self.html_source)
|
||||
peer_id = WebDriverWait(self.browser, 5).until(
|
||||
lambda x: x.find_element_by_id('peer-id'),
|
||||
message='Peer-id element was never seen'
|
||||
)
|
||||
WebDriverWait (self.browser, 5).until(
|
||||
lambda x: valid_int(peer_id.text),
|
||||
message='Peer-id never became a number'
|
||||
)
|
||||
return int(peer_id.text)
|
||||
|
||||
def testPeerID(self):
|
||||
self.get_peer_id()
|
||||
|
||||
def testPerformCall(self):
|
||||
loop = asyncio.new_event_loop()
|
||||
thread = AsyncIOThread(loop)
|
||||
thread.start()
|
||||
peer_id = self.get_peer_id()
|
||||
client = webrtc.WebRTCClient(peer_id + 1, peer_id, 'wss://127.0.0.1:8443')
|
||||
|
||||
async def do_things():
|
||||
await client.connect()
|
||||
async def stop_after(client, delay):
|
||||
await asyncio.sleep(delay)
|
||||
await client.stop()
|
||||
future = asyncio.ensure_future (stop_after (client, 5))
|
||||
res = await client.loop()
|
||||
thread.stop_thread()
|
||||
return res
|
||||
|
||||
res = asyncio.run_coroutine_threadsafe(do_things(), loop).result()
|
||||
thread.join()
|
||||
print ("client thread joined")
|
||||
self.assertEqual(res, 0)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(verbosity=2)
|
15
webrtc/sendrecv/gst/tests/meson.build
Normal file
15
webrtc/sendrecv/gst/tests/meson.build
Normal file
|
@ -0,0 +1,15 @@
|
|||
tests = [
|
||||
['basic', 'basic.py'],
|
||||
]
|
||||
|
||||
test_deps = [certs]
|
||||
|
||||
foreach elem : tests
|
||||
test(elem.get(0),
|
||||
py3,
|
||||
depends: test_deps,
|
||||
args : files(elem.get(1)),
|
||||
env : ['PYTHONPATH=' + join_paths(meson.source_root(), 'sendrecv', 'gst') + ':' + join_paths(meson.source_root(), 'signalling'),
|
||||
'TEST_HTML_SOURCE=' + join_paths(meson.source_root(), 'sendrecv', 'js'),
|
||||
'TEST_CA_CERT_PATH=' + join_paths(meson.build_root(), 'signalling')])
|
||||
endforeach
|
|
@ -23,6 +23,8 @@ webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.googl
|
|||
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
|
||||
'''
|
||||
|
||||
from websockets.version import version as wsv
|
||||
|
||||
class WebRTCClient:
|
||||
def __init__(self, id_, peer_id, server):
|
||||
self.id_ = id_
|
||||
|
@ -32,10 +34,11 @@ class WebRTCClient:
|
|||
self.peer_id = peer_id
|
||||
self.server = server or 'wss://webrtc.nirbheek.in:8443'
|
||||
|
||||
|
||||
async def connect(self):
|
||||
sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
|
||||
self.conn = await websockets.connect(self.server, ssl=sslctx)
|
||||
await self.conn.send('HELLO %d' % our_id)
|
||||
await self.conn.send('HELLO %d' % self.id_)
|
||||
|
||||
async def setup_call(self):
|
||||
await self.conn.send('SESSION {}'.format(self.peer_id))
|
||||
|
@ -46,6 +49,7 @@ class WebRTCClient:
|
|||
msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
|
||||
loop = asyncio.new_event_loop()
|
||||
loop.run_until_complete(self.conn.send(msg))
|
||||
loop.close()
|
||||
|
||||
def on_offer_created(self, promise, _, __):
|
||||
promise.wait()
|
||||
|
@ -64,6 +68,7 @@ class WebRTCClient:
|
|||
icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
|
||||
loop = asyncio.new_event_loop()
|
||||
loop.run_until_complete(self.conn.send(icemsg))
|
||||
loop.close()
|
||||
|
||||
def on_incoming_decodebin_stream(self, _, pad):
|
||||
if not pad.has_current_caps():
|
||||
|
@ -113,7 +118,7 @@ class WebRTCClient:
|
|||
self.webrtc.connect('pad-added', self.on_incoming_stream)
|
||||
self.pipe.set_state(Gst.State.PLAYING)
|
||||
|
||||
async def handle_sdp(self, message):
|
||||
def handle_sdp(self, message):
|
||||
assert (self.webrtc)
|
||||
msg = json.loads(message)
|
||||
if 'sdp' in msg:
|
||||
|
@ -133,6 +138,11 @@ class WebRTCClient:
|
|||
sdpmlineindex = ice['sdpMLineIndex']
|
||||
self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
|
||||
|
||||
def close_pipeline(self):
|
||||
self.pipe.set_state(Gst.State.NULL)
|
||||
self.pipe = None
|
||||
self.webrtc = None
|
||||
|
||||
async def loop(self):
|
||||
assert self.conn
|
||||
async for message in self.conn:
|
||||
|
@ -142,11 +152,18 @@ class WebRTCClient:
|
|||
self.start_pipeline()
|
||||
elif message.startswith('ERROR'):
|
||||
print (message)
|
||||
self.close_pipeline()
|
||||
return 1
|
||||
else:
|
||||
await self.handle_sdp(message)
|
||||
self.handle_sdp(message)
|
||||
self.close_pipeline()
|
||||
return 0
|
||||
|
||||
async def stop(self):
|
||||
if self.conn:
|
||||
await self.conn.close()
|
||||
self.conn = None
|
||||
|
||||
|
||||
def check_plugins():
|
||||
needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
|
||||
|
@ -168,6 +185,7 @@ if __name__=='__main__':
|
|||
args = parser.parse_args()
|
||||
our_id = random.randrange(10, 10000)
|
||||
c = WebRTCClient(our_id, args.peerid, args.server)
|
||||
asyncio.get_event_loop().run_until_complete(c.connect())
|
||||
res = asyncio.get_event_loop().run_until_complete(c.loop())
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(c.connect())
|
||||
res = loop.run_until_complete(c.loop())
|
||||
sys.exit(res)
|
|
@ -1,3 +1,10 @@
|
|||
#! /bin/bash
|
||||
#! /bin/sh
|
||||
|
||||
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes
|
||||
BASE_DIR=$(dirname $0)
|
||||
|
||||
OUTDIR=""
|
||||
if [ $# -eq 1 ]; then
|
||||
OUTDIR=$1/
|
||||
fi
|
||||
|
||||
openssl req -x509 -newkey rsa:4096 -keyout ${OUTDIR}key.pem -out ${OUTDIR}cert.pem -days 365 -nodes -subj "/CN=example.com"
|
||||
|
|
8
webrtc/signalling/meson.build
Normal file
8
webrtc/signalling/meson.build
Normal file
|
@ -0,0 +1,8 @@
|
|||
generate_certs = find_program('generate_cert.sh')
|
||||
certs = custom_target(
|
||||
'generate-certs',
|
||||
command: [generate_certs, '@OUTDIR@'],
|
||||
output : ['key.pem', 'cert.pem']
|
||||
)
|
||||
|
||||
simple_server = files('simple_server.py')
|
|
@ -1,286 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
#
|
||||
# Example 1-1 call signalling server
|
||||
#
|
||||
# Copyright (C) 2017 Centricular Ltd.
|
||||
#
|
||||
# Author: Nirbheek Chauhan <nirbheek@centricular.com>
|
||||
#
|
||||
|
||||
import os
|
||||
import sys
|
||||
import ssl
|
||||
import logging
|
||||
import asyncio
|
||||
import websockets
|
||||
import argparse
|
||||
import http
|
||||
|
||||
from concurrent.futures._base import TimeoutError
|
||||
|
||||
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
# See: host, port in https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.create_server
|
||||
parser.add_argument('--addr', default='', help='Address to listen on (default: all interfaces, both ipv4 and ipv6)')
|
||||
parser.add_argument('--port', default=8443, type=int, help='Port to listen on')
|
||||
parser.add_argument('--keepalive-timeout', dest='keepalive_timeout', default=30, type=int, help='Timeout for keepalive (in seconds)')
|
||||
parser.add_argument('--cert-path', default=os.path.dirname(__file__))
|
||||
parser.add_argument('--disable-ssl', default=False, help='Disable ssl', action='store_true')
|
||||
parser.add_argument('--health', default='/health', help='Health check route')
|
||||
|
||||
options = parser.parse_args(sys.argv[1:])
|
||||
|
||||
ADDR_PORT = (options.addr, options.port)
|
||||
KEEPALIVE_TIMEOUT = options.keepalive_timeout
|
||||
|
||||
############### Global data ###############
|
||||
|
||||
# Format: {uid: (Peer WebSocketServerProtocol,
|
||||
# remote_address,
|
||||
# <'session'|room_id|None>)}
|
||||
peers = dict()
|
||||
# Format: {caller_uid: callee_uid,
|
||||
# callee_uid: caller_uid}
|
||||
# Bidirectional mapping between the two peers
|
||||
sessions = dict()
|
||||
# Format: {room_id: {peer1_id, peer2_id, peer3_id, ...}}
|
||||
# Room dict with a set of peers in each room
|
||||
rooms = dict()
|
||||
|
||||
############### Helper functions ###############
|
||||
|
||||
async def health_check(path, request_headers):
|
||||
if path == options.health:
|
||||
return http.HTTPStatus.OK, [], b"OK\n"
|
||||
|
||||
async def recv_msg_ping(ws, raddr):
|
||||
'''
|
||||
Wait for a message forever, and send a regular ping to prevent bad routers
|
||||
from closing the connection.
|
||||
'''
|
||||
msg = None
|
||||
while msg is None:
|
||||
try:
|
||||
msg = await asyncio.wait_for(ws.recv(), KEEPALIVE_TIMEOUT)
|
||||
except TimeoutError:
|
||||
print('Sending keepalive ping to {!r} in recv'.format(raddr))
|
||||
await ws.ping()
|
||||
return msg
|
||||
|
||||
async def disconnect(ws, peer_id):
|
||||
'''
|
||||
Remove @peer_id from the list of sessions and close our connection to it.
|
||||
This informs the peer that the session and all calls have ended, and it
|
||||
must reconnect.
|
||||
'''
|
||||
global sessions
|
||||
if peer_id in sessions:
|
||||
del sessions[peer_id]
|
||||
# Close connection
|
||||
if ws and ws.open:
|
||||
# Don't care about errors
|
||||
asyncio.ensure_future(ws.close(reason='hangup'))
|
||||
|
||||
async def cleanup_session(uid):
|
||||
if uid in sessions:
|
||||
other_id = sessions[uid]
|
||||
del sessions[uid]
|
||||
print("Cleaned up {} session".format(uid))
|
||||
if other_id in sessions:
|
||||
del sessions[other_id]
|
||||
print("Also cleaned up {} session".format(other_id))
|
||||
# If there was a session with this peer, also
|
||||
# close the connection to reset its state.
|
||||
if other_id in peers:
|
||||
print("Closing connection to {}".format(other_id))
|
||||
wso, oaddr, _ = peers[other_id]
|
||||
del peers[other_id]
|
||||
await wso.close()
|
||||
|
||||
async def cleanup_room(uid, room_id):
|
||||
room_peers = rooms[room_id]
|
||||
if uid not in room_peers:
|
||||
return
|
||||
room_peers.remove(uid)
|
||||
for pid in room_peers:
|
||||
wsp, paddr, _ = peers[pid]
|
||||
msg = 'ROOM_PEER_LEFT {}'.format(uid)
|
||||
print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
|
||||
await wsp.send(msg)
|
||||
|
||||
async def remove_peer(uid):
|
||||
await cleanup_session(uid)
|
||||
if uid in peers:
|
||||
ws, raddr, status = peers[uid]
|
||||
if status and status != 'session':
|
||||
await cleanup_room(uid, status)
|
||||
del peers[uid]
|
||||
await ws.close()
|
||||
print("Disconnected from peer {!r} at {!r}".format(uid, raddr))
|
||||
|
||||
############### Handler functions ###############
|
||||
|
||||
async def connection_handler(ws, uid):
|
||||
global peers, sessions, rooms
|
||||
raddr = ws.remote_address
|
||||
peer_status = None
|
||||
peers[uid] = [ws, raddr, peer_status]
|
||||
print("Registered peer {!r} at {!r}".format(uid, raddr))
|
||||
while True:
|
||||
# Receive command, wait forever if necessary
|
||||
msg = await recv_msg_ping(ws, raddr)
|
||||
# Update current status
|
||||
peer_status = peers[uid][2]
|
||||
# We are in a session or a room, messages must be relayed
|
||||
if peer_status is not None:
|
||||
# We're in a session, route message to connected peer
|
||||
if peer_status == 'session':
|
||||
other_id = sessions[uid]
|
||||
wso, oaddr, status = peers[other_id]
|
||||
assert(status == 'session')
|
||||
print("{} -> {}: {}".format(uid, other_id, msg))
|
||||
await wso.send(msg)
|
||||
# We're in a room, accept room-specific commands
|
||||
elif peer_status:
|
||||
# ROOM_PEER_MSG peer_id MSG
|
||||
if msg.startswith('ROOM_PEER_MSG'):
|
||||
_, other_id, msg = msg.split(maxsplit=2)
|
||||
if other_id not in peers:
|
||||
await ws.send('ERROR peer {!r} not found'
|
||||
''.format(other_id))
|
||||
continue
|
||||
wso, oaddr, status = peers[other_id]
|
||||
if status != room_id:
|
||||
await ws.send('ERROR peer {!r} is not in the room'
|
||||
''.format(other_id))
|
||||
continue
|
||||
msg = 'ROOM_PEER_MSG {} {}'.format(uid, msg)
|
||||
print('room {}: {} -> {}: {}'.format(room_id, uid, other_id, msg))
|
||||
await wso.send(msg)
|
||||
elif msg == 'ROOM_PEER_LIST':
|
||||
room_id = peers[peer_id][2]
|
||||
room_peers = ' '.join([pid for pid in rooms[room_id] if pid != peer_id])
|
||||
msg = 'ROOM_PEER_LIST {}'.format(room_peers)
|
||||
print('room {}: -> {}: {}'.format(room_id, uid, msg))
|
||||
await ws.send(msg)
|
||||
else:
|
||||
await ws.send('ERROR invalid msg, already in room')
|
||||
continue
|
||||
else:
|
||||
raise AssertionError('Unknown peer status {!r}'.format(peer_status))
|
||||
# Requested a session with a specific peer
|
||||
elif msg.startswith('SESSION'):
|
||||
print("{!r} command {!r}".format(uid, msg))
|
||||
_, callee_id = msg.split(maxsplit=1)
|
||||
if callee_id not in peers:
|
||||
await ws.send('ERROR peer {!r} not found'.format(callee_id))
|
||||
continue
|
||||
if peer_status is not None:
|
||||
await ws.send('ERROR peer {!r} busy'.format(callee_id))
|
||||
continue
|
||||
await ws.send('SESSION_OK')
|
||||
wsc = peers[callee_id][0]
|
||||
print('Session from {!r} ({!r}) to {!r} ({!r})'
|
||||
''.format(uid, raddr, callee_id, wsc.remote_address))
|
||||
# Register session
|
||||
peers[uid][2] = peer_status = 'session'
|
||||
sessions[uid] = callee_id
|
||||
peers[callee_id][2] = 'session'
|
||||
sessions[callee_id] = uid
|
||||
# Requested joining or creation of a room
|
||||
elif msg.startswith('ROOM'):
|
||||
print('{!r} command {!r}'.format(uid, msg))
|
||||
_, room_id = msg.split(maxsplit=1)
|
||||
# Room name cannot be 'session', empty, or contain whitespace
|
||||
if room_id == 'session' or room_id.split() != [room_id]:
|
||||
await ws.send('ERROR invalid room id {!r}'.format(room_id))
|
||||
continue
|
||||
if room_id in rooms:
|
||||
if uid in rooms[room_id]:
|
||||
raise AssertionError('How did we accept a ROOM command '
|
||||
'despite already being in a room?')
|
||||
else:
|
||||
# Create room if required
|
||||
rooms[room_id] = set()
|
||||
room_peers = ' '.join([pid for pid in rooms[room_id]])
|
||||
await ws.send('ROOM_OK {}'.format(room_peers))
|
||||
# Enter room
|
||||
peers[uid][2] = peer_status = room_id
|
||||
rooms[room_id].add(uid)
|
||||
for pid in rooms[room_id]:
|
||||
if pid == uid:
|
||||
continue
|
||||
wsp, paddr, _ = peers[pid]
|
||||
msg = 'ROOM_PEER_JOINED {}'.format(uid)
|
||||
print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
|
||||
await wsp.send(msg)
|
||||
else:
|
||||
print('Ignoring unknown message {!r} from {!r}'.format(msg, uid))
|
||||
|
||||
async def hello_peer(ws):
|
||||
'''
|
||||
Exchange hello, register peer
|
||||
'''
|
||||
raddr = ws.remote_address
|
||||
hello = await ws.recv()
|
||||
hello, uid = hello.split(maxsplit=1)
|
||||
if hello != 'HELLO':
|
||||
await ws.close(code=1002, reason='invalid protocol')
|
||||
raise Exception("Invalid hello from {!r}".format(raddr))
|
||||
if not uid or uid in peers or uid.split() != [uid]: # no whitespace
|
||||
await ws.close(code=1002, reason='invalid peer uid')
|
||||
raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr))
|
||||
# Send back a HELLO
|
||||
await ws.send('HELLO')
|
||||
return uid
|
||||
|
||||
async def handler(ws, path):
|
||||
'''
|
||||
All incoming messages are handled here. @path is unused.
|
||||
'''
|
||||
raddr = ws.remote_address
|
||||
print("Connected to {!r}".format(raddr))
|
||||
peer_id = await hello_peer(ws)
|
||||
try:
|
||||
await connection_handler(ws, peer_id)
|
||||
except websockets.ConnectionClosed:
|
||||
print("Connection to peer {!r} closed, exiting handler".format(raddr))
|
||||
finally:
|
||||
await remove_peer(peer_id)
|
||||
|
||||
sslctx = None
|
||||
if not options.disable_ssl:
|
||||
# Create an SSL context to be used by the websocket server
|
||||
certpath = options.cert_path
|
||||
print('Using TLS with keys in {!r}'.format(certpath))
|
||||
if 'letsencrypt' in certpath:
|
||||
chain_pem = os.path.join(certpath, 'fullchain.pem')
|
||||
key_pem = os.path.join(certpath, 'privkey.pem')
|
||||
else:
|
||||
chain_pem = os.path.join(certpath, 'cert.pem')
|
||||
key_pem = os.path.join(certpath, 'key.pem')
|
||||
|
||||
sslctx = ssl.create_default_context()
|
||||
try:
|
||||
sslctx.load_cert_chain(chain_pem, keyfile=key_pem)
|
||||
except FileNotFoundError:
|
||||
print("Certificates not found, did you run generate_cert.sh?")
|
||||
sys.exit(1)
|
||||
# FIXME
|
||||
sslctx.check_hostname = False
|
||||
sslctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
print("Listening on https://{}:{}".format(*ADDR_PORT))
|
||||
# Websocket server
|
||||
wsd = websockets.serve(handler, *ADDR_PORT, ssl=sslctx, process_request=health_check,
|
||||
# Maximum number of messages that websockets will pop
|
||||
# off the asyncio and OS buffers per connection. See:
|
||||
# https://websockets.readthedocs.io/en/stable/api.html#websockets.protocol.WebSocketCommonProtocol
|
||||
max_queue=16)
|
||||
|
||||
logger = logging.getLogger('websockets.server')
|
||||
|
||||
logger.setLevel(logging.ERROR)
|
||||
logger.addHandler(logging.StreamHandler())
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(wsd)
|
||||
asyncio.get_event_loop().run_forever()
|
291
webrtc/signalling/simple_server.py
Executable file
291
webrtc/signalling/simple_server.py
Executable file
|
@ -0,0 +1,291 @@
|
|||
#!/usr/bin/env python3
|
||||
#
|
||||
# Example 1-1 call signalling server
|
||||
#
|
||||
# Copyright (C) 2017 Centricular Ltd.
|
||||
#
|
||||
# Author: Nirbheek Chauhan <nirbheek@centricular.com>
|
||||
#
|
||||
|
||||
import os
|
||||
import sys
|
||||
import ssl
|
||||
import logging
|
||||
import asyncio
|
||||
import websockets
|
||||
import argparse
|
||||
import http
|
||||
|
||||
from concurrent.futures._base import TimeoutError
|
||||
|
||||
class WebRTCSimpleServer(object):
|
||||
|
||||
def __init__(self, addr, port, keepalive_timeout, disable_ssl, certpath, health_path=None):
|
||||
############### Global data ###############
|
||||
|
||||
# Format: {uid: (Peer WebSocketServerProtocol,
|
||||
# remote_address,
|
||||
# <'session'|room_id|None>)}
|
||||
self.peers = dict()
|
||||
# Format: {caller_uid: callee_uid,
|
||||
# callee_uid: caller_uid}
|
||||
# Bidirectional mapping between the two peers
|
||||
self.sessions = dict()
|
||||
# Format: {room_id: {peer1_id, peer2_id, peer3_id, ...}}
|
||||
# Room dict with a set of peers in each room
|
||||
self.rooms = dict()
|
||||
|
||||
self.keepalive_timeout = keepalive_timeout
|
||||
self.addr = addr
|
||||
self.port = port
|
||||
self.disable_ssl = disable_ssl
|
||||
self.certpath = certpath
|
||||
self.health_path = health_path
|
||||
|
||||
############### Helper functions ###############
|
||||
|
||||
async def health_check(self, path, request_headers):
|
||||
if path == self.health_part:
|
||||
return http.HTTPStatus.OK, [], b"OK\n"
|
||||
return None
|
||||
|
||||
async def recv_msg_ping(self, ws, raddr):
|
||||
'''
|
||||
Wait for a message forever, and send a regular ping to prevent bad routers
|
||||
from closing the connection.
|
||||
'''
|
||||
msg = None
|
||||
while msg is None:
|
||||
try:
|
||||
msg = await asyncio.wait_for(ws.recv(), self.keepalive_timeout)
|
||||
except TimeoutError:
|
||||
print('Sending keepalive ping to {!r} in recv'.format(raddr))
|
||||
await ws.ping()
|
||||
return msg
|
||||
|
||||
async def cleanup_session(self, uid):
|
||||
if uid in self.sessions:
|
||||
other_id = self.sessions[uid]
|
||||
del self.sessions[uid]
|
||||
print("Cleaned up {} session".format(uid))
|
||||
if other_id in self.sessions:
|
||||
del self.sessions[other_id]
|
||||
print("Also cleaned up {} session".format(other_id))
|
||||
# If there was a session with this peer, also
|
||||
# close the connection to reset its state.
|
||||
if other_id in self.peers:
|
||||
print("Closing connection to {}".format(other_id))
|
||||
wso, oaddr, _ = self.peers[other_id]
|
||||
del self.peers[other_id]
|
||||
await wso.close()
|
||||
|
||||
async def cleanup_room(self, uid, room_id):
|
||||
room_peers = self.rooms[room_id]
|
||||
if uid not in room_peers:
|
||||
return
|
||||
room_peers.remove(uid)
|
||||
for pid in room_peers:
|
||||
wsp, paddr, _ = self.peers[pid]
|
||||
msg = 'ROOM_PEER_LEFT {}'.format(uid)
|
||||
print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
|
||||
await wsp.send(msg)
|
||||
|
||||
async def remove_peer(self, uid):
|
||||
await self.cleanup_session(uid)
|
||||
if uid in self.peers:
|
||||
ws, raddr, status = self.peers[uid]
|
||||
if status and status != 'session':
|
||||
await self.cleanup_room(uid, status)
|
||||
del self.peers[uid]
|
||||
await ws.close()
|
||||
print("Disconnected from peer {!r} at {!r}".format(uid, raddr))
|
||||
|
||||
############### Handler functions ###############
|
||||
|
||||
|
||||
async def connection_handler(self, ws, uid):
|
||||
raddr = ws.remote_address
|
||||
peer_status = None
|
||||
self.peers[uid] = [ws, raddr, peer_status]
|
||||
print("Registered peer {!r} at {!r}".format(uid, raddr))
|
||||
while True:
|
||||
# Receive command, wait forever if necessary
|
||||
msg = await self.recv_msg_ping(ws, raddr)
|
||||
# Update current status
|
||||
peer_status = self.peers[uid][2]
|
||||
# We are in a session or a room, messages must be relayed
|
||||
if peer_status is not None:
|
||||
# We're in a session, route message to connected peer
|
||||
if peer_status == 'session':
|
||||
other_id = self.sessions[uid]
|
||||
wso, oaddr, status = self.peers[other_id]
|
||||
assert(status == 'session')
|
||||
print("{} -> {}: {}".format(uid, other_id, msg))
|
||||
await wso.send(msg)
|
||||
# We're in a room, accept room-specific commands
|
||||
elif peer_status:
|
||||
# ROOM_PEER_MSG peer_id MSG
|
||||
if msg.startswith('ROOM_PEER_MSG'):
|
||||
_, other_id, msg = msg.split(maxsplit=2)
|
||||
if other_id not in self.peers:
|
||||
await ws.send('ERROR peer {!r} not found'
|
||||
''.format(other_id))
|
||||
continue
|
||||
wso, oaddr, status = self.peers[other_id]
|
||||
if status != room_id:
|
||||
await ws.send('ERROR peer {!r} is not in the room'
|
||||
''.format(other_id))
|
||||
continue
|
||||
msg = 'ROOM_PEER_MSG {} {}'.format(uid, msg)
|
||||
print('room {}: {} -> {}: {}'.format(room_id, uid, other_id, msg))
|
||||
await wso.send(msg)
|
||||
elif msg == 'ROOM_PEER_LIST':
|
||||
room_id = self.peers[peer_id][2]
|
||||
room_peers = ' '.join([pid for pid in self.rooms[room_id] if pid != peer_id])
|
||||
msg = 'ROOM_PEER_LIST {}'.format(room_peers)
|
||||
print('room {}: -> {}: {}'.format(room_id, uid, msg))
|
||||
await ws.send(msg)
|
||||
else:
|
||||
await ws.send('ERROR invalid msg, already in room')
|
||||
continue
|
||||
else:
|
||||
raise AssertionError('Unknown peer status {!r}'.format(peer_status))
|
||||
# Requested a session with a specific peer
|
||||
elif msg.startswith('SESSION'):
|
||||
print("{!r} command {!r}".format(uid, msg))
|
||||
_, callee_id = msg.split(maxsplit=1)
|
||||
if callee_id not in self.peers:
|
||||
await ws.send('ERROR peer {!r} not found'.format(callee_id))
|
||||
continue
|
||||
if peer_status is not None:
|
||||
await ws.send('ERROR peer {!r} busy'.format(callee_id))
|
||||
continue
|
||||
await ws.send('SESSION_OK')
|
||||
wsc = self.peers[callee_id][0]
|
||||
print('Session from {!r} ({!r}) to {!r} ({!r})'
|
||||
''.format(uid, raddr, callee_id, wsc.remote_address))
|
||||
# Register session
|
||||
self.peers[uid][2] = peer_status = 'session'
|
||||
self.sessions[uid] = callee_id
|
||||
self.peers[callee_id][2] = 'session'
|
||||
self.sessions[callee_id] = uid
|
||||
# Requested joining or creation of a room
|
||||
elif msg.startswith('ROOM'):
|
||||
print('{!r} command {!r}'.format(uid, msg))
|
||||
_, room_id = msg.split(maxsplit=1)
|
||||
# Room name cannot be 'session', empty, or contain whitespace
|
||||
if room_id == 'session' or room_id.split() != [room_id]:
|
||||
await ws.send('ERROR invalid room id {!r}'.format(room_id))
|
||||
continue
|
||||
if room_id in self.rooms:
|
||||
if uid in self.rooms[room_id]:
|
||||
raise AssertionError('How did we accept a ROOM command '
|
||||
'despite already being in a room?')
|
||||
else:
|
||||
# Create room if required
|
||||
self.rooms[room_id] = set()
|
||||
room_peers = ' '.join([pid for pid in self.rooms[room_id]])
|
||||
await ws.send('ROOM_OK {}'.format(room_peers))
|
||||
# Enter room
|
||||
self.peers[uid][2] = peer_status = room_id
|
||||
self.rooms[room_id].add(uid)
|
||||
for pid in self.rooms[room_id]:
|
||||
if pid == uid:
|
||||
continue
|
||||
wsp, paddr, _ = self.peers[pid]
|
||||
msg = 'ROOM_PEER_JOINED {}'.format(uid)
|
||||
print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
|
||||
await wsp.send(msg)
|
||||
else:
|
||||
print('Ignoring unknown message {!r} from {!r}'.format(msg, uid))
|
||||
|
||||
async def hello_peer(self, ws):
|
||||
'''
|
||||
Exchange hello, register peer
|
||||
'''
|
||||
raddr = ws.remote_address
|
||||
hello = await ws.recv()
|
||||
hello, uid = hello.split(maxsplit=1)
|
||||
if hello != 'HELLO':
|
||||
await ws.close(code=1002, reason='invalid protocol')
|
||||
raise Exception("Invalid hello from {!r}".format(raddr))
|
||||
if not uid or uid in self.peers or uid.split() != [uid]: # no whitespace
|
||||
await ws.close(code=1002, reason='invalid peer uid')
|
||||
raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr))
|
||||
# Send back a HELLO
|
||||
await ws.send('HELLO')
|
||||
return uid
|
||||
|
||||
def run(self):
|
||||
sslctx = None
|
||||
if not self.disable_ssl:
|
||||
# Create an SSL context to be used by the websocket server
|
||||
print('Using TLS with keys in {!r}'.format(self.certpath))
|
||||
if 'letsencrypt' in self.certpath:
|
||||
chain_pem = os.path.join(self.certpath, 'fullchain.pem')
|
||||
key_pem = os.path.join(self.certpath, 'privkey.pem')
|
||||
else:
|
||||
chain_pem = os.path.join(self.certpath, 'cert.pem')
|
||||
key_pem = os.path.join(self.certpath, 'key.pem')
|
||||
|
||||
sslctx = ssl.create_default_context()
|
||||
try:
|
||||
sslctx.load_cert_chain(chain_pem, keyfile=key_pem)
|
||||
except FileNotFoundError:
|
||||
print("Certificates not found, did you run generate_cert.sh?")
|
||||
sys.exit(1)
|
||||
# FIXME
|
||||
sslctx.check_hostname = False
|
||||
sslctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
async def handler(ws, path):
|
||||
'''
|
||||
All incoming messages are handled here. @path is unused.
|
||||
'''
|
||||
raddr = ws.remote_address
|
||||
print("Connected to {!r}".format(raddr))
|
||||
peer_id = await self.hello_peer(ws)
|
||||
try:
|
||||
await self.connection_handler(ws, peer_id)
|
||||
except websockets.ConnectionClosed:
|
||||
print("Connection to peer {!r} closed, exiting handler".format(raddr))
|
||||
finally:
|
||||
await self.remove_peer(peer_id)
|
||||
|
||||
print("Listening on https://{}:{}".format(self.addr, self.port))
|
||||
# Websocket server
|
||||
wsd = websockets.serve(handler, self.addr, self.port, ssl=sslctx, process_request=self.health_check if self.health_path else None,
|
||||
# Maximum number of messages that websockets will pop
|
||||
# off the asyncio and OS buffers per connection. See:
|
||||
# https://websockets.readthedocs.io/en/stable/api.html#websockets.protocol.WebSocketCommonProtocol
|
||||
max_queue=16)
|
||||
|
||||
logger = logging.getLogger('websockets.server')
|
||||
|
||||
logger.setLevel(logging.ERROR)
|
||||
logger.addHandler(logging.StreamHandler())
|
||||
|
||||
return wsd
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
# See: host, port in https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.create_server
|
||||
parser.add_argument('--addr', default='', help='Address to listen on (default: all interfaces, both ipv4 and ipv6)')
|
||||
parser.add_argument('--port', default=8443, type=int, help='Port to listen on')
|
||||
parser.add_argument('--keepalive-timeout', dest='keepalive_timeout', default=30, type=int, help='Timeout for keepalive (in seconds)')
|
||||
parser.add_argument('--cert-path', default=os.path.dirname(__file__))
|
||||
parser.add_argument('--disable-ssl', default=False, help='Disable ssl', action='store_true')
|
||||
parser.add_argument('--health', default='/health', help='Health check route')
|
||||
|
||||
options = parser.parse_args(sys.argv[1:])
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
r = WebRTCSimpleServer(options.addr, options.port, options.keepalive_timeout, options.disable_ssl, options.cert_path)
|
||||
|
||||
loop.run_until_complete (r.run())
|
||||
loop.run_forever ()
|
||||
print ("Goodbye!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in a new issue