mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-02 05:28:48 +00:00
266 lines
8.9 KiB
Python
266 lines
8.9 KiB
Python
# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this program; if not, write to the
|
|
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
|
|
# Boston, MA 02110-1301, USA.
|
|
|
|
import websockets
|
|
import asyncio
|
|
import ssl
|
|
import os
|
|
import sys
|
|
import threading
|
|
import json
|
|
import logging
|
|
|
|
from observer import Signal, StateObserver, WebRTCObserver, DataChannelObserver
|
|
from enums import SignallingState, NegotiationState, DataChannelState
|
|
|
|
l = logging.getLogger(__name__)
|
|
|
|
class AsyncIOThread(threading.Thread):
|
|
"""
|
|
Run an asyncio loop in another thread.
|
|
"""
|
|
def __init__ (self, loop):
|
|
threading.Thread.__init__(self)
|
|
self.loop = loop
|
|
|
|
def run(self):
|
|
asyncio.set_event_loop(self.loop)
|
|
self.loop.run_forever()
|
|
|
|
def stop_thread(self):
|
|
self.loop.call_soon_threadsafe(self.loop.stop)
|
|
|
|
|
|
class SignallingClientThread(object):
|
|
"""
|
|
Connect to a signalling server
|
|
"""
|
|
def __init__(self, server):
|
|
# server string to connect to. Passed directly to websockets.connect()
|
|
self.server = server
|
|
|
|
# fired after we have connected to the signalling server
|
|
self.wss_connected = Signal()
|
|
# fired every time we receive a message from the signalling server
|
|
self.message = Signal()
|
|
|
|
self._init_async()
|
|
|
|
def _init_async(self):
|
|
self._running = False
|
|
self.conn = None
|
|
self._loop = asyncio.new_event_loop()
|
|
|
|
self._thread = AsyncIOThread(self._loop)
|
|
self._thread.start()
|
|
|
|
self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(self._a_loop()))
|
|
|
|
async def _a_connect(self):
|
|
# connect to the signalling server
|
|
assert not self.conn
|
|
sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
|
|
self.conn = await websockets.connect(self.server, ssl=sslctx)
|
|
|
|
async def _a_loop(self):
|
|
self._running = True
|
|
l.info('loop started')
|
|
await self._a_connect()
|
|
self.wss_connected.fire()
|
|
assert self.conn
|
|
async for message in self.conn:
|
|
self.message.fire(message)
|
|
l.info('loop exited')
|
|
|
|
def send(self, data):
|
|
# send some information to the peer
|
|
async def _a_send():
|
|
await self.conn.send(data)
|
|
self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_send()))
|
|
|
|
def stop(self):
|
|
if self._running == False:
|
|
return
|
|
|
|
cond = threading.Condition()
|
|
|
|
# asyncio, why you so complicated to stop ?
|
|
tasks = asyncio.all_tasks(self._loop)
|
|
async def _a_stop():
|
|
if self.conn:
|
|
await self.conn.close()
|
|
self.conn = None
|
|
|
|
to_wait = [t for t in tasks if not t.done()]
|
|
if to_wait:
|
|
l.info('waiting for ' + str(to_wait))
|
|
done, pending = await asyncio.wait(to_wait)
|
|
with cond:
|
|
l.error('notifying cond')
|
|
cond.notify()
|
|
self._running = False
|
|
|
|
with cond:
|
|
self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_stop()))
|
|
l.error('cond waiting')
|
|
cond.wait()
|
|
l.error('cond waited')
|
|
self._thread.stop_thread()
|
|
self._thread.join()
|
|
l.error('thread joined')
|
|
|
|
|
|
class WebRTCSignallingClient(SignallingClientThread):
|
|
"""
|
|
Signalling client implementation. Deals wit session management over the
|
|
signalling protocol. Sends and receives from a peer.
|
|
"""
|
|
def __init__(self, server, id_):
|
|
super().__init__(server)
|
|
|
|
self.wss_connected.connect(self._on_connection)
|
|
self.message.connect(self._on_message)
|
|
self.state = SignallingState.NEW
|
|
self._state_observer = StateObserver(self, "state", threading.Condition())
|
|
|
|
self.id = id_
|
|
self._peerid = None
|
|
|
|
# fired when the hello has been received
|
|
self.connected = Signal()
|
|
# fired when the signalling server responds that the session creation is ok
|
|
self.session_created = Signal()
|
|
# fired on an error
|
|
self.error = Signal()
|
|
# fired when the peer receives some json data
|
|
self.have_json = Signal()
|
|
|
|
def _update_state(self, new_state):
|
|
self._state_observer.update (new_state)
|
|
|
|
def wait_for_states(self, states):
|
|
return self._state_observer.wait_for (states)
|
|
|
|
def hello(self):
|
|
self.send('HELLO ' + str(self.id))
|
|
l.info("sent HELLO")
|
|
self.wait_for_states([SignallingState.HELLO])
|
|
|
|
def create_session(self, peerid):
|
|
self._peerid = peerid
|
|
self.send('SESSION {}'.format(self._peerid))
|
|
l.info("sent SESSION")
|
|
self.wait_for_states([SignallingState.SESSION])
|
|
|
|
def _on_connection(self):
|
|
self._update_state (SignallingState.OPEN)
|
|
|
|
def _on_message(self, message):
|
|
l.debug("received: " + message)
|
|
if message == 'HELLO':
|
|
self._update_state (SignallingState.HELLO)
|
|
self.connected.fire()
|
|
elif message == 'SESSION_OK':
|
|
self._update_state (SignallingState.SESSION)
|
|
self.session_created.fire()
|
|
elif message.startswith('ERROR'):
|
|
self._update_state (SignallingState.ERROR)
|
|
self.error.fire(message)
|
|
else:
|
|
msg = json.loads(message)
|
|
self.have_json.fire(msg)
|
|
return False
|
|
|
|
|
|
class RemoteWebRTCObserver(WebRTCObserver):
|
|
"""
|
|
Use information sent over the signalling channel to construct the current
|
|
state of a remote peer. Allow performing actions by sending requests over
|
|
the signalling channel.
|
|
"""
|
|
def __init__(self, signalling):
|
|
super().__init__()
|
|
self.signalling = signalling
|
|
|
|
def on_json(msg):
|
|
if 'STATE' in msg:
|
|
state = NegotiationState (msg['STATE'])
|
|
self._update_negotiation_state(state)
|
|
if state == NegotiationState.OFFER_CREATED:
|
|
self.on_offer_created.fire(msg['description'])
|
|
elif state == NegotiationState.ANSWER_CREATED:
|
|
self.on_answer_created.fire(msg['description'])
|
|
elif state == NegotiationState.OFFER_SET:
|
|
self.on_offer_set.fire (msg['description'])
|
|
elif state == NegotiationState.ANSWER_SET:
|
|
self.on_answer_set.fire (msg['description'])
|
|
elif 'DATA-NEW' in msg:
|
|
new = msg['DATA-NEW']
|
|
observer = RemoteDataChannelObserver(new['id'], new['location'], self)
|
|
self.add_channel (observer)
|
|
elif 'DATA-STATE' in msg:
|
|
ident = msg['id']
|
|
channel = self.find_channel(ident)
|
|
channel._update_state (DataChannelState(msg['DATA-STATE']))
|
|
elif 'DATA-MSG' in msg:
|
|
ident = msg['id']
|
|
channel = self.find_channel(ident)
|
|
channel.got_message(msg['DATA-MSG'])
|
|
self.signalling.have_json.connect (on_json)
|
|
|
|
def add_data_channel (self, ident):
|
|
msg = json.dumps({'DATA_CREATE': {'id': ident}})
|
|
self.signalling.send (msg)
|
|
|
|
def create_offer (self):
|
|
msg = json.dumps({'CREATE_OFFER': ""})
|
|
self.signalling.send (msg)
|
|
|
|
def create_answer (self):
|
|
msg = json.dumps({'CREATE_ANSWER': ""})
|
|
self.signalling.send (msg)
|
|
|
|
def set_title (self, title):
|
|
# entirely for debugging purposes
|
|
msg = json.dumps({'SET_TITLE': title})
|
|
self.signalling.send (msg)
|
|
|
|
def set_options (self, opts):
|
|
options = {}
|
|
if opts.has_field("remote-bundle-policy"):
|
|
options["bundlePolicy"] = opts["remote-bundle-policy"]
|
|
msg = json.dumps({'OPTIONS' : options})
|
|
self.signalling.send (msg)
|
|
|
|
|
|
class RemoteDataChannelObserver(DataChannelObserver):
|
|
"""
|
|
Use information sent over the signalling channel to construct the current
|
|
state of a remote peer's data channel. Allow performing actions by sending
|
|
requests over the signalling channel.
|
|
"""
|
|
def __init__(self, ident, location, webrtc):
|
|
super().__init__(ident, location)
|
|
self.webrtc = webrtc
|
|
|
|
def send_string(self, msg):
|
|
msg = json.dumps({'DATA_SEND_MSG': {'msg' : msg, 'id': self.ident}})
|
|
self.webrtc.signalling.send (msg)
|
|
|
|
def close (self):
|
|
msg = json.dumps({'DATA_CLOSE': {'id': self.ident}})
|
|
self.webrtc.signalling.send (msg)
|