tsmux: Lock mux->tsmux, the programs hash table, and pad streams

They contain implementations that are not thread-safe (e.g. GList, GHashTable).

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1800>
This commit is contained in:
Vivia Nikolaidou 2022-02-01 14:51:27 +02:00 committed by Tim-Philipp Müller
parent f843d3bee3
commit 28c63c1931
2 changed files with 74 additions and 5 deletions

View file

@ -129,6 +129,7 @@ gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
/* Send initial segments again after a flush-stop, and also resend the /* Send initial segments again after a flush-stop, and also resend the
* header sections */ * header sections */
g_mutex_lock (&mux->lock);
mux->first = TRUE; mux->first = TRUE;
/* output PAT, SI tables */ /* output PAT, SI tables */
@ -141,6 +142,7 @@ gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
tsmux_resend_pmt (program); tsmux_resend_pmt (program);
} }
g_mutex_unlock (&mux->lock);
return GST_FLOW_OK; return GST_FLOW_OK;
} }
@ -297,6 +299,7 @@ steal_si_section (GstMpegtsSectionType * type, TsMuxSection * section,
return TRUE; return TRUE;
} }
/* Must be called with mux->lock held */
static void static void
gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc) gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
{ {
@ -372,6 +375,7 @@ release_buffer_cb (guint8 * data, void *user_data)
stream_data_free ((StreamData *) user_data); stream_data_free ((StreamData *) user_data);
} }
/* Must be called with mux->lock held */
static GstFlowReturn static GstFlowReturn
gst_base_ts_mux_create_or_update_stream (GstBaseTsMux * mux, gst_base_ts_mux_create_or_update_stream (GstBaseTsMux * mux,
GstBaseTsMuxPad * ts_pad, GstCaps * caps) GstBaseTsMuxPad * ts_pad, GstCaps * caps)
@ -746,6 +750,7 @@ is_valid_pmt_pid (guint16 pmt_pid)
return TRUE; return TRUE;
} }
/* Must be called with mux->lock held */
static GstFlowReturn static GstFlowReturn
gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad) gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
{ {
@ -767,6 +772,7 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
return ret; return ret;
} }
/* Must be called with mux->lock held */
static GstFlowReturn static GstFlowReturn
gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad) gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
{ {
@ -876,6 +882,7 @@ no_stream:
} }
} }
/* Must be called with mux->lock held */
static gboolean static gboolean
gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad, gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
gpointer user_data) gpointer user_data)
@ -887,6 +894,7 @@ gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
return *ret == GST_FLOW_OK; return *ret == GST_FLOW_OK;
} }
/* Must be called with mux->lock held */
static GstFlowReturn static GstFlowReturn
gst_base_ts_mux_create_streams (GstBaseTsMux * mux) gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
{ {
@ -1189,11 +1197,13 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
return GST_FLOW_OK; return GST_FLOW_OK;
} }
g_mutex_lock (&mux->lock);
if (G_UNLIKELY (mux->first)) { if (G_UNLIKELY (mux->first)) {
ret = gst_base_ts_mux_create_streams (mux); ret = gst_base_ts_mux_create_streams (mux);
if (G_UNLIKELY (ret != GST_FLOW_OK)) { if (G_UNLIKELY (ret != GST_FLOW_OK)) {
if (buf) if (buf)
gst_buffer_unref (buf); gst_buffer_unref (buf);
g_mutex_unlock (&mux->lock);
return ret; return ret;
} }
@ -1232,6 +1242,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) { if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
GstEvent *event; GstEvent *event;
g_mutex_unlock (&mux->lock);
event = check_pending_key_unit_event (mux->force_key_unit_event, event = check_pending_key_unit_event (mux->force_key_unit_event,
&agg_pad->segment, GST_BUFFER_PTS (buf), &agg_pad->segment, GST_BUFFER_PTS (buf),
GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts); GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
@ -1251,6 +1262,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
GST_TIME_ARGS (running_time), count); GST_TIME_ARGS (running_time), count);
gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event); gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event);
g_mutex_lock (&mux->lock);
/* output PAT, SI tables */ /* output PAT, SI tables */
tsmux_resend_pat (mux->tsmux); tsmux_resend_pat (mux->tsmux);
tsmux_resend_si (mux->tsmux); tsmux_resend_si (mux->tsmux);
@ -1261,6 +1273,8 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
tsmux_resend_pmt (program); tsmux_resend_pmt (program);
} }
} else {
g_mutex_lock (&mux->lock);
} }
} }
@ -1315,6 +1329,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported"); GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
gst_buffer_unref (buf); gst_buffer_unref (buf);
g_mutex_unlock (&mux->lock);
return GST_FLOW_OK; return GST_FLOW_OK;
} }
@ -1344,6 +1359,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
goto write_fail; goto write_fail;
} }
} }
g_mutex_unlock (&mux->lock);
/* flush packet cache */ /* flush packet cache */
return gst_base_ts_mux_push_packets (mux, FALSE); return gst_base_ts_mux_push_packets (mux, FALSE);
@ -1385,9 +1401,12 @@ gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
GstPad *pad = NULL; GstPad *pad = NULL;
gchar *free_name = NULL; gchar *free_name = NULL;
g_mutex_lock (&mux->lock);
if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) { if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
if (tsmux_find_stream (mux->tsmux, pid)) if (tsmux_find_stream (mux->tsmux, pid)) {
g_mutex_unlock (&mux->lock);
goto stream_exists; goto stream_exists;
}
/* Make sure we don't use reserved PID. /* Make sure we don't use reserved PID.
* FIXME : This should be extended to other variants (ex: ATSC) reserved PID */ * FIXME : This should be extended to other variants (ex: ATSC) reserved PID */
if (pid < TSMUX_START_ES_PID) if (pid < TSMUX_START_ES_PID)
@ -1400,6 +1419,7 @@ gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
/* Name the pad correctly after the selected pid */ /* Name the pad correctly after the selected pid */
name = free_name = g_strdup_printf ("sink_%d", pid); name = free_name = g_strdup_printf ("sink_%d", pid);
} }
g_mutex_unlock (&mux->lock);
pad = (GstPad *) pad = (GstPad *)
GST_ELEMENT_CLASS (parent_class)->request_new_pad (element, GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
@ -1433,6 +1453,7 @@ gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
{ {
GstBaseTsMux *mux = GST_BASE_TS_MUX (element); GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
g_mutex_lock (&mux->lock);
if (mux->tsmux) { if (mux->tsmux) {
GList *cur; GList *cur;
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad); GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
@ -1457,6 +1478,7 @@ gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
tsmux_resend_pmt (program); tsmux_resend_pmt (program);
} }
} }
g_mutex_unlock (&mux->lock);
GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad); GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad);
} }
@ -1875,8 +1897,10 @@ gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) { if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
handle_scte35_section (mux, event, section, 0, NULL); handle_scte35_section (mux, event, section, 0, NULL);
} else { } else {
g_mutex_lock (&mux->lock);
/* TODO: Check that the section type is supported */ /* TODO: Check that the section type is supported */
tsmux_add_mpegts_si_section (mux->tsmux, section); tsmux_add_mpegts_si_section (mux->tsmux, section);
g_mutex_unlock (&mux->lock);
} }
gst_event_unref (event); gst_event_unref (event);
@ -1906,18 +1930,25 @@ gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
GstFlowReturn ret; GstFlowReturn ret;
GList *cur; GList *cur;
if (ts_pad->stream == NULL) g_mutex_lock (&mux->lock);
if (ts_pad->stream == NULL) {
g_mutex_unlock (&mux->lock);
break; break;
}
forward = FALSE; forward = FALSE;
gst_event_parse_caps (event, &caps); gst_event_parse_caps (event, &caps);
if (!caps || !gst_caps_is_fixed (caps)) if (!caps || !gst_caps_is_fixed (caps)) {
g_mutex_unlock (&mux->lock);
break; break;
}
ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps); ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
if (ret != GST_FLOW_OK) if (ret != GST_FLOW_OK) {
g_mutex_unlock (&mux->lock);
break; break;
}
mux->tsmux->pat_changed = TRUE; mux->tsmux->pat_changed = TRUE;
mux->tsmux->si_changed = TRUE; mux->tsmux->si_changed = TRUE;
@ -1931,6 +1962,7 @@ gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
program->pmt_changed = TRUE; program->pmt_changed = TRUE;
tsmux_resend_pmt (program); tsmux_resend_pmt (program);
} }
g_mutex_unlock (&mux->lock);
res = TRUE; res = TRUE;
break; break;
@ -2336,7 +2368,11 @@ done:
static gboolean static gboolean
gst_base_ts_mux_start (GstAggregator * agg) gst_base_ts_mux_start (GstAggregator * agg)
{ {
gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE); GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
g_mutex_lock (&mux->lock);
gst_base_ts_mux_reset (mux, TRUE);
g_mutex_unlock (&mux->lock);
return TRUE; return TRUE;
} }
@ -2344,7 +2380,11 @@ gst_base_ts_mux_start (GstAggregator * agg)
static gboolean static gboolean
gst_base_ts_mux_stop (GstAggregator * agg) gst_base_ts_mux_stop (GstAggregator * agg)
{ {
GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
g_mutex_lock (&mux->lock);
gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE); gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
g_mutex_unlock (&mux->lock);
return TRUE; return TRUE;
} }
@ -2356,6 +2396,7 @@ gst_base_ts_mux_dispose (GObject * object)
{ {
GstBaseTsMux *mux = GST_BASE_TS_MUX (object); GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
g_mutex_lock (&mux->lock);
gst_base_ts_mux_reset (mux, FALSE); gst_base_ts_mux_reset (mux, FALSE);
if (mux->out_adapter) { if (mux->out_adapter) {
@ -2370,16 +2411,28 @@ gst_base_ts_mux_dispose (GObject * object)
g_hash_table_destroy (mux->programs); g_hash_table_destroy (mux->programs);
mux->programs = NULL; mux->programs = NULL;
} }
g_mutex_unlock (&mux->lock);
GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object)); GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
} }
static void
gst_base_ts_mux_finalize (GObject * object)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
g_mutex_clear (&mux->lock);
GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
}
static void static void
gst_base_ts_mux_constructed (GObject * object) gst_base_ts_mux_constructed (GObject * object)
{ {
GstBaseTsMux *mux = GST_BASE_TS_MUX (object); GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
/* initial state */ /* initial state */
g_mutex_lock (&mux->lock);
gst_base_ts_mux_reset (mux, TRUE); gst_base_ts_mux_reset (mux, TRUE);
g_mutex_unlock (&mux->lock);
} }
static void static void
@ -2404,8 +2457,10 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
} }
case PROP_PAT_INTERVAL: case PROP_PAT_INTERVAL:
mux->pat_interval = g_value_get_uint (value); mux->pat_interval = g_value_get_uint (value);
g_mutex_lock (&mux->lock);
if (mux->tsmux) if (mux->tsmux)
tsmux_set_pat_interval (mux->tsmux, mux->pat_interval); tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
g_mutex_unlock (&mux->lock);
break; break;
case PROP_PMT_INTERVAL: case PROP_PMT_INTERVAL:
mux->pmt_interval = g_value_get_uint (value); mux->pmt_interval = g_value_get_uint (value);
@ -2413,7 +2468,9 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data); GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data);
g_mutex_lock (&mux->lock);
tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval); tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
g_mutex_unlock (&mux->lock);
} }
GST_OBJECT_UNLOCK (mux); GST_OBJECT_UNLOCK (mux);
break; break;
@ -2422,17 +2479,23 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
break; break;
case PROP_SI_INTERVAL: case PROP_SI_INTERVAL:
mux->si_interval = g_value_get_uint (value); mux->si_interval = g_value_get_uint (value);
g_mutex_lock (&mux->lock);
tsmux_set_si_interval (mux->tsmux, mux->si_interval); tsmux_set_si_interval (mux->tsmux, mux->si_interval);
g_mutex_unlock (&mux->lock);
break; break;
case PROP_BITRATE: case PROP_BITRATE:
mux->bitrate = g_value_get_uint64 (value); mux->bitrate = g_value_get_uint64 (value);
g_mutex_lock (&mux->lock);
if (mux->tsmux) if (mux->tsmux)
tsmux_set_bitrate (mux->tsmux, mux->bitrate); tsmux_set_bitrate (mux->tsmux, mux->bitrate);
g_mutex_unlock (&mux->lock);
break; break;
case PROP_PCR_INTERVAL: case PROP_PCR_INTERVAL:
mux->pcr_interval = g_value_get_uint (value); mux->pcr_interval = g_value_get_uint (value);
g_mutex_lock (&mux->lock);
if (mux->tsmux) if (mux->tsmux)
tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval); tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval);
g_mutex_unlock (&mux->lock);
break; break;
case PROP_SCTE_35_PID: case PROP_SCTE_35_PID:
mux->scte35_pid = g_value_get_uint (value); mux->scte35_pid = g_value_get_uint (value);
@ -2556,6 +2619,7 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
gobject_class->get_property = gobject_class->get_property =
GST_DEBUG_FUNCPTR (gst_base_ts_mux_get_property); GST_DEBUG_FUNCPTR (gst_base_ts_mux_get_property);
gobject_class->dispose = gst_base_ts_mux_dispose; gobject_class->dispose = gst_base_ts_mux_dispose;
gobject_class->finalize = gst_base_ts_mux_finalize;
gobject_class->constructed = gst_base_ts_mux_constructed; gobject_class->constructed = gst_base_ts_mux_constructed;
gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad; gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
@ -2656,4 +2720,6 @@ gst_base_ts_mux_init (GstBaseTsMux * mux)
mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH; mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH;
mux->automatic_alignment = 0; mux->automatic_alignment = 0;
g_mutex_init (&mux->lock);
} }

View file

@ -183,6 +183,9 @@ struct GstBaseTsMux {
GstAdapter *out_adapter; GstAdapter *out_adapter;
GstBuffer *out_buffer; GstBuffer *out_buffer;
GstClockTimeDiff output_ts_offset; GstClockTimeDiff output_ts_offset;
/* protects the tsmux object, the programs hash table, and pad streams */
GMutex lock;
}; };
/** /**