mpegtsmux: Make handling of sinkpads thread-safe

Ensure we take the object lock while accessing `GstElement.sinkpads`.
Use an iterator when the code isn't simple to avoid deadlock.

When we find the best pad, take a reference so a concurrent pad
release doesn't destroy the pad before we're done with it.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1553>
This commit is contained in:
Jan Alexander Steffens (heftig) 2020-09-02 15:29:49 +02:00 committed by GStreamer Merge Bot
parent 3f9a7e5c73
commit 66f9d37c37
2 changed files with 66 additions and 42 deletions

View file

@ -344,10 +344,14 @@ gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
gst_event_replace (&mux->force_key_unit_event, NULL);
gst_buffer_replace (&mux->out_buffer, NULL);
GST_OBJECT_LOCK (mux);
for (l = GST_ELEMENT (mux)->sinkpads; l; l = l->next) {
gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (l->data));
}
GST_OBJECT_UNLOCK (mux);
if (alloc) {
g_assert (klass->create_ts_mux);
@ -810,23 +814,26 @@ no_stream:
}
}
static gboolean
gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
gpointer user_data)
{
GstFlowReturn *ret = user_data;
*ret = gst_base_ts_mux_create_pad_stream (GST_BASE_TS_MUX (element), pad);
return *ret == GST_FLOW_OK;
}
static GstFlowReturn
gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
{
GstFlowReturn ret = GST_FLOW_OK;
GList *walk = GST_ELEMENT (mux)->sinkpads;
/* Create the streams */
while (walk) {
GstPad *pad = GST_PAD (walk->data);
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (mux),
gst_base_ts_mux_create_pad_stream_func, &ret);
ret = gst_base_ts_mux_create_pad_stream (mux, pad);
if (ret != GST_FLOW_OK)
return ret;
walk = g_list_next (walk);
}
return GST_FLOW_OK;
return ret;
}
static void
@ -1478,11 +1485,9 @@ gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
case GST_EVENT_CUSTOM_UPSTREAM:
{
GstIterator *iter;
GstIteratorResult iter_ret;
GstPad *sinkpad;
GValue sinkpad_value = G_VALUE_INIT;
GstClockTime running_time;
gboolean all_headers, done, res = FALSE;
gboolean all_headers, done = FALSE, res = FALSE;
guint count;
if (!gst_video_event_is_force_key_unit (event))
@ -1505,28 +1510,28 @@ gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event)
gst_event_replace (&mux->force_key_unit_event, event);
iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux));
done = FALSE;
while (!done) {
gboolean tmp;
switch (gst_iterator_next (iter, &sinkpad_value)) {
case GST_ITERATOR_OK:{
GstPad *sinkpad = g_value_get_object (&sinkpad_value);
gboolean tmp;
iter_ret = gst_iterator_next (iter, &sinkpad_value);
sinkpad = GST_PAD (g_value_get_object (&sinkpad_value));
switch (iter_ret) {
case GST_ITERATOR_DONE:
done = TRUE;
break;
case GST_ITERATOR_OK:
GST_INFO_OBJECT (GST_AGGREGATOR_SRC_PAD (agg), "forwarding");
tmp = gst_pad_push_event (sinkpad, gst_event_ref (event));
GST_INFO_OBJECT (mux, "result %d", tmp);
/* succeed if at least one pad succeeds */
res |= tmp;
break;
case GST_ITERATOR_ERROR:
}
case GST_ITERATOR_DONE:
done = TRUE;
break;
case GST_ITERATOR_RESYNC:
gst_iterator_resync (iter);
break;
case GST_ITERATOR_ERROR:
g_assert_not_reached ();
break;
}
g_value_reset (&sinkpad_value);
@ -1636,28 +1641,38 @@ gst_base_ts_mux_update_src_caps (GstAggregator * agg, GstCaps * caps,
static GstBaseTsMuxPad *
gst_base_ts_mux_find_best_pad (GstAggregator * aggregator)
{
GstBaseTsMuxPad *pad, *best = NULL;
GList *l;
GstBuffer *buffer;
GstBaseTsMuxPad *best = NULL;
GstClockTime best_ts = GST_CLOCK_TIME_NONE;
GList *l;
GST_OBJECT_LOCK (aggregator);
for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) {
pad = GST_BASE_TS_MUX_PAD (l->data);
buffer = gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (pad));
GstBaseTsMuxPad *tpad = GST_BASE_TS_MUX_PAD (l->data);
GstAggregatorPad *apad = GST_AGGREGATOR_PAD_CAST (tpad);
GstBuffer *buffer;
buffer = gst_aggregator_pad_peek_buffer (apad);
if (!buffer)
continue;
if (best_ts == GST_CLOCK_TIME_NONE) {
best = pad;
best = tpad;
best_ts = GST_BUFFER_DTS_OR_PTS (buffer);
} else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) {
GstClockTime t = GST_BUFFER_DTS_OR_PTS (buffer);
if (t < best_ts) {
best = pad;
best = tpad;
best_ts = t;
}
}
gst_buffer_unref (buffer);
}
if (best)
gst_object_ref (best);
GST_OBJECT_UNLOCK (aggregator);
GST_DEBUG_OBJECT (aggregator,
"Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT,
GST_TIME_ARGS (best_ts), best);
@ -1669,14 +1684,22 @@ static gboolean
gst_base_ts_mux_are_all_pads_eos (GstBaseTsMux * mux)
{
GList *l;
gboolean ret = TRUE;
GST_OBJECT_LOCK (mux);
for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (l->data);
if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad)))
return FALSE;
if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad))) {
ret = FALSE;
break;
}
}
return TRUE;
GST_OBJECT_UNLOCK (mux);
return ret;
}
@ -1696,6 +1719,8 @@ gst_base_ts_mux_aggregate (GstAggregator * agg, gboolean timeout)
gst_base_ts_mux_aggregate_buffer (GST_BASE_TS_MUX (agg),
GST_AGGREGATOR_PAD (best), buffer);
gst_object_unref (best);
if (ret != GST_FLOW_OK)
goto done;
}
@ -1768,7 +1793,7 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
GList *walk;
GList *l;
switch (prop_id) {
case PROP_PROG_MAP:
@ -1789,15 +1814,14 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
break;
case PROP_PMT_INTERVAL:
walk = GST_ELEMENT (object)->sinkpads;
mux->pmt_interval = g_value_get_uint (value);
while (walk) {
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data);
GST_OBJECT_LOCK (mux);
for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data);
tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
walk = g_list_next (walk);
}
GST_OBJECT_UNLOCK (mux);
break;
case PROP_ALIGNMENT:
mux->alignment = g_value_get_int (value);

View file

@ -176,7 +176,7 @@ struct GstBaseTsMux {
guint pcr_interval;
guint scte35_pid;
guint scte35_null_interval;
/* state */
gboolean first;
GstClockTime pending_key_unit_ts;