check/validate: a few more tests and improvements

Tests a matrix of options:
- local/remote negotiation initiator
- 'most' bundle-policy combinations (some combinations will never work)
- firefox or chrome browser

Across 4 test scenarios:
- simple negotiation with default browser streams (or none if gstreamer
  initiates)
- sending a vp8 stream
- opening a data channel
- sending a message over the data channel

for a total of 112 tests!
This commit is contained in:
Matthew Waters 2020-02-12 21:56:34 +11:00 committed by Matthew Waters
parent c3f629340d
commit 615813ef93
29 changed files with 926 additions and 262 deletions

View file

@ -1,6 +1,4 @@
#!/usr/bin/env python3
#
# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@ -22,6 +20,11 @@ gi.require_version("GstValidate", "1.0")
from gi.repository import GstValidate
from observer import Signal
from enums import Actions
import logging
l = logging.getLogger(__name__)
class ActionObserver(object):
def __init__(self):
@ -42,36 +45,62 @@ class ActionObserver(object):
return val
self.create_offer = Signal(_action_continue, _action_accum)
self.wait_for_negotiation_state = Signal(_action_continue, _action_accum)
self.add_stream = Signal(_action_continue, _action_accum)
self.wait_for_remote_state = Signal(_action_continue, _action_accum)
self.action = Signal(_action_continue, _action_accum)
def _action(self, scenario, action):
l.debug('executing action: ' + str(action.structure))
return self.action.fire (Actions(action.structure.get_name()), action)
def _create_offer(self, scenario, action):
print("action create-offer")
return self.create_offer.fire()
def _wait_for_negotiation_state(self, scenario, action):
state = action.structure["state"]
print("action wait-for-negotiation-state", state)
return self.wait_for_negotiation_state.fire(state)
def _add_stream(self, scenario, action):
pipeline = action.structure["pipeline"]
print("action add-stream", pipeline)
return self.add_stream.fire(pipeline)
def register_action_types(observer):
if not isinstance(observer, ActionObserver):
raise TypeError
GstValidate.register_action_type("create-offer", "webrtc",
observer._create_offer, None,
GstValidate.register_action_type(Actions.CREATE_OFFER.value,
"webrtc", observer._action, None,
"Instruct a create-offer to commence",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type("wait-for-negotiation-state", "webrtc",
observer._wait_for_negotiation_state, None,
GstValidate.register_action_type(Actions.CREATE_ANSWER.value,
"webrtc", observer._action, None,
"Create answer",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type(Actions.WAIT_FOR_NEGOTIATION_STATE.value,
"webrtc", observer._action, None,
"Wait for a specific negotiation state to be reached",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type("add-stream", "webrtc",
observer._add_stream, None,
GstValidate.register_action_type(Actions.ADD_STREAM.value,
"webrtc", observer._action, None,
"Add a stream to the webrtcbin",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type(Actions.ADD_DATA_CHANNEL.value,
"webrtc", observer._action, None,
"Add a data channel to the webrtcbin",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type(Actions.SEND_DATA_CHANNEL_STRING.value,
"webrtc", observer._action, None,
"Send a message using a data channel",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type(Actions.WAIT_FOR_DATA_CHANNEL_STATE.value,
"webrtc", observer._action, None,
"Wait for data channel to reach state",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type(Actions.CLOSE_DATA_CHANNEL.value,
"webrtc", observer._action, None,
"Close a data channel",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type(Actions.WAIT_FOR_DATA_CHANNEL.value,
"webrtc", observer._action, None,
"Wait for a data channel to appear",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type(Actions.WAIT_FOR_DATA_CHANNEL_STRING.value,
"webrtc", observer._action, None,
"Wait for a data channel to receive a message",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type(Actions.WAIT_FOR_NEGOTIATION_NEEDED.value,
"webrtc", observer._action, None,
"Wait for a the on-negotiation-needed signal to fire",
GstValidate.ActionTypeFlags.NONE)
GstValidate.register_action_type(Actions.SET_WEBRTC_OPTIONS.value,
"webrtc", observer._action, None,
"Set some webrtc options",
GstValidate.ActionTypeFlags.NONE)

View file

@ -1,6 +1,4 @@
#*- vi:si:et:sw=4:sts=4:ts=4:syntax=python
#
# Copyright (c) 2018 Matthew Waters <matthew@centricular.com>
# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@ -21,26 +19,82 @@ import inspect
import os
import sys
import shutil
import itertools
import tempfile
from launcher.baseclasses import TestsManager, TestsGenerator, GstValidateTest, ScenarioManager
from launcher.baseclasses import TestsManager, GstValidateTest, ScenarioManager
from launcher.utils import DEFAULT_TIMEOUT
DEFAULT_BROWSERS = ['firefox', 'chrome']
# list of scenarios. These are the names of the actual scenario files stored
# on disk.
DEFAULT_SCENARIOS = [
"offer_answer",
"vp8_send_stream"
"vp8_send_stream",
"open_data_channel",
"send_data_channel_string",
]
BROWSER_SCENARIO_BLACKLISTS = {
'firefox' : [
'offer_answer', # fails to accept an SDP without any media sections
],
'chrome' : [
],
# various configuration changes that are included from other scenarios.
# key is the name of the override used in the name of the test
# value is the subdirectory where the override is placed
# changes some things about the test like:
# - who initiates the negotiation
# - bundle settings
SCENARIO_OVERRIDES = {
# name : directory
# who starts the negotiation
'local' : 'local_initiates_negotiation',
'remote' : 'remote_initiates_negotiation',
# bundle-policy configuration
# XXX: webrtcbin's bundle-policy=none is not part of the spec
'none_compat' : 'bundle_local_none_remote_max_compat',
'none_balanced' : 'bundle_local_none_remote_balanced',
'none_bundle' : 'bundle_local_none_remote_max_bundle',
'compat_compat' : 'bundle_local_max_compat_remote_max_compat',
'compat_balanced' : 'bundle_local_max_compat_remote_balanced',
'compat_bundle' : 'bundle_local_max_compat_remote_max_bundle',
'balanced_compat' : 'bundle_local_balanced_remote_max_compat',
'balanced_balanced' : 'bundle_local_balanced_remote_balanced',
'balanced_bundle' : 'bundle_local_balanced_remote_bundle',
'bundle_compat' : 'bundle_local_max_bundle_remote_max_compat',
'bundle_balanced' : 'bundle_local_max_bundle_remote_balanced',
'bundle_bundle' : 'bundle_local_max_bundle_remote_max_bundle',
}
bundle_options = ['compat', 'balanced', 'bundle']
# Given an override, these are the choices to choose from. Each choice is a
# separate test
OVERRIDE_CHOICES = {
'initiator' : ['local', 'remote'],
'bundle' : ['_'.join(opt) for opt in itertools.product(['none'] + bundle_options, bundle_options)],
}
# Which scenarios support which override. All the overrides will be chosen
SCENARIO_OVERRIDES_SUPPORTED = {
"offer_answer" : ['initiator', 'bundle'],
"vp8_send_stream" : ['initiator', 'bundle'],
"open_data_channel" : ['initiator', 'bundle'],
"send_data_channel_string" : ['initiator', 'bundle'],
}
# Things that don't work for some reason or another.
DEFAULT_BLACKLIST = [
(r"webrtc\.firefox\.local\..*offer_answer",
"Firefox doesn't like a SDP without any media"),
(r"webrtc.*remote.*vp8_send_stream",
"We can't match payload types with a remote offer and a sending stream"),
(r"webrtc.*\.balanced_.*",
"webrtcbin doesn't implement bundle-policy=balanced"),
(r"webrtc.*\.none_bundle.*",
"Browsers want a BUNDLE group if in max-bundle mode"),
]
class MutableInt(object):
def __init__(self, value):
self.value = value
@ -66,11 +120,12 @@ class GstWebRTCTest(GstValidateTest):
@classmethod
def __get_available_peer_id(cls):
# each connection uses two peer ids
peerid = cls.__last_id.value
cls.__last_id.value += 2
return peerid
def __init__(self, classname, tests_manager, scenario, browser, timeout=DEFAULT_TIMEOUT):
def __init__(self, classname, tests_manager, scenario, browser, scenario_override_includes=None, timeout=DEFAULT_TIMEOUT):
super().__init__("python3",
classname,
tests_manager.options,
@ -82,6 +137,7 @@ class GstWebRTCTest(GstValidateTest):
self.current_file_path = os.path.dirname (os.path.abspath (filename))
self.certdir = None
self.browser = browser
self.scenario_override_includes = scenario_override_includes
def launch_server(self):
if self.options.redirect_logs == 'stdout':
@ -138,6 +194,8 @@ class GstWebRTCTest(GstValidateTest):
html_page = os.path.join(self.current_file_path, '..', 'web', 'single_stream.html')
html_params = '?server=127.0.0.1&port=' + str(self.server_port) + '&id=' + str(web_id)
self.add_arguments("file://" + html_page + html_params)
self.add_arguments("--name")
self.add_arguments(self.classname)
self.add_arguments('--peer-id')
self.add_arguments(str(web_id))
self.add_arguments(str(gst_id))
@ -154,10 +212,27 @@ class GstWebRTCTest(GstValidateTest):
self.__used_ports.remove(self.server_port)
if self.certdir:
shutil.rmtree(self.certdir, ignore_errors=True)
self.certdir
return res
def get_subproc_env(self):
env = super().get_subproc_env()
if not self.scenario_override_includes:
return env
# this feels gross...
paths = env.get('GST_VALIDATE_SCENARIOS_PATH', '').split(os.pathsep)
new_paths = []
for p in paths:
new_paths.append(p)
for override_path in self.scenario_override_includes:
new_p = os.path.join(p, override_path)
if os.path.exists (new_p):
new_paths.append(new_p)
env['GST_VALIDATE_SCENARIOS_PATH'] = os.pathsep.join(new_paths)
return env
class GstWebRTCTestsManager(TestsManager):
scenarios_manager = ScenarioManager()
name = "webrtc"
@ -165,23 +240,50 @@ class GstWebRTCTestsManager(TestsManager):
def __init__(self):
super(GstWebRTCTestsManager, self).__init__()
self.loading_testsuite = self.name
self._scenarios = []
def webrtc_server_address(self):
return "wss://127.0.0.1:8443"
def add_scenarios(self, scenarios):
if isinstance(scenarios, list):
self._scenarios.extend(scenarios)
else:
self._scenarios.append(scenarios)
self._scenarios = list(set(self._scenarios))
def set_scenarios(self, scenarios):
self._scenarios = []
self.add_scenarios(scenarios)
def get_scenarios(self):
return self._scenarios
def populate_testsuite(self):
self.add_scenarios (DEFAULT_SCENARIOS)
self.set_default_blacklist(DEFAULT_BLACKLIST)
def list_tests(self):
if self.tests:
return self.tests
scenarios = [(scenario_name, self.scenarios_manager.get_scenario(scenario_name))
for scenario_name in self.get_scenarios()]
for browser in DEFAULT_BROWSERS:
for name, scenario in scenarios:
if not scenario:
self.warning("Could not find scenario %s" % name)
continue
for browser in DEFAULT_BROWSERS:
if name in BROWSER_SCENARIO_BLACKLISTS[browser]:
self.warning('Skipping broken test', name, 'for browser', browser)
continue
classname = browser + '_' + name
if not SCENARIO_OVERRIDES_SUPPORTED[name]:
# no override choices supported
classname = browser + '.' + name
print ("adding", classname)
self.add_test(GstWebRTCTest(classname, self, scenario, browser))
else:
for overrides in itertools.product(*[OVERRIDE_CHOICES[c] for c in SCENARIO_OVERRIDES_SUPPORTED[name]]):
oname = '.'.join (overrides)
opaths = [SCENARIO_OVERRIDES[p] for p in overrides]
classname = browser + '.' + oname + '.' + name
print ("adding", classname)
self.add_test(GstWebRTCTest(classname, self, scenario, browser, opaths))
return self.tests

View file

@ -1,6 +1,4 @@
#!/usr/bin/env python3
#
# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@ -17,11 +15,14 @@
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
import logging
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
from selenium.webdriver.chrome.options import Options as COptions
l = logging.getLogger(__name__)
def create_firefox_driver():
capabilities = webdriver.DesiredCapabilities().FIREFOX.copy()
capabilities['acceptSslCerts'] = True
@ -39,6 +40,9 @@ def create_chrome_driver():
copts.add_argument('--use-fake-ui-for-media-stream')
copts.add_argument('--use-fake-device-for-media-stream')
copts.add_argument('--enable-blink-features=RTCUnifiedPlanByDefault')
# XXX: until libnice can deal with mdns candidates
local_state = {"enabled_labs_experiments" : ["enable-webrtc-hide-local-ips-with-mdns@2"] }
copts.add_experimental_option("localState", {"browser" : local_state})
return webdriver.Chrome(options=copts, desired_capabilities=capabilities)
@ -48,7 +52,7 @@ def create_driver(name):
elif name == 'chrome':
return create_chrome_driver()
else:
raise ValueError("Unknown browser name " + name)
raise ValueError("Unknown browser name \'" + name + "\'")
def valid_int(n):
if isinstance(n, int):
@ -62,17 +66,22 @@ def valid_int(n):
return False
class Browser(object):
def __init__(self, driver, html_source):
"""
A browser as connected through selenium.
"""
def __init__(self, driver):
l.info('Using driver \'' + driver.name + '\' with capabilities ' + str(driver.capabilities))
self.driver = driver
self.html_source = html_source
def open(self, html_source):
self.driver.get(html_source)
def get_peer_id(self):
self.driver.get(self.html_source)
peer_id = WebDriverWait(self.driver, 10).until(
peer_id = WebDriverWait(self.driver, 5).until(
lambda x: x.find_element_by_id('peer-id'),
message='Peer-id element was never seen'
)
WebDriverWait (self.driver, 10).until(
WebDriverWait (self.driver, 5).until(
lambda x: valid_int(peer_id.text),
message='Peer-id never became a number'
)

View file

@ -1,6 +1,4 @@
#!/usr/bin/env python3
#
# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@ -20,8 +18,8 @@
import threading
import copy
from observer import Signal
from enums import NegotiationState
from observer import Signal, WebRTCObserver, DataChannelObserver, StateObserver
from enums import NegotiationState, DataChannelState
import gi
gi.require_version("Gst", "1.0")
@ -33,10 +31,13 @@ from gi.repository import GstSdp
gi.require_version("GstValidate", "1.0")
from gi.repository import GstValidate
class WebRTCBinObserver(object):
class WebRTCBinObserver(WebRTCObserver):
"""
Observe a webrtcbin element.
"""
def __init__(self, element):
self.state = NegotiationState.NEW
self.state_cond = threading.Condition()
WebRTCObserver.__init__(self)
self.element = element
self.signal_handlers = []
self.signal_handlers.append(element.connect("on-negotiation-needed", self._on_negotiation_needed))
@ -44,17 +45,16 @@ class WebRTCBinObserver(object):
self.signal_handlers.append(element.connect("pad-added", self._on_pad_added))
self.signal_handlers.append(element.connect("on-new-transceiver", self._on_new_transceiver))
self.signal_handlers.append(element.connect("on-data-channel", self._on_data_channel))
self.on_offer_created = Signal()
self.on_answer_created = Signal()
self.negotiation_needed = 0
self._negotiation_needed_observer = StateObserver(self, "negotiation_needed", threading.Condition())
self.on_negotiation_needed = Signal()
self.on_ice_candidate = Signal()
self.on_pad_added = Signal()
self.on_new_transceiver = Signal()
self.on_data_channel = Signal()
self.on_local_description_set = Signal()
self.on_remote_description_set = Signal()
def _on_negotiation_needed(self, element):
self.negotiation_needed += 1
self._negotiation_needed_observer.update(self.negotiation_needed)
self.on_negotiation_needed.fire()
def _on_ice_candidate(self, element, mline, candidate):
@ -63,35 +63,19 @@ class WebRTCBinObserver(object):
def _on_pad_added(self, element, pad):
self.on_pad_added.fire(pad)
def _on_local_description_set(self, promise, desc):
self._update_negotiation_from_description_state(desc)
self.on_local_description_set.fire(desc)
def _on_remote_description_set(self, promise, desc):
self._update_negotiation_from_description_state(desc)
self.on_remote_description_set.fire(desc)
def _on_description_set(self, promise, desc):
new_state = self._update_negotiation_from_description_state(desc)
if new_state == NegotiationState.OFFER_SET:
self.on_offer_set.fire (desc)
elif new_state == NegotiationState.ANSWER_SET:
self.on_answer_set.fire (desc)
def _on_new_transceiver(self, element, transceiver):
self.on_new_transceiver.fire(transceiver)
def _on_data_channel(self, element):
self.on_data_channel.fire(desc)
def _update_negotiation_state(self, new_state):
with self.state_cond:
old_state = self.state
self.state = new_state
self.state_cond.notify_all()
print ("observer updated state to", new_state)
def wait_for_negotiation_states(self, states):
ret = None
with self.state_cond:
while self.state not in states:
self.state_cond.wait()
print ("observer waited for", states)
ret = self.state
return ret
def _on_data_channel(self, element, channel):
observer = WebRTCBinDataChannelObserver(channel, channel.props.label, 'remote')
self.add_channel(observer)
def _update_negotiation_from_description_state(self, desc):
new_state = None
@ -101,6 +85,7 @@ class WebRTCBinObserver(object):
new_state = NegotiationState.ANSWER_SET
assert new_state is not None
self._update_negotiation_state(new_state)
return new_state
def _deepcopy_session_description(self, desc):
# XXX: passing 'offer' to both a promise and an action signal without
@ -115,11 +100,10 @@ class WebRTCBinObserver(object):
offer = reply['offer']
new_offer = self._deepcopy_session_description(offer)
promise = Gst.Promise.new_with_change_func(self._on_local_description_set, new_offer)
promise = Gst.Promise.new_with_change_func(self._on_description_set, new_offer)
new_offer = self._deepcopy_session_description(offer)
self.element.emit('set-local-description', new_offer, promise)
self.on_offer_created.fire(offer)
def _on_answer_created(self, promise, element):
@ -128,11 +112,10 @@ class WebRTCBinObserver(object):
offer = reply['answer']
new_offer = self._deepcopy_session_description(offer)
promise = Gst.Promise.new_with_change_func(self._on_local_description_set, new_offer)
promise = Gst.Promise.new_with_change_func(self._on_description_set, new_offer)
new_offer = self._deepcopy_session_description(offer)
self.element.emit('set-local-description', new_offer, promise)
self.on_answer_created.fire(offer)
def create_offer(self, options=None):
@ -144,13 +127,24 @@ class WebRTCBinObserver(object):
self.element.emit('create-answer', options, promise)
def set_remote_description(self, desc):
promise = Gst.Promise.new_with_change_func(self._on_remote_description_set, desc)
promise = Gst.Promise.new_with_change_func(self._on_description_set, desc)
self.element.emit('set-remote-description', desc, promise)
def add_ice_candidate(self, mline, candidate):
self.element.emit('add-ice-candidate', mline, candidate)
def add_data_channel(self, ident):
channel = self.element.emit('create-data-channel', ident, None)
observer = WebRTCBinDataChannelObserver(channel, ident, 'local')
self.add_channel(observer)
def wait_for_negotiation_needed(self, generation):
self._negotiation_needed_observer.wait_for ((generation,))
class WebRTCStream(object):
"""
An stream attached to a webrtcbin element
"""
def __init__(self):
self.bin = None
@ -189,6 +183,10 @@ class WebRTCStream(object):
self.bin.sync_state_with_parent()
class WebRTCClient(WebRTCBinObserver):
"""
Client for performing webrtc operations. Controls the pipeline that
contains a webrtcbin element.
"""
def __init__(self):
self.pipeline = Gst.Pipeline(None)
self.webrtcbin = Gst.ElementFactory.make("webrtcbin")
@ -210,3 +208,42 @@ class WebRTCClient(WebRTCBinObserver):
stream.set_description(desc)
stream.add_and_link_to (self.pipeline, self.webrtcbin, pad)
self._streams.append(stream)
def set_options (self, opts):
if opts.has_field("local-bundle-policy"):
self.webrtcbin.props.bundle_policy = opts["local-bundle-policy"]
class WebRTCBinDataChannelObserver(DataChannelObserver):
"""
Data channel observer for a webrtcbin data channel.
"""
def __init__(self, target, ident, location):
super().__init__(ident, location)
self.target = target
self.signal_handlers = []
self.signal_handlers.append(target.connect("on-open", self._on_open))
self.signal_handlers.append(target.connect("on-close", self._on_close))
self.signal_handlers.append(target.connect("on-error", self._on_error))
self.signal_handlers.append(target.connect("on-message-data", self._on_message_data))
self.signal_handlers.append(target.connect("on-message-string", self._on_message_string))
self.signal_handlers.append(target.connect("on-buffered-amount-low", self._on_buffered_amount_low))
def _on_open(self, channel):
self._update_state (DataChannelState.OPEN)
def _on_close(self, channel):
self._update_state (DataChannelState.CLOSED)
def _on_error(self, channel):
self._update_state (DataChannelState.ERROR)
def _on_message_data(self, channel, data):
self.data.append(msg)
def _on_message_string(self, channel, msg):
self.got_message (msg)
def _on_buffered_amount_low(self, channel):
pass
def close(self):
self.target.emit('close')
def send_string (self, msg):
self.target.emit('send-string', msg)

View file

@ -1,4 +1,4 @@
# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@ -15,22 +15,57 @@
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
class SignallingState(object):
from enum import Enum, unique
@unique
class SignallingState(Enum):
"""
State of the signalling protocol.
"""
NEW = "new" # no connection has been made
ERROR = "error" # an error was thrown. overrides all others
OPEN = "open" # websocket connection is open
ERROR = "error" # and error was thrown. overrides all others
HELLO = "hello" # hello was sent and received
SESSION = "session" # session setup was sent and received
class NegotiationState(object):
NEW = "new"
ERROR = "error"
NEGOTIATION_NEEDED = "negotiation-needed"
OFFER_CREATED = "offer-created"
ANSWER_CREATED = "answer-created"
OFFER_SET = "offer-set"
ANSWER_SET = "answer-set"
@unique
class NegotiationState(Enum):
"""
State of the webrtc negotiation. Both peers have separate states and are
tracked separately.
"""
NEW = "new" # No negotiation has been performed
ERROR = "error" # an error occured
OFFER_CREATED = "offer-created" # offer was created
ANSWER_CREATED = "answer-created" # answer was created
OFFER_SET = "offer-set" # offer has been set
ANSWER_SET = "answer-set" # answer has been set
class RemoteState(object):
ERROR = "error"
REMOTE_STREAM_RECEIVED = "remote-stream-received"
@unique
class DataChannelState(Enum):
"""
State of a data channel. Each data channel is tracked individually
"""
NEW = "new" # data channel created but not connected
OPEN = "open" # data channel is open, data can flow
CLOSED = "closed" # data channel is closed, sending data will fail
ERROR = "error" # data channel encountered an error
@unique
class Actions(Enum):
"""
Action names that we implement. Each name is the structure name for each
action as stored in the scenario file.
"""
CREATE_OFFER = "create-offer" # create an offer and send it to the peer
CREATE_ANSWER = "create-answer" # create an answer and send it to the peer
WAIT_FOR_NEGOTIATION_STATE = "wait-for-negotiation-state" # wait for the @NegotiationState to reach a certain value
ADD_STREAM = "add-stream" # add a stream to send to the peer. local only
ADD_DATA_CHANNEL = "add-data-channel" # add a stream to send to the peer. local only
WAIT_FOR_DATA_CHANNEL = "wait-for-data-channel" # wait for a data channel to appear
WAIT_FOR_DATA_CHANNEL_STATE = "wait-for-data-channel-state" # wait for a data channel to have a certain state
SEND_DATA_CHANNEL_STRING = "send-data-channel-string" # send a string over the data channel
WAIT_FOR_DATA_CHANNEL_STRING = "wait-for-data-channel-string" # wait for a string on the data channel
CLOSE_DATA_CHANNEL = "close-data-channel" # close a data channel
WAIT_FOR_NEGOTIATION_NEEDED = "wait-for-negotiation-needed" # wait for negotiation needed to fire
SET_WEBRTC_OPTIONS = "set-webrtc-options" # set some options

View file

@ -1,4 +1,4 @@
# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@ -15,7 +15,17 @@
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
import logging
import threading
from enums import NegotiationState, DataChannelState
l = logging.getLogger(__name__)
class Signal(object):
"""
A class for callback-based signal handlers.
"""
def __init__(self, cont_func=None, accum_func=None):
self._handlers = []
if not cont_func:
@ -41,3 +51,119 @@ class Signal(object):
if not self.cont_func(ret):
break
return ret
class StateObserver(object):
"""
Observe some state. Allows waiting for specific states to occur and
notifying waiters of updated values. Will hold previous states to ensure
@update cannot change the state before @wait_for can look at the state.
"""
def __init__(self, target, attr_name, cond):
self.target = target
self.attr_name = attr_name
self.cond = cond
# track previous states of the value so that the notification still
# occurs even if the field has moved on to another state
self.previous_states = []
def wait_for(self, states):
ret = None
with self.cond:
state = getattr (self.target, self.attr_name)
l.debug (str(self.target) + " \'" + self.attr_name +
"\' waiting for " + str(states))
while True:
l.debug(str(self.target) + " \'" + self.attr_name +
"\' previous states: " + str(self.previous_states))
for i, s in enumerate (self.previous_states):
if s in states:
l.debug(str(self.target) + " \'" + self.attr_name +
"\' " + str(s) + " found at position " +
str(i) + " of " + str(self.previous_states))
self.previous_states = self.previous_states[i:]
return s
self.cond.wait()
def update (self, new_state):
with self.cond:
self.previous_states += [new_state]
setattr(self.target, self.attr_name, new_state)
self.cond.notify_all()
l.debug (str(self.target) + " updated \'" + self.attr_name + "\' to " + str(new_state))
class WebRTCObserver(object):
"""
Base webrtc observer class. Stores a lot of duplicated functionality
between the local and remove peer implementations.
"""
def __init__(self):
self.state = NegotiationState.NEW
self._state_observer = StateObserver(self, "state", threading.Condition())
self.on_offer_created = Signal()
self.on_answer_created = Signal()
self.on_offer_set = Signal()
self.on_answer_set = Signal()
self.on_data_channel = Signal()
self.data_channels = []
self._xxxxxxxdata_channel_ids = None
self._data_channels_observer = StateObserver(self, "_xxxxxxxdata_channel_ids", threading.Condition())
def _update_negotiation_state(self, new_state):
self._state_observer.update (new_state)
def wait_for_negotiation_states(self, states):
return self._state_observer.wait_for (states)
def find_channel (self, ident):
for c in self.data_channels:
if c.ident == ident:
return c
def add_channel (self, channel):
l.debug('adding channel ' + str (channel) + ' with name ' + str(channel.ident))
self.data_channels.append (channel)
self._data_channels_observer.update (channel.ident)
self.on_data_channel.fire(channel)
def wait_for_data_channel(self, ident):
return self._data_channels_observer.wait_for (ident)
def create_offer(self, options):
raise NotImplementedError()
def add_data_channel(self, ident):
raise NotImplementedError()
class DataChannelObserver(object):
"""
Base webrtc data channelobserver class. Stores a lot of duplicated
functionality between the local and remove peer implementations.
"""
def __init__(self, ident, location):
self.state = DataChannelState.NEW
self._state_observer = StateObserver(self, "state", threading.Condition())
self.ident = ident
self.location = location
self.message = None
self._message_observer = StateObserver(self, "message", threading.Condition())
def _update_state(self, new_state):
self._state_observer.update (new_state)
def wait_for_states(self, states):
return self._state_observer.wait_for (states)
def wait_for_message (self, msg):
return self._message_observer.wait_for (msg)
def got_message(self, msg):
self._message_observer.update (msg)
def close (self):
raise NotImplementedError()
def send_string (self, msg):
raise NotImplementedError()

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=balanced, remote_bundle_policy=balanced

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=balanced, remote_bundle_policy=max-bundle

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=balanced, remote_bundle_policy=max-compat

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=max-bundle, remote_bundle_policy=balanced

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=max-bundle, remote_bundle_policy=max-bundle

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=max-bundle, remote_bundle_policy=max-compat

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=max-compat, remote_bundle_policy=balanced

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=max-compat, remote_bundle_policy=max-bundle

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=max-compat, remote_bundle_policy=max-compat

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=none, remote_bundle_policy=balanced

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=none, remote_bundle_policy=max-bundle

View file

@ -0,0 +1 @@
set-vars, local_bundle_policy=none, remote_bundle_policy=max-compat

View file

@ -0,0 +1 @@
set-vars, negotiation_initiator=local, negotiation_responder=remote

View file

@ -1,3 +1,15 @@
description, summary="Produce an offer"
create-offer;
wait-for-negotiation-state, state="answer-set"
description, summary="Produce an offer and negotiate it with the peer"
include,location=negotiation_initiator.scenario
include,location=bundle_policy.scenario
set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)"
create-offer, which="$(negotiation_initiator)";
# all of these waits are technically unnecessary and only the last is needed
wait-for-negotiation-state, which="$(negotiation_initiator)", state="offer-created"
wait-for-negotiation-state, which="$(negotiation_initiator)", state="offer-set"
wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set"
create-answer, which="$(negotiation_responder)";
wait-for-negotiation-state, which="$(negotiation_responder)", state="answer-created"
wait-for-negotiation-state, which="$(negotiation_responder)", state="answer-set"
wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set"

View file

@ -0,0 +1,23 @@
description, summary="Open a data channel"
include,location=negotiation_initiator.scenario
include,location=bundle_policy.scenario
set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)"
# add the channel on the initiator so that datachannel is added to the sdp
add-data-channel, which="$(negotiation_initiator)", id="gstreamer";
# negotiate
create-offer, which="$(negotiation_initiator)";
wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set"
create-answer, which="$(negotiation_responder)";
wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set"
# ensure data channel is created
wait-for-data-channel, which="$(negotiation_responder)", id="gstreamer";
wait-for-data-channel, which="$(negotiation_initiator)", id="gstreamer";
wait-for-data-channel-state, which="$(negotiation_initiator)", id="gstreamer", state="open";
# only the browser closing works at the moment
close-data-channel, which="remote", id="gstreamer"
wait-for-data-channel-state, which="local", id="gstreamer", state="closed";

View file

@ -0,0 +1 @@
set-vars, negotiation_initiator=remote, negotiation_responder=local

View file

@ -0,0 +1,21 @@
description, summary="Send data over a data channel"
include,location=negotiation_initiator.scenario
include,location=bundle_policy.scenario
set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)"
add-data-channel, which="$(negotiation_initiator)", id="gstreamer";
create-offer, which="$(negotiation_initiator)";
wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set"
create-answer, which="$(negotiation_responder)";
wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set"
# wait for the data channel to appear
wait-for-data-channel, which="$(negotiation_initiator)", id="gstreamer";
wait-for-data-channel, which="$(negotiation_responder)", id="gstreamer";
wait-for-data-channel-state, which="$(negotiation_initiator)", id="gstreamer", state="open";
# send something
send-data-channel-string, which="local", id="gstreamer", msg="some data";
wait-for-data-channel-string, which="remote", id="gstreamer", msg="some data";

View file

@ -1,5 +1,15 @@
description, summary="Send a VP8 stream", handles-state=true
include,location=negotiation_initiator.scenario
include,location=bundle_policy.scenario
set-webrtc-options, local-bundle-policy="$(local_bundle_policy)", remote-bundle-policy="$(remote_bundle_policy)"
add-stream, pipeline="videotestsrc is-live=1 ! vp8enc ! rtpvp8pay ! queue"
set-state, state="playing";
create-offer;
wait-for-negotiation-state, state="answer-set"
wait-for-negotiation-needed, generation=1;
# negotiate
create-offer, which="$(negotiation_initiator)";
wait-for-negotiation-state, which="$(negotiation_responder)", state="offer-set"
create-answer, which="$(negotiation_responder)";
wait-for-negotiation-state, which="$(negotiation_initiator)", state="answer-set"

View file

@ -1,6 +1,4 @@
#!/usr/bin/env python3
#
# Copyright (c) 2018, Matthew Waters <matthew@centricular.com>
# Copyright (c) 2020, Matthew Waters <matthew@centricular.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@ -24,11 +22,17 @@ import os
import sys
import threading
import json
import logging
from observer import Signal
from enums import SignallingState, RemoteState
from observer import Signal, StateObserver, WebRTCObserver, DataChannelObserver
from enums import SignallingState, NegotiationState, DataChannelState
l = logging.getLogger(__name__)
class AsyncIOThread(threading.Thread):
"""
Run an asyncio loop in another thread.
"""
def __init__ (self, loop):
threading.Thread.__init__(self)
self.loop = loop
@ -40,16 +44,24 @@ class AsyncIOThread(threading.Thread):
def stop_thread(self):
self.loop.call_soon_threadsafe(self.loop.stop)
class SignallingClientThread(object):
"""
Connect to a signalling server
"""
def __init__(self, server):
# server string to connect to. Passed directly to websockets.connect()
self.server = server
# fired after we have connected to the signalling server
self.wss_connected = Signal()
# fired every time we receive a message from the signalling server
self.message = Signal()
self._init_async()
def _init_async(self):
self._running = False
self.conn = None
self._loop = asyncio.new_event_loop()
@ -59,23 +71,31 @@ class SignallingClientThread(object):
self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(self._a_loop()))
async def _a_connect(self):
# connect to the signalling server
assert not self.conn
sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
self.conn = await websockets.connect(self.server, ssl=sslctx)
async def _a_loop(self):
self._running = True
l.info('loop started')
await self._a_connect()
self.wss_connected.fire()
assert self.conn
async for message in self.conn:
self.message.fire(message)
l.info('loop exited')
def send(self, data):
# send some information to the peer
async def _a_send():
await self.conn.send(data)
self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_send()))
def stop(self):
if self._running == False:
return
cond = threading.Condition()
# asyncio, why you so complicated to stop ?
@ -87,64 +107,70 @@ class SignallingClientThread(object):
to_wait = [t for t in tasks if not t.done()]
if to_wait:
l.info('waiting for ' + str(to_wait))
done, pending = await asyncio.wait(to_wait)
with cond:
l.error('notifying cond')
cond.notify()
self._running = False
self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_stop()))
with cond:
self._loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_a_stop()))
l.error('cond waiting')
cond.wait()
l.error('cond waited')
self._thread.stop_thread()
self._thread.join()
l.error('thread joined')
class WebRTCSignallingClient(SignallingClientThread):
"""
Signalling client implementation. Deals wit session management over the
signalling protocol. Sends and receives from a peer.
"""
def __init__(self, server, id_):
super().__init__(server)
self.wss_connected.connect(self._on_connection)
self.message.connect(self._on_message)
self.state = SignallingState.NEW
self._state_cond = threading.Condition()
self._state_observer = StateObserver(self, "state", threading.Condition())
self.id = id_
self._peerid = None
# override that base class
# fired when the hello has been received
self.connected = Signal()
# fired when the signalling server responds that the session creation is ok
self.session_created = Signal()
# fired on an error
self.error = Signal()
# fired when the peer receives some json data
self.have_json = Signal()
def wait_for_states(self, states):
ret = None
with self._state_cond:
while self.state not in states:
self._state_cond.wait()
ret = self.state
return ret
def _update_state(self, new_state):
self._state_observer.update (new_state)
def _update_state(self, state):
with self._state_cond:
if self.state is not SignallingState.ERROR:
self.state = state
self._state_cond.notify_all()
def wait_for_states(self, states):
return self._state_observer.wait_for (states)
def hello(self):
self.send('HELLO ' + str(self.id))
l.info("sent HELLO")
self.wait_for_states([SignallingState.HELLO])
print("signalling-client sent HELLO")
def create_session(self, peerid):
self._peerid = peerid
self.send('SESSION {}'.format(self._peerid))
self.wait_for_states([SignallingState.SESSION, SignallingState.ERROR])
print("signalling-client sent SESSION")
l.info("sent SESSION")
self.wait_for_states([SignallingState.SESSION])
def _on_connection(self):
self._update_state (SignallingState.OPEN)
def _on_message(self, message):
print("signalling-client received", message)
l.debug("received: " + message)
if message == 'HELLO':
self._update_state (SignallingState.HELLO)
self.connected.fire()
@ -159,3 +185,82 @@ class WebRTCSignallingClient(SignallingClientThread):
self.have_json.fire(msg)
return False
class RemoteWebRTCObserver(WebRTCObserver):
"""
Use information sent over the signalling channel to construct the current
state of a remote peer. Allow performing actions by sending requests over
the signalling channel.
"""
def __init__(self, signalling):
super().__init__()
self.signalling = signalling
def on_json(msg):
if 'STATE' in msg:
state = NegotiationState (msg['STATE'])
self._update_negotiation_state(state)
if state == NegotiationState.OFFER_CREATED:
self.on_offer_created.fire(msg['description'])
elif state == NegotiationState.ANSWER_CREATED:
self.on_answer_created.fire(msg['description'])
elif state == NegotiationState.OFFER_SET:
self.on_offer_set.fire (msg['description'])
elif state == NegotiationState.ANSWER_SET:
self.on_answer_set.fire (msg['description'])
elif 'DATA-NEW' in msg:
new = msg['DATA-NEW']
observer = RemoteDataChannelObserver(new['id'], new['location'], self)
self.add_channel (observer)
elif 'DATA-STATE' in msg:
ident = msg['id']
channel = self.find_channel(ident)
channel._update_state (DataChannelState(msg['DATA-STATE']))
elif 'DATA-MSG' in msg:
ident = msg['id']
channel = self.find_channel(ident)
channel.got_message(msg['DATA-MSG'])
self.signalling.have_json.connect (on_json)
def add_data_channel (self, ident):
msg = json.dumps({'DATA_CREATE': {'id': ident}})
self.signalling.send (msg)
def create_offer (self):
msg = json.dumps({'CREATE_OFFER': ""})
self.signalling.send (msg)
def create_answer (self):
msg = json.dumps({'CREATE_ANSWER': ""})
self.signalling.send (msg)
def set_title (self, title):
# entirely for debugging purposes
msg = json.dumps({'SET_TITLE': title})
self.signalling.send (msg)
def set_options (self, opts):
options = {}
if opts.has_field("remote-bundle-policy"):
options["bundlePolicy"] = opts["remote-bundle-policy"]
msg = json.dumps({'OPTIONS' : options})
self.signalling.send (msg)
class RemoteDataChannelObserver(DataChannelObserver):
"""
Use information sent over the signalling channel to construct the current
state of a remote peer's data channel. Allow performing actions by sending
requests over the signalling channel.
"""
def __init__(self, ident, location, webrtc):
super().__init__(ident, location)
self.webrtc = webrtc
def send_string(self, msg):
msg = json.dumps({'DATA_SEND_MSG': {'msg' : msg, 'id': self.ident}})
self.webrtc.signalling.send (msg)
def close (self):
msg = json.dumps({'DATA_CLOSE': {'id': self.ident}})
self.webrtc.signalling.send (msg)

View file

@ -1,6 +1,4 @@
#*- vi:si:et:sw=4:sts=4:ts=4:syntax=python
#
# Copyright (c) 2018 Matthew Waters <matthew@centricular.com>
# Copyright (c) 2020 Matthew Waters <matthew@centricular.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@ -25,12 +23,9 @@ import os
TEST_MANAGER = "webrtc"
BLACKLIST = [
]
def setup_tests(test_manager, options):
print("Setting up webrtc tests")
# test_manager.set_default_blacklist(BLACKLIST)
return True

View file

@ -27,9 +27,5 @@
<div><textarea id="text" cols=40 rows=4></textarea></div>
<div>Our id is <b id="peer-id">unknown</b></div>
<br/>
<div>
<div>getUserMedia constraints being used:</div>
<div><textarea id="constraints" cols=40 rows=4></textarea></div>
</div>
</body>
</html>

View file

@ -14,13 +14,12 @@ var ws_port;
var default_peer_id;
// Override with your own STUN servers if you want
var rtc_configuration = {iceServers: [{urls: "stun:stun.services.mozilla.com"},
{urls: "stun:stun.l.google.com:19302"}]};
// The default constraints that will be attempted. Can be overriden by the user.
var default_constraints = {video: true, audio: true};
{urls: "stun:stun.l.google.com:19302"},]};
var default_constraints = {video: true, audio: false};
var connect_attempts = 0;
var peer_connection;
var send_channel;
var channels = []
var ws_conn;
// Promise for local stream after constraints are approved by the user
var local_stream_promise;
@ -56,7 +55,7 @@ function setError(text) {
var span = document.getElementById("status")
span.textContent = text;
span.classList.add('error');
ws_conn.send(JSON.stringify({'STATE': 'error'}))
ws_conn.send(JSON.stringify({'STATE': 'error', 'msg' : text}))
}
function resetVideo() {
@ -75,27 +74,40 @@ function resetVideo() {
videoElement.load();
}
function updateRemoteStateFromSetSDPJson(sdp) {
if (sdp.type == "offer")
ws_conn.send(JSON.stringify({'STATE': 'offer-set', 'description' : sdp}))
else if (sdp.type == "answer")
ws_conn.send(JSON.stringify({'STATE': 'answer-set', 'description' : sdp}))
else
throw new Error ("Unknown SDP type!");
}
function updateRemoteStateFromGeneratedSDPJson(sdp) {
if (sdp.type == "offer")
ws_conn.send(JSON.stringify({'STATE': 'offer-created', 'description' : sdp}))
else if (sdp.type == "answer")
ws_conn.send(JSON.stringify({'STATE': 'answer-created', 'description' : sdp}))
else
throw new Error ("Unknown SDP type!");
}
// SDP offer received from peer, set remote description and create an answer
function onIncomingSDP(sdp) {
peer_connection.setRemoteDescription(sdp).then(() => {
setStatus("Remote SDP set");
if (sdp.type != "offer")
return;
setStatus("Got SDP offer");
local_stream_promise.then((stream) => {
setStatus("Got local stream, creating answer");
peer_connection.createAnswer()
.then(onLocalDescription).catch(setError);
}).catch(setError);
updateRemoteStateFromSetSDPJson(sdp)
setStatus("Set remote SDP", sdp.type);
}).catch(setError);
}
// Local description was set, send it to peer
function onLocalDescription(desc) {
updateRemoteStateFromGeneratedSDPJson(desc)
console.log("Got local description: " + JSON.stringify(desc));
peer_connection.setLocalDescription(desc).then(function() {
setStatus("Sending SDP answer");
sdp = {'sdp': peer_connection.localDescription}
updateRemoteStateFromSetSDPJson(desc)
sdp = {'sdp': desc}
setStatus("Sending SDP", sdp.type);
ws_conn.send(JSON.stringify(sdp));
});
}
@ -103,9 +115,33 @@ function onLocalDescription(desc) {
// ICE candidate received from peer, add it to the peer connection
function onIncomingICE(ice) {
var candidate = new RTCIceCandidate(ice);
console.log("adding candidate", candidate)
peer_connection.addIceCandidate(candidate).catch(setError);
}
function createOffer(offer) {
local_stream_promise.then((stream) => {
setStatus("Got local stream, creating offer");
peer_connection.createOffer()
.then(onLocalDescription).catch(setError);
}).catch(setError)
}
function createAnswer(offer) {
local_stream_promise.then((stream) => {
setStatus("Got local stream, creating answer");
peer_connection.createAnswer()
.then(onLocalDescription).catch(setError);
}).catch(setError)
}
function handleOptions(options) {
console.log ('received options', options);
if (options.bundlePolicy != null) {
rtc_configuration['bundlePolicy'] = options.bundlePolicy;
}
}
function onServerMessage(event) {
console.log("Received " + event.data);
switch (event.data) {
@ -129,14 +165,33 @@ function onServerMessage(event) {
return;
}
if (msg.SET_TITLE != null) {
// some debugging for tests that hang around
document.title = msg['SET_TITLE']
return;
} else if (msg.OPTIONS != null) {
handleOptions(msg.OPTIONS);
return;
}
// Incoming JSON signals the beginning of a call
if (!peer_connection)
createCall(msg);
createCall();
if (msg.sdp != null) {
onIncomingSDP(msg.sdp);
} else if (msg.ice != null) {
onIncomingICE(msg.ice);
} else if (msg.CREATE_OFFER != null) {
createOffer(msg.CREATE_OFFER)
} else if (msg.CREATE_ANSWER != null) {
createAnswer(msg.CREATE_ANSWER)
} else if (msg.DATA_CREATE != null) {
addDataChannel(msg.DATA_CREATE.id)
} else if (msg.DATA_CLOSE != null) {
closeDataChannel(msg.DATA_CLOSE.id)
} else if (msg.DATA_SEND_MSG != null) {
sendDataChannelMessage(msg.DATA_SEND_MSG)
} else {
handleIncomingError("Unknown incoming JSON: " + msg);
}
@ -151,6 +206,7 @@ function onServerClose(event) {
peer_connection.close();
peer_connection = null;
}
channels = []
// Reset after a second
window.setTimeout(websocketServerConnect, 1000);
@ -164,14 +220,7 @@ function onServerError(event) {
function getLocalStream() {
var constraints;
var textarea = document.getElementById('constraints');
try {
constraints = JSON.parse(textarea.value);
} catch (e) {
console.error(e);
setError('ERROR parsing constraints: ' + e.message + ', using default constraints');
constraints = default_constraints;
}
console.log(JSON.stringify(constraints));
// Add local stream
@ -192,12 +241,7 @@ function websocketServerConnect() {
var span = document.getElementById("status");
span.classList.remove('error');
span.textContent = '';
// Populate constraints
var textarea = document.getElementById('constraints');
if (textarea.value == '')
textarea.value = JSON.stringify(default_constraints);
// Fetch the peer id to use
var url = new URL(window.location.href);
peer_id = url.searchParams.get("id");
@ -236,7 +280,6 @@ function onRemoteStreamAdded(event) {
if (videoTracks.length > 0) {
console.log('Incoming stream: ' + videoTracks.length + ' video tracks and ' + audioTracks.length + ' audio tracks');
getVideoElement().srcObject = event.stream;
ws_conn.send(JSON.stringify({'STATE': 'remote-stream-received'}))
} else {
handleIncomingError('Stream with unknown tracks added, resetting');
}
@ -246,53 +289,80 @@ function errorUserMediaHandler() {
setError("Browser doesn't support getUserMedia!");
}
const handleDataChannelOpen = (event) =>{
console.log("dataChannel.OnOpen", event);
};
const handleDataChannelMessageReceived = (event) =>{
console.log("dataChannel.OnMessage:", event, event.data.type);
setStatus("Received data channel message");
if (typeof event.data === 'string' || event.data instanceof String) {
console.log('Incoming string message: ' + event.data);
textarea = document.getElementById("text")
textarea.value = textarea.value + '\n' + event.data
} else {
console.log('Incoming data message');
}
send_channel.send("Hi! (from browser)");
ws_conn.send(JSON.stringify({'DATA-MSG' : event.data, 'id' : event.target.label}));
};
const handleDataChannelOpen = (event) =>{
console.log("dataChannel.OnOpen", event);
ws_conn.send(JSON.stringify({'DATA-STATE' : 'open', 'id' : event.target.label}));
};
const handleDataChannelError = (error) =>{
console.log("dataChannel.OnError:", error);
ws_conn.send(JSON.stringify({'DATA-STATE' : error, 'id' : event.target.label}));
};
const handleDataChannelClose = (event) =>{
console.log("dataChannel.OnClose", event);
ws_conn.send(JSON.stringify({'DATA-STATE' : 'closed', 'id' : event.target.label}));
};
function onDataChannel(event) {
setStatus("Data channel created");
let receiveChannel = event.channel;
receiveChannel.onopen = handleDataChannelOpen;
receiveChannel.onmessage = handleDataChannelMessageReceived;
receiveChannel.onerror = handleDataChannelError;
receiveChannel.onclose = handleDataChannelClose;
let channel = event.channel;
console.log('adding remote data channel with label', channel.label)
ws_conn.send(JSON.stringify({'DATA-NEW' : {'id' : channel.label, 'location' : 'remote'}}));
channel.onopen = handleDataChannelOpen;
channel.onmessage = handleDataChannelMessageReceived;
channel.onerror = handleDataChannelError;
channel.onclose = handleDataChannelClose;
channels.push(channel)
}
function createCall(msg) {
function addDataChannel(label) {
channel = peer_connection.createDataChannel(label, null);
console.log('adding local data channel with label', label)
ws_conn.send(JSON.stringify({'DATA-NEW' : {'id' : label, 'location' : 'local'}}));
channel.onopen = handleDataChannelOpen;
channel.onmessage = handleDataChannelMessageReceived;
channel.onerror = handleDataChannelError;
channel.onclose = handleDataChannelClose;
channels.push(channel)
}
function find_channel(label) {
console.log('find', label, 'in', channels)
for (var c in channels) {
if (channels[c].label === label) {
console.log('found', label, c)
return channels[c];
}
}
return null;
}
function closeDataChannel(label) {
channel = find_channel (label)
console.log('closing data channel with label', label)
channel.close()
}
function sendDataChannelMessage(msg) {
channel = find_channel (msg.id)
console.log('sending on data channel', msg.id, 'message', msg.msg)
channel.send(msg.msg)
}
function createCall() {
// Reset connection attempts because we connected successfully
connect_attempts = 0;
console.log('Creating RTCPeerConnection');
console.log('Creating RTCPeerConnection with configuration', rtc_configuration);
peer_connection = new RTCPeerConnection(rtc_configuration);
send_channel = peer_connection.createDataChannel('label', null);
send_channel.onopen = handleDataChannelOpen;
send_channel.onmessage = handleDataChannelMessageReceived;
send_channel.onerror = handleDataChannelError;
send_channel.onclose = handleDataChannelClose;
peer_connection.ondatachannel = onDataChannel;
peer_connection.onaddstream = onRemoteStreamAdded;
/* Send our video/audio to the other peer */
@ -302,10 +372,6 @@ function createCall(msg) {
return stream;
}).catch(setError);
if (!msg.sdp) {
console.log("WARNING: First message wasn't an SDP message!?");
}
peer_connection.onicecandidate = (event) => {
// We have a candidate, send it to the remote party with the
// same uuid
@ -313,6 +379,7 @@ function createCall(msg) {
console.log("ICE Candidate was null, done");
return;
}
console.log("generated ICE Candidate", event.candidate);
ws_conn.send(JSON.stringify({'ice': event.candidate}));
};

View file

@ -21,12 +21,13 @@ import os
import sys
import argparse
import json
import logging
from signalling import WebRTCSignallingClient
from actions import register_action_types, ActionObserver
from signalling import WebRTCSignallingClient, RemoteWebRTCObserver
from actions import ActionObserver
from client import WebRTCClient
from browser import Browser, create_driver
from enums import SignallingState, NegotiationState, RemoteState
from enums import SignallingState, NegotiationState, DataChannelState, Actions
import gi
gi.require_version("GLib", "2.0")
@ -40,14 +41,20 @@ from gi.repository import GstSdp
gi.require_version("GstValidate", "1.0")
from gi.repository import GstValidate
FORMAT = '%(asctime)-23s %(levelname)-7s %(thread)d %(name)-24s\t%(funcName)-24s %(message)s'
LEVEL = os.environ.get("LOGLEVEL", "DEBUG")
logging.basicConfig(level=LEVEL, format=FORMAT)
l = logging.getLogger(__name__)
class WebRTCApplication(object):
def __init__(self, server, id_, peerid, scenario_name, browser_name, html_source):
def __init__(self, server, id_, peerid, scenario_name, browser_name, html_source, test_name=None):
self.server = server
self.peerid = peerid
self.html_source = html_source
self.id = id_
self.scenario_name = scenario_name
self.browser_name = browser_name
self.test_name = test_name
def _init_validate(self, scenario_file):
self.runner = GstValidate.Runner.new()
@ -61,24 +68,79 @@ class WebRTCApplication(object):
self.client.pipeline.set_state(Gst.State.PLAYING)
def _on_scenario_done(self, scenario):
self.quit()
l.error ('scenario done')
GLib.idle_add(self.quit)
def _connect_actions(self):
def create_offer():
self.client.create_offer(None)
def _connect_actions(self, actions):
def on_action(atype, action):
"""
From a validate action, perform the action as required
"""
if atype == Actions.CREATE_OFFER:
assert action.structure["which"] in ("local", "remote")
c = self.client if action.structure["which"] == "local" else self.remote_client
c.create_offer()
return GstValidate.ActionReturn.OK
self.actions.create_offer.connect(create_offer)
def wait_for_negotiation_state(state):
states = [state, NegotiationState.ERROR]
state = self.client.wait_for_negotiation_states(states)
return GstValidate.ActionReturn.OK if state != RemoteState.ERROR else GstValidate.ActionReturn.ERROR
self.actions.wait_for_negotiation_state.connect(wait_for_negotiation_state)
def add_stream(pipeline):
self.client.add_stream(pipeline)
elif atype == Actions.CREATE_ANSWER:
assert action.structure["which"] in ("local", "remote")
c = self.client if action.structure["which"] == "local" else self.remote_client
c.create_answer()
return GstValidate.ActionReturn.OK
self.actions.add_stream.connect(add_stream)
elif atype == Actions.WAIT_FOR_NEGOTIATION_STATE:
states = [NegotiationState(action.structure["state"]), NegotiationState.ERROR]
assert action.structure["which"] in ("local", "remote")
c = self.client if action.structure["which"] == "local" else self.remote_client
state = c.wait_for_negotiation_states(states)
return GstValidate.ActionReturn.OK if state != NegotiationState.ERROR else GstValidate.ActionReturn.ERROR
elif atype == Actions.ADD_STREAM:
self.client.add_stream(action.structure["pipeline"])
return GstValidate.ActionReturn.OK
elif atype == Actions.ADD_DATA_CHANNEL:
assert action.structure["which"] in ("local", "remote")
c = self.client if action.structure["which"] == "local" else self.remote_client
c.add_data_channel(action.structure["id"])
return GstValidate.ActionReturn.OK
elif atype == Actions.SEND_DATA_CHANNEL_STRING:
assert action.structure["which"] in ("local", "remote")
c = self.client if action.structure["which"] == "local" else self.remote_client
channel = c.find_channel (action.structure["id"])
channel.send_string (action.structure["msg"])
return GstValidate.ActionReturn.OK
elif atype == Actions.WAIT_FOR_DATA_CHANNEL_STATE:
assert action.structure["which"] in ("local", "remote")
c = self.client if action.structure["which"] == "local" else self.remote_client
states = [DataChannelState(action.structure["state"]), DataChannelState.ERROR]
channel = c.find_channel (action.structure["id"])
state = channel.wait_for_states(states)
return GstValidate.ActionReturn.OK if state != DataChannelState.ERROR else GstValidate.ActionReturn.ERROR
elif atype == Actions.CLOSE_DATA_CHANNEL:
assert action.structure["which"] in ("local", "remote")
c = self.client if action.structure["which"] == "local" else self.remote_client
channel = c.find_channel (action.structure["id"])
channel.close()
return GstValidate.ActionReturn.OK
elif atype == Actions.WAIT_FOR_DATA_CHANNEL:
assert action.structure["which"] in ("local", "remote")
c = self.client if action.structure["which"] == "local" else self.remote_client
state = c.wait_for_data_channel(action.structure["id"])
return GstValidate.ActionReturn.OK
elif atype == Actions.WAIT_FOR_DATA_CHANNEL_STRING:
assert action.structure["which"] in ("local", "remote")
c = self.client if action.structure["which"] == "local" else self.remote_client
channel = c.find_channel (action.structure["id"])
channel.wait_for_message(action.structure["msg"])
return GstValidate.ActionReturn.OK
elif atype == Actions.WAIT_FOR_NEGOTIATION_NEEDED:
self.client.wait_for_negotiation_needed(action.structure["generation"])
return GstValidate.ActionReturn.OK
elif atype == Actions.SET_WEBRTC_OPTIONS:
self.client.set_options (action.structure)
self.remote_client.set_options (action.structure)
return GstValidate.ActionReturn.OK
else:
assert "Not reached" == ""
actions.action.connect (on_action)
def _connect_client_observer(self):
def on_offer_created(offer):
@ -87,6 +149,12 @@ class WebRTCApplication(object):
self.signalling.send(msg)
self.client.on_offer_created.connect(on_offer_created)
def on_answer_created(answer):
text = answer.sdp.as_text()
msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
self.signalling.send(msg)
self.client.on_answer_created.connect(on_answer_created)
def on_ice_candidate(mline, candidate):
msg = json.dumps({'ice': {'sdpMLineIndex': str(mline), 'candidate' : candidate}})
self.signalling.send(msg)
@ -116,6 +184,7 @@ class WebRTCApplication(object):
def error(msg):
# errors are unexpected
l.error ('Unexpected error: ' + msg)
GLib.idle_add(self.quit)
GLib.idle_add(sys.exit, -20)
self.signalling.error.connect(error)
@ -126,37 +195,50 @@ class WebRTCApplication(object):
self.client = WebRTCClient()
self._connect_client_observer()
self.actions = ActionObserver()
register_action_types(self.actions)
self._connect_actions()
self.signalling = WebRTCSignallingClient(self.server, self.id)
self.remote_client = RemoteWebRTCObserver (self.signalling)
self._connect_signalling_observer()
actions = ActionObserver()
actions.register_action_types()
self._connect_actions(actions)
# wait for the signalling server to start up before creating the browser
self.signalling.wait_for_states([SignallingState.OPEN])
self.signalling.hello()
self.browser = Browser(create_driver(self.browser_name), self.html_source)
self.browser = Browser(create_driver(self.browser_name))
self.browser.open(self.html_source)
browser_id = self.browser.get_peer_id ()
assert browser_id == self.peerid
self.signalling.create_session(self.peerid)
test_name = self.test_name if self.test_name else self.scenario_name
self.remote_client.set_title (test_name)
self._init_validate(self.scenario_name)
print("app initialized")
def quit(self):
# Stop signalling first so asyncio doesn't keep us alive on weird failures
l.info('quiting')
self.signalling.stop()
self.browser.driver.quit()
self.client.stop()
l.info('signalling stopped')
self.main_loop.quit()
l.info('main loop stopped')
self.client.stop()
l.info('client stopped')
self.browser.driver.quit()
l.info('browser exitted')
def run(self):
try:
self._init()
l.info("app initialized")
self.main_loop.run()
l.info("loop exited")
except:
l.exception("Fatal error")
self.quit()
raise
@ -168,6 +250,7 @@ def parse_options():
parser.add_argument('--html-source', help='HTML page to open in the browser', default=None)
parser.add_argument('--scenario', help='Scenario file to execute', default=None)
parser.add_argument('--browser', help='Browser name to use', default=None)
parser.add_argument('--name', help='Name of the test', default=None)
return parser.parse_args()
def init():
@ -196,7 +279,7 @@ def init():
def run():
args = init()
w = WebRTCApplication (args.server, args.id, args.peer_id, args.scenario, args.browser, args.html_source)
w = WebRTCApplication (args.server, args.id, args.peer_id, args.scenario, args.browser, args.html_source, test_name=args.name)
return w.run()
if __name__ == "__main__":