webrtc: implement support for asynchronous host resolution

Doesn't block anymore if a mdns host resolution takes multiple seconds
to complete in e.g. stun/turn/ice candidate usage.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1961>
This commit is contained in:
Matthew Waters 2022-03-04 14:28:21 +11:00 committed by GStreamer Marge Bot
parent dd4bbb6379
commit 6066e913ee

View file

@ -282,56 +282,165 @@ _parse_userinfo (const gchar * userinfo, gchar ** user, gchar ** pass)
*pass = g_uri_unescape_string (&colon[1], NULL);
}
static gchar *
_resolve_host (GstWebRTCICE * ice, const gchar * host)
struct resolve_host_data
{
GResolver *resolver = g_resolver_get_default ();
GstWebRTCICE *ice;
char *host;
gboolean main_context_handled;
gpointer user_data;
GDestroyNotify notify;
};
static void
on_resolve_host (GResolver * resolver, GAsyncResult * res, gpointer user_data)
{
GTask *task = user_data;
struct resolve_host_data *rh;
GError *error = NULL;
GInetAddress *addr;
GList *addresses;
gchar *address;
GST_DEBUG_OBJECT (ice, "Resolving host %s", host);
rh = g_task_get_task_data (task);
if (!(addresses = g_resolver_lookup_by_name (resolver, host, NULL, &error))) {
GST_ERROR ("%s", error->message);
g_clear_error (&error);
return NULL;
if (!(addresses = g_resolver_lookup_by_name_finish (resolver, res, &error))) {
GST_ERROR ("failed to resolve: %s", error->message);
g_task_return_error (task, error);
g_object_unref (task);
return;
}
GST_DEBUG_OBJECT (ice, "Resolved %d addresses for host %s",
g_list_length (addresses), host);
GST_DEBUG_OBJECT (rh->ice, "Resolved %d addresses for host %s with data %p",
g_list_length (addresses), rh->host, rh);
/* XXX: only the first address is used */
addr = addresses->data;
address = g_inet_address_to_string (addr);
g_resolver_free_addresses (addresses);
return address;
g_task_return_pointer (task, addresses,
(GDestroyNotify) g_resolver_free_addresses);
g_object_unref (task);
}
static void
_add_turn_server (GstWebRTCICE * ice, struct NiceStreamItem *item,
GstUri * turn_server)
free_resolve_host_data (struct resolve_host_data *rh)
{
GST_TRACE_OBJECT (rh->ice, "Freeing data %p for resolving host %s", rh,
rh->host);
if (rh->notify)
rh->notify (rh->user_data);
g_free (rh->host);
g_free (rh);
}
static struct resolve_host_data *
resolve_host_data_new (GstWebRTCICE * ice, const char *host)
{
struct resolve_host_data *rh = g_new0 (struct resolve_host_data, 1);
rh->ice = ice;
rh->host = g_strdup (host);
return rh;
}
static gboolean
resolve_host_main_cb (gpointer user_data)
{
GResolver *resolver = g_resolver_get_default ();
GTask *task = user_data;
struct resolve_host_data *rh;
rh = g_task_get_task_data (task);
/* no need to error anymore if the main context disappears and this task is
* not run */
rh->main_context_handled = TRUE;
GST_DEBUG_OBJECT (rh->ice, "Resolving host %s", rh->host);
g_resolver_lookup_by_name_async (resolver, rh->host, NULL,
(GAsyncReadyCallback) on_resolve_host, g_object_ref (task));
return G_SOURCE_REMOVE;
}
static void
error_task_if_unhandled (GTask * task)
{
struct resolve_host_data *rh;
rh = g_task_get_task_data (task);
if (!rh->main_context_handled) {
GST_DEBUG_OBJECT (rh->ice, "host resolve for %s with data %p was never "
"executed, main context quit?", rh->host, rh);
g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_CANCELLED, "%s",
"Cancelled");
}
g_object_unref (task);
}
static void
resolve_host_async (GstWebRTCICE * ice, const gchar * host,
GAsyncReadyCallback cb, gpointer user_data, GDestroyNotify notify)
{
struct resolve_host_data *rh = resolve_host_data_new (ice, host);
GTask *task;
rh->user_data = user_data;
rh->notify = notify;
task = g_task_new (rh->ice, NULL, cb, user_data);
g_task_set_task_data (task, rh, (GDestroyNotify) free_resolve_host_data);
GST_TRACE_OBJECT (rh->ice, "invoking main context for resolving host %s "
"with data %p", host, rh);
g_main_context_invoke_full (ice->priv->main_context, G_PRIORITY_DEFAULT,
resolve_host_main_cb, task, (GDestroyNotify) error_task_if_unhandled);
}
static GList *
resolve_host_finish (GstWebRTCICE * ice, GAsyncResult * res, GError ** error)
{
g_return_val_if_fail (g_task_is_valid (res, ice), NULL);
return g_task_propagate_pointer (G_TASK (res), error);
}
struct turn_server_data
{
GstUri *uri;
guint nice_stream_id;
};
static void
turn_server_data_free (struct turn_server_data *data)
{
gst_uri_unref (data->uri);
g_free (data);
}
static void
on_turn_server_resolved (GstWebRTCICE * ice, GAsyncResult * res,
struct turn_server_data *user_data)
{
GList *addresses;
GError *error = NULL;
GstUri *turn_server = user_data->uri;
gboolean ret;
gchar *user, *pass;
const gchar *host, *userinfo, *transport, *scheme;
const gchar *userinfo, *transport, *scheme;
NiceRelayType relays[4] = { 0, };
int i, relay_n = 0;
gchar *ip = NULL;
host = gst_uri_get_host (turn_server);
if (!host) {
GST_ERROR_OBJECT (ice, "Turn server has no host");
goto out;
}
ip = _resolve_host (ice, host);
if (!ip) {
GST_ERROR_OBJECT (ice, "Failed to resolve turn server '%s'", host);
goto out;
if (!(addresses = resolve_host_finish (ice, res, &error))) {
GST_WARNING_OBJECT (ice, "failed to resolve turn address: %s",
error->message);
g_clear_error (&error);
return;
}
/* XXX: only the first IP is used */
ip = g_inet_address_to_string (addresses->data);
/* Set the resolved IP as the host since that's what libnice wants */
gst_uri_set_host (turn_server, ip);
@ -352,7 +461,7 @@ _add_turn_server (GstWebRTCICE * ice, struct NiceStreamItem *item,
for (i = 0; i < relay_n; i++) {
ret = nice_agent_set_relay_info (ice->priv->nice_agent,
item->nice_stream_id, NICE_COMPONENT_TYPE_RTP,
user_data->nice_stream_id, NICE_COMPONENT_TYPE_RTP,
gst_uri_get_host (turn_server), gst_uri_get_port (turn_server),
user, pass, relays[i]);
if (!ret) {
@ -365,10 +474,30 @@ _add_turn_server (GstWebRTCICE * ice, struct NiceStreamItem *item,
g_free (user);
g_free (pass);
out:
g_free (ip);
}
static void
_add_turn_server (GstWebRTCICE * ice, struct NiceStreamItem *item,
GstUri * turn_server)
{
struct turn_server_data *data;
const gchar *host;
host = gst_uri_get_host (turn_server);
if (!host) {
GST_ERROR_OBJECT (ice, "Turn server has no host");
return;
}
data = g_new0 (struct turn_server_data, 1);
data->nice_stream_id = item->nice_stream_id;
data->uri = gst_uri_copy (turn_server);
resolve_host_async (ice, host, (GAsyncReadyCallback) on_turn_server_resolved,
data, (GDestroyNotify) turn_server_data_free);
}
typedef struct
{
GstWebRTCICE *ice;
@ -382,13 +511,37 @@ _add_turn_server_func (const gchar * uri, GstUri * turn_server,
_add_turn_server (data->ice, data->item, turn_server);
}
static void
on_stun_server_resolved (GstWebRTCICE * ice, GAsyncResult * res,
gpointer user_data)
{
GList *addresses;
GError *error = NULL;
guint port = GPOINTER_TO_UINT (user_data);
char *ip;
if (!(addresses = resolve_host_finish (ice, res, &error))) {
GST_WARNING_OBJECT (ice, "Failed to resolve stun server: %s",
error->message);
g_clear_error (&error);
return;
}
/* XXX: only the first IP is used */
ip = g_inet_address_to_string (addresses->data);
g_object_set (ice->priv->nice_agent, "stun-server", ip,
"stun-server-port", port, NULL);
g_free (ip);
}
static void
_add_stun_server (GstWebRTCICE * ice, GstUri * stun_server)
{
const gchar *msg = "must be of the form stun://<host>:<port>";
const gchar *host;
gchar *s = NULL;
gchar *ip = NULL;
guint port;
s = gst_uri_to_string (stun_server);
@ -407,18 +560,11 @@ _add_stun_server (GstWebRTCICE * ice, GstUri * stun_server)
gst_uri_set_port (stun_server, port);
}
ip = _resolve_host (ice, host);
if (!ip) {
GST_ERROR_OBJECT (ice, "Failed to resolve stun server '%s'", host);
goto out;
}
g_object_set (ice->priv->nice_agent, "stun-server", ip,
"stun-server-port", port, NULL);
resolve_host_async (ice, host, (GAsyncReadyCallback) on_stun_server_resolved,
GUINT_TO_POINTER (port), NULL);
out:
g_free (s);
g_free (ip);
}
GstWebRTCICEStream *
@ -623,6 +769,84 @@ failure:
return FALSE;
}
struct resolve_candidate_data
{
guint nice_stream_id;
char *prefix;
char *postfix;
};
static void
free_resolve_candidate_data (struct resolve_candidate_data *rc)
{
g_free (rc->prefix);
g_free (rc->postfix);
g_free (rc);
}
static void
add_ice_candidate_to_libnice (GstWebRTCICE * ice, guint nice_stream_id,
NiceCandidate * cand)
{
GSList *candidates = NULL;
if (cand->component_id == 2) {
/* we only support rtcp-mux so rtcp candidates are useless for us */
GST_INFO_OBJECT (ice, "Dropping RTCP candidate");
return;
}
candidates = g_slist_append (candidates, cand);
nice_agent_set_remote_candidates (ice->priv->nice_agent, nice_stream_id,
cand->component_id, candidates);
g_slist_free (candidates);
}
static void
on_candidate_resolved (GstWebRTCICE * ice, GAsyncResult * res,
gpointer user_data)
{
struct resolve_candidate_data *rc = user_data;
GError *error = NULL;
GList *addresses;
char *new_candv[4] = { NULL, };
char *new_addr, *new_candidate;
NiceCandidate *cand;
if (!(addresses = resolve_host_finish (ice, res, &error))) {
GST_WARNING_OBJECT (ice, "Could not resolve candidate address: %s",
error->message);
g_clear_error (&error);
return;
}
new_addr = g_inet_address_to_string (addresses->data);
new_candv[0] = rc->prefix;
new_candv[1] = new_addr;
new_candv[2] = rc->postfix;
new_candv[3] = NULL;
new_candidate = g_strjoinv (" ", new_candv);
GST_DEBUG_OBJECT (ice, "resolved to candidate %s", new_candidate);
cand =
nice_agent_parse_remote_candidate_sdp (ice->priv->nice_agent,
rc->nice_stream_id, new_candidate);
g_free (new_candidate);
if (!cand) {
GST_WARNING_OBJECT (ice, "Could not parse candidate \'%s\'", new_candidate);
return;
}
g_free (new_addr);
add_ice_candidate_to_libnice (ice, rc->nice_stream_id, cand);
nice_candidate_free (cand);
}
/* candidate must start with "a=candidate:" or be NULL*/
void
gst_webrtc_ice_add_candidate (GstWebRTCICE * ice, GstWebRTCICEStream * stream,
@ -630,7 +854,6 @@ gst_webrtc_ice_add_candidate (GstWebRTCICE * ice, GstWebRTCICEStream * stream,
{
struct NiceStreamItem *item;
NiceCandidate *cand;
GSList *candidates = NULL;
item = _find_item (ice, -1, -1, stream);
g_return_if_fail (item != NULL);
@ -647,9 +870,7 @@ gst_webrtc_ice_add_candidate (GstWebRTCICE * ice, GstWebRTCICEStream * stream,
if (!cand) {
/* might be a .local candidate */
char *prefix = NULL, *address = NULL, *postfix = NULL;
char *new_addr = NULL, *new_candidate = NULL;
char *new_candv[4] = { NULL, };
gboolean failure = TRUE;
struct resolve_candidate_data *rc;
if (!get_candidate_address (candidate, &prefix, &address, &postfix)) {
GST_WARNING_OBJECT (ice, "Failed to retrieve address from candidate %s",
@ -663,54 +884,26 @@ gst_webrtc_ice_add_candidate (GstWebRTCICE * ice, GstWebRTCICEStream * stream,
goto done;
}
/* FIXME: async */
if (!(new_addr = _resolve_host (ice, address))) {
GST_WARNING_OBJECT (ice, "Failed to resolve %s", address);
goto done;
}
rc = g_new0 (struct resolve_candidate_data, 1);
rc->nice_stream_id = item->nice_stream_id;
rc->prefix = prefix;
rc->postfix = postfix;
resolve_host_async (ice, address,
(GAsyncReadyCallback) on_candidate_resolved, rc,
(GDestroyNotify) free_resolve_candidate_data);
new_candv[0] = prefix;
new_candv[1] = new_addr;
new_candv[2] = postfix;
new_candv[3] = NULL;
new_candidate = g_strjoinv (" ", new_candv);
GST_DEBUG_OBJECT (ice, "resolved to candidate %s", new_candidate);
cand =
nice_agent_parse_remote_candidate_sdp (ice->priv->nice_agent,
item->nice_stream_id, new_candidate);
if (!cand) {
GST_WARNING_OBJECT (ice, "Could not parse candidate \'%s\'",
new_candidate);
goto done;
}
failure = FALSE;
prefix = NULL;
postfix = NULL;
done:
g_free (prefix);
g_free (address);
g_free (postfix);
g_free (new_addr);
g_free (new_candidate);
if (failure)
return;
}
g_clear_pointer (&address, g_free);
g_clear_pointer (&prefix, g_free);
g_clear_pointer (&postfix, g_free);
if (cand->component_id == 2) {
/* we only support rtcp-mux so rtcp candidates are useless for us */
GST_INFO_OBJECT (ice, "Dropping RTCP candidate %s", candidate);
nice_candidate_free (cand);
return;
}
candidates = g_slist_append (candidates, cand);
nice_agent_set_remote_candidates (ice->priv->nice_agent, item->nice_stream_id,
cand->component_id, candidates);
g_slist_free (candidates);
add_ice_candidate_to_libnice (ice, item->nice_stream_id, cand);
nice_candidate_free (cand);
}