mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-12-18 22:36:33 +00:00
shout2send: use non-blocking I/O and a configurable network operations timeout
This allows timing out on network errors much earlier (currently it takes ~15min to timeout) and we can still unlock and change state in the meantime. https://bugzilla.gnome.org/show_bug.cgi?id=571722
This commit is contained in:
parent
34d08a0169
commit
b26d44501c
2 changed files with 131 additions and 4 deletions
|
@ -70,7 +70,10 @@ enum
|
||||||
ARG_PROTOCOL, /* Protocol to connect with */
|
ARG_PROTOCOL, /* Protocol to connect with */
|
||||||
|
|
||||||
ARG_MOUNT, /* mountpoint of stream (icecast only) */
|
ARG_MOUNT, /* mountpoint of stream (icecast only) */
|
||||||
ARG_URL /* the stream's homepage URL */
|
ARG_URL, /* the stream's homepage URL */
|
||||||
|
|
||||||
|
ARG_TIMEOUT /* The max amount of time to wait for
|
||||||
|
network activity */
|
||||||
};
|
};
|
||||||
|
|
||||||
#define DEFAULT_IP "127.0.0.1"
|
#define DEFAULT_IP "127.0.0.1"
|
||||||
|
@ -84,6 +87,7 @@ enum
|
||||||
#define DEFAULT_MOUNT ""
|
#define DEFAULT_MOUNT ""
|
||||||
#define DEFAULT_URL ""
|
#define DEFAULT_URL ""
|
||||||
#define DEFAULT_PROTOCOL SHOUT2SEND_PROTOCOL_HTTP
|
#define DEFAULT_PROTOCOL SHOUT2SEND_PROTOCOL_HTTP
|
||||||
|
#define DEFAULT_TIMEOUT 10000
|
||||||
|
|
||||||
#ifdef SHOUT_FORMAT_WEBM
|
#ifdef SHOUT_FORMAT_WEBM
|
||||||
#define WEBM_CAPS "; video/webm; audio/webm"
|
#define WEBM_CAPS "; video/webm; audio/webm"
|
||||||
|
@ -207,6 +211,12 @@ gst_shout2send_class_init (GstShout2sendClass * klass)
|
||||||
g_param_spec_string ("url", "url", "the stream's homepage URL",
|
g_param_spec_string ("url", "url", "the stream's homepage URL",
|
||||||
DEFAULT_URL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
DEFAULT_URL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
||||||
|
|
||||||
|
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT,
|
||||||
|
g_param_spec_uint ("timeout", "timeout",
|
||||||
|
"Max amount of time to wait for network activity, in milliseconds",
|
||||||
|
1, G_MAXUINT, DEFAULT_TIMEOUT,
|
||||||
|
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
|
||||||
|
|
||||||
/* signals */
|
/* signals */
|
||||||
gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM] =
|
gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM] =
|
||||||
g_signal_new ("connection-problem", G_TYPE_FROM_CLASS (klass),
|
g_signal_new ("connection-problem", G_TYPE_FROM_CLASS (klass),
|
||||||
|
@ -253,6 +263,7 @@ gst_shout2send_init (GstShout2send * shout2send)
|
||||||
shout2send->url = g_strdup (DEFAULT_URL);
|
shout2send->url = g_strdup (DEFAULT_URL);
|
||||||
shout2send->protocol = DEFAULT_PROTOCOL;
|
shout2send->protocol = DEFAULT_PROTOCOL;
|
||||||
shout2send->ispublic = DEFAULT_PUBLIC;
|
shout2send->ispublic = DEFAULT_PUBLIC;
|
||||||
|
shout2send->timeout = DEFAULT_TIMEOUT;
|
||||||
|
|
||||||
shout2send->format = -1;
|
shout2send->format = -1;
|
||||||
shout2send->tags = gst_tag_list_new_empty ();
|
shout2send->tags = gst_tag_list_new_empty ();
|
||||||
|
@ -518,20 +529,53 @@ set_failed:
|
||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
gst_shout2send_connect (GstShout2send * sink)
|
gst_shout2send_connect (GstShout2send * sink)
|
||||||
{
|
{
|
||||||
|
GstFlowReturn fret = GST_FLOW_OK;
|
||||||
|
gint ret;
|
||||||
|
GstClockTime start_ts;
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (sink, "Connection format is: %d", sink->format);
|
GST_DEBUG_OBJECT (sink, "Connection format is: %d", sink->format);
|
||||||
|
|
||||||
if (sink->format == -1)
|
if (sink->format == -1)
|
||||||
goto no_caps;
|
goto no_caps;
|
||||||
|
|
||||||
|
if (shout_set_nonblocking (sink->conn, 1) != SHOUTERR_SUCCESS)
|
||||||
|
goto could_not_set_nonblocking;
|
||||||
|
|
||||||
if (shout_set_format (sink->conn, sink->format) != SHOUTERR_SUCCESS)
|
if (shout_set_format (sink->conn, sink->format) != SHOUTERR_SUCCESS)
|
||||||
goto could_not_set_format;
|
goto could_not_set_format;
|
||||||
|
|
||||||
if (shout_open (sink->conn) != SHOUTERR_SUCCESS)
|
GST_DEBUG_OBJECT (sink, "connecting");
|
||||||
|
|
||||||
|
start_ts = gst_util_get_timestamp ();
|
||||||
|
ret = shout_open (sink->conn);
|
||||||
|
|
||||||
|
/* wait for connection or timeout */
|
||||||
|
while (ret == SHOUTERR_BUSY) {
|
||||||
|
if (gst_util_get_timestamp () - start_ts > sink->timeout * GST_MSECOND) {
|
||||||
|
goto connection_timeout;
|
||||||
|
}
|
||||||
|
if (gst_poll_wait (sink->timer, 10 * GST_MSECOND) == -1) {
|
||||||
|
GST_LOG_OBJECT (sink, "unlocked");
|
||||||
|
|
||||||
|
fret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
|
||||||
|
if (fret != GST_FLOW_OK)
|
||||||
|
goto done;
|
||||||
|
}
|
||||||
|
ret = shout_get_connected (sink->conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ret != SHOUTERR_CONNECTED && ret != SHOUTERR_SUCCESS)
|
||||||
goto could_not_connect;
|
goto could_not_connect;
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (sink, "connected to server");
|
GST_DEBUG_OBJECT (sink, "connected to server");
|
||||||
sink->connected = TRUE;
|
sink->connected = TRUE;
|
||||||
|
|
||||||
|
/* initialize sending rate monitoring */
|
||||||
|
sink->prev_queuelen = 0;
|
||||||
|
sink->data_sent = 0;
|
||||||
|
sink->stalled = TRUE;
|
||||||
|
sink->datasent_reset_ts = sink->stalled_ts = gst_util_get_timestamp ();
|
||||||
|
|
||||||
/* let's set metadata */
|
/* let's set metadata */
|
||||||
if (sink->songmetadata) {
|
if (sink->songmetadata) {
|
||||||
shout_metadata_t *pmetadata;
|
shout_metadata_t *pmetadata;
|
||||||
|
@ -543,7 +587,8 @@ gst_shout2send_connect (GstShout2send * sink)
|
||||||
shout_metadata_free (pmetadata);
|
shout_metadata_free (pmetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
return GST_FLOW_OK;
|
done:
|
||||||
|
return fret;
|
||||||
|
|
||||||
/* ERRORS */
|
/* ERRORS */
|
||||||
no_caps:
|
no_caps:
|
||||||
|
@ -553,6 +598,14 @@ no_caps:
|
||||||
return GST_FLOW_NOT_NEGOTIATED;
|
return GST_FLOW_NOT_NEGOTIATED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
could_not_set_nonblocking:
|
||||||
|
{
|
||||||
|
GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL),
|
||||||
|
("Error configuring libshout to use non-blocking i/o: %s",
|
||||||
|
shout_get_error (sink->conn)));
|
||||||
|
return GST_FLOW_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
could_not_set_format:
|
could_not_set_format:
|
||||||
{
|
{
|
||||||
GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL),
|
GST_ELEMENT_ERROR (sink, LIBRARY, SETTINGS, (NULL),
|
||||||
|
@ -569,6 +622,15 @@ could_not_connect:
|
||||||
shout_get_errno (sink->conn));
|
shout_get_errno (sink->conn));
|
||||||
return GST_FLOW_ERROR;
|
return GST_FLOW_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connection_timeout:
|
||||||
|
{
|
||||||
|
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
|
||||||
|
(_("Could not connect to server")), ("connection timed out"));
|
||||||
|
g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0,
|
||||||
|
shout_get_errno (sink->conn));
|
||||||
|
return GST_FLOW_ERROR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
|
@ -628,6 +690,8 @@ gst_shout2send_render (GstBaseSink * basesink, GstBuffer * buf)
|
||||||
gint delay;
|
gint delay;
|
||||||
GstFlowReturn fret = GST_FLOW_OK;
|
GstFlowReturn fret = GST_FLOW_OK;
|
||||||
GstMapInfo map;
|
GstMapInfo map;
|
||||||
|
GstClockTime now;
|
||||||
|
ssize_t queuelen;
|
||||||
|
|
||||||
sink = GST_SHOUT2SEND (basesink);
|
sink = GST_SHOUT2SEND (basesink);
|
||||||
|
|
||||||
|
@ -655,13 +719,54 @@ gst_shout2send_render (GstBaseSink * basesink, GstBuffer * buf)
|
||||||
GST_LOG_OBJECT (sink, "we're %d msec late", -delay);
|
GST_LOG_OBJECT (sink, "we're %d msec late", -delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* accumulate how much data have actually been sent
|
||||||
|
* to the network since the last call to shout_send() */
|
||||||
|
queuelen = shout_queuelen (sink->conn);
|
||||||
|
if (sink->prev_queuelen > 0)
|
||||||
|
sink->data_sent += sink->prev_queuelen - queuelen;
|
||||||
|
|
||||||
gst_buffer_map (buf, &map, GST_MAP_READ);
|
gst_buffer_map (buf, &map, GST_MAP_READ);
|
||||||
GST_LOG_OBJECT (sink, "sending %u bytes of data", (guint) map.size);
|
|
||||||
|
/* add map.size instead of re-reading the queue length because
|
||||||
|
* the data may actually be sent immediately */
|
||||||
|
sink->prev_queuelen = queuelen + map.size;
|
||||||
|
|
||||||
|
GST_LOG_OBJECT (sink, "sending %u bytes of data, queue length now is %"
|
||||||
|
G_GUINT64_FORMAT, (guint) map.size, sink->prev_queuelen);
|
||||||
|
|
||||||
ret = shout_send (sink->conn, map.data, map.size);
|
ret = shout_send (sink->conn, map.data, map.size);
|
||||||
|
|
||||||
gst_buffer_unmap (buf, &map);
|
gst_buffer_unmap (buf, &map);
|
||||||
if (ret != SHOUTERR_SUCCESS)
|
if (ret != SHOUTERR_SUCCESS)
|
||||||
goto send_error;
|
goto send_error;
|
||||||
|
|
||||||
|
now = gst_util_get_timestamp ();
|
||||||
|
if (now - sink->datasent_reset_ts >= 500 * GST_MSECOND) {
|
||||||
|
guint64 send_rate;
|
||||||
|
|
||||||
|
send_rate = gst_util_uint64_scale (sink->data_sent, GST_SECOND,
|
||||||
|
now - sink->datasent_reset_ts);
|
||||||
|
|
||||||
|
if (send_rate == 0 && !sink->stalled) {
|
||||||
|
sink->stalled = TRUE;
|
||||||
|
sink->stalled_ts = now;
|
||||||
|
} else if (send_rate > 0 && sink->stalled) {
|
||||||
|
sink->stalled = FALSE;
|
||||||
|
}
|
||||||
|
|
||||||
|
sink->data_sent = 0;
|
||||||
|
sink->datasent_reset_ts = now;
|
||||||
|
|
||||||
|
GST_DEBUG_OBJECT (sink, "sending rate is %" G_GUINT64_FORMAT " bps, "
|
||||||
|
"stalled %d, stalled_ts %" GST_TIME_FORMAT, send_rate, sink->stalled,
|
||||||
|
GST_TIME_ARGS (sink->stalled_ts));
|
||||||
|
|
||||||
|
if (sink->stalled && now - sink->stalled_ts >= sink->timeout * GST_MSECOND) {
|
||||||
|
GST_WARNING_OBJECT (sink, "network send queue is stalled for too long");
|
||||||
|
goto network_error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
done:
|
done:
|
||||||
|
|
||||||
return fret;
|
return fret;
|
||||||
|
@ -675,6 +780,15 @@ send_error:
|
||||||
shout_get_errno (sink->conn));
|
shout_get_errno (sink->conn));
|
||||||
return GST_FLOW_ERROR;
|
return GST_FLOW_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
network_error:
|
||||||
|
{
|
||||||
|
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
|
||||||
|
("network timeout reached"));
|
||||||
|
g_signal_emit (sink, gst_shout2send_signals[SIGNAL_CONNECTION_PROBLEM], 0,
|
||||||
|
SHOUTERR_BUSY);
|
||||||
|
return GST_FLOW_ERROR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -727,6 +841,9 @@ gst_shout2send_set_property (GObject * object, guint prop_id,
|
||||||
g_free (shout2send->url);
|
g_free (shout2send->url);
|
||||||
shout2send->url = g_strdup (g_value_get_string (value));
|
shout2send->url = g_strdup (g_value_get_string (value));
|
||||||
break;
|
break;
|
||||||
|
case ARG_TIMEOUT:
|
||||||
|
shout2send->timeout = g_value_get_uint (value);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||||
break;
|
break;
|
||||||
|
@ -775,6 +892,9 @@ gst_shout2send_get_property (GObject * object, guint prop_id,
|
||||||
case ARG_URL: /* the stream's homepage URL */
|
case ARG_URL: /* the stream's homepage URL */
|
||||||
g_value_set_string (value, shout2send->url);
|
g_value_set_string (value, shout2send->url);
|
||||||
break;
|
break;
|
||||||
|
case ARG_TIMEOUT:
|
||||||
|
g_value_set_uint (value, shout2send->timeout);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -46,6 +46,12 @@ struct _GstShout2send {
|
||||||
|
|
||||||
shout_t *conn;
|
shout_t *conn;
|
||||||
|
|
||||||
|
guint64 prev_queuelen;
|
||||||
|
guint64 data_sent;
|
||||||
|
GstClockTime datasent_reset_ts;
|
||||||
|
gboolean stalled;
|
||||||
|
GstClockTime stalled_ts;
|
||||||
|
|
||||||
gchar *ip;
|
gchar *ip;
|
||||||
guint port;
|
guint port;
|
||||||
gchar *password;
|
gchar *password;
|
||||||
|
@ -61,6 +67,7 @@ struct _GstShout2send {
|
||||||
gchar *songartist;
|
gchar *songartist;
|
||||||
gchar *songtitle;
|
gchar *songtitle;
|
||||||
int format;
|
int format;
|
||||||
|
uint timeout;
|
||||||
|
|
||||||
GstTagList* tags;
|
GstTagList* tags;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in a new issue