tsmux: Lock mux->tsmux, the programs hash table, and pad streams

They contain implementations that are not thread-safe (e.g. GList, GHashTable).

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1616>
This commit is contained in:
Vivia Nikolaidou 2022-02-01 14:51:27 +02:00 committed by GStreamer Marge Bot
parent e119cdee3b
commit e0d5e022a1
2 changed files with 74 additions and 5 deletions

View file

@ -129,6 +129,7 @@ gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
/* Send initial segments again after a flush-stop, and also resend the
* header sections */
g_mutex_lock (&mux->lock);
mux->first = TRUE;
/* output PAT, SI tables */
@ -141,6 +142,7 @@ gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg)
tsmux_resend_pmt (program);
}
g_mutex_unlock (&mux->lock);
return GST_FLOW_OK;
}
@ -297,6 +299,7 @@ steal_si_section (GstMpegtsSectionType * type, TsMuxSection * section,
return TRUE;
}
/* Must be called with mux->lock held */
static void
gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc)
{
@ -372,6 +375,7 @@ release_buffer_cb (guint8 * data, void *user_data)
stream_data_free ((StreamData *) user_data);
}
/* Must be called with mux->lock held */
static GstFlowReturn
gst_base_ts_mux_create_or_update_stream (GstBaseTsMux * mux,
GstBaseTsMuxPad * ts_pad, GstCaps * caps)
@ -746,6 +750,7 @@ is_valid_pmt_pid (guint16 pmt_pid)
return TRUE;
}
/* Must be called with mux->lock held */
static GstFlowReturn
gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
{
@ -767,6 +772,7 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad)
return ret;
}
/* Must be called with mux->lock held */
static GstFlowReturn
gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
{
@ -876,6 +882,7 @@ no_stream:
}
}
/* Must be called with mux->lock held */
static gboolean
gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
gpointer user_data)
@ -887,6 +894,7 @@ gst_base_ts_mux_create_pad_stream_func (GstElement * element, GstPad * pad,
return *ret == GST_FLOW_OK;
}
/* Must be called with mux->lock held */
static GstFlowReturn
gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
{
@ -1189,11 +1197,13 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
return GST_FLOW_OK;
}
g_mutex_lock (&mux->lock);
if (G_UNLIKELY (mux->first)) {
ret = gst_base_ts_mux_create_streams (mux);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
if (buf)
gst_buffer_unref (buf);
g_mutex_unlock (&mux->lock);
return ret;
}
@ -1232,6 +1242,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
if (mux->force_key_unit_event != NULL && best->stream->is_video_stream) {
GstEvent *event;
g_mutex_unlock (&mux->lock);
event = check_pending_key_unit_event (mux->force_key_unit_event,
&agg_pad->segment, GST_BUFFER_PTS (buf),
GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts);
@ -1251,6 +1262,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
GST_TIME_ARGS (running_time), count);
gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event);
g_mutex_lock (&mux->lock);
/* output PAT, SI tables */
tsmux_resend_pat (mux->tsmux);
tsmux_resend_si (mux->tsmux);
@ -1261,6 +1273,8 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
tsmux_resend_pmt (program);
}
} else {
g_mutex_lock (&mux->lock);
}
}
@ -1315,6 +1329,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
GST_WARNING_OBJECT (mux, "KLV meta unit too big, splitting not supported");
gst_buffer_unref (buf);
g_mutex_unlock (&mux->lock);
return GST_FLOW_OK;
}
@ -1344,6 +1359,7 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
goto write_fail;
}
}
g_mutex_unlock (&mux->lock);
/* flush packet cache */
return gst_base_ts_mux_push_packets (mux, FALSE);
@ -1385,9 +1401,12 @@ gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
GstPad *pad = NULL;
gchar *free_name = NULL;
g_mutex_lock (&mux->lock);
if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) {
if (tsmux_find_stream (mux->tsmux, pid))
if (tsmux_find_stream (mux->tsmux, pid)) {
g_mutex_unlock (&mux->lock);
goto stream_exists;
}
/* Make sure we don't use reserved PID.
* FIXME : This should be extended to other variants (ex: ATSC) reserved PID */
if (pid < TSMUX_START_ES_PID)
@ -1400,6 +1419,7 @@ gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ,
/* Name the pad correctly after the selected pid */
name = free_name = g_strdup_printf ("sink_%d", pid);
}
g_mutex_unlock (&mux->lock);
pad = (GstPad *)
GST_ELEMENT_CLASS (parent_class)->request_new_pad (element,
@ -1433,6 +1453,7 @@ gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
g_mutex_lock (&mux->lock);
if (mux->tsmux) {
GList *cur;
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
@ -1457,6 +1478,7 @@ gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
tsmux_resend_pmt (program);
}
}
g_mutex_unlock (&mux->lock);
GST_ELEMENT_CLASS (parent_class)->release_pad (element, pad);
}
@ -1875,8 +1897,10 @@ gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
handle_scte35_section (mux, event, section, 0, NULL);
} else {
g_mutex_lock (&mux->lock);
/* TODO: Check that the section type is supported */
tsmux_add_mpegts_si_section (mux->tsmux, section);
g_mutex_unlock (&mux->lock);
}
gst_event_unref (event);
@ -1906,18 +1930,25 @@ gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
GstFlowReturn ret;
GList *cur;
if (ts_pad->stream == NULL)
g_mutex_lock (&mux->lock);
if (ts_pad->stream == NULL) {
g_mutex_unlock (&mux->lock);
break;
}
forward = FALSE;
gst_event_parse_caps (event, &caps);
if (!caps || !gst_caps_is_fixed (caps))
if (!caps || !gst_caps_is_fixed (caps)) {
g_mutex_unlock (&mux->lock);
break;
}
ret = gst_base_ts_mux_create_or_update_stream (mux, ts_pad, caps);
if (ret != GST_FLOW_OK)
if (ret != GST_FLOW_OK) {
g_mutex_unlock (&mux->lock);
break;
}
mux->tsmux->pat_changed = TRUE;
mux->tsmux->si_changed = TRUE;
@ -1931,6 +1962,7 @@ gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad,
program->pmt_changed = TRUE;
tsmux_resend_pmt (program);
}
g_mutex_unlock (&mux->lock);
res = TRUE;
break;
@ -2336,7 +2368,11 @@ done:
static gboolean
gst_base_ts_mux_start (GstAggregator * agg)
{
gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
g_mutex_lock (&mux->lock);
gst_base_ts_mux_reset (mux, TRUE);
g_mutex_unlock (&mux->lock);
return TRUE;
}
@ -2344,7 +2380,11 @@ gst_base_ts_mux_start (GstAggregator * agg)
static gboolean
gst_base_ts_mux_stop (GstAggregator * agg)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (agg);
g_mutex_lock (&mux->lock);
gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE);
g_mutex_unlock (&mux->lock);
return TRUE;
}
@ -2356,6 +2396,7 @@ gst_base_ts_mux_dispose (GObject * object)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
g_mutex_lock (&mux->lock);
gst_base_ts_mux_reset (mux, FALSE);
if (mux->out_adapter) {
@ -2370,16 +2411,28 @@ gst_base_ts_mux_dispose (GObject * object)
g_hash_table_destroy (mux->programs);
mux->programs = NULL;
}
g_mutex_unlock (&mux->lock);
GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
}
static void
gst_base_ts_mux_finalize (GObject * object)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
g_mutex_clear (&mux->lock);
GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
}
static void
gst_base_ts_mux_constructed (GObject * object)
{
GstBaseTsMux *mux = GST_BASE_TS_MUX (object);
/* initial state */
g_mutex_lock (&mux->lock);
gst_base_ts_mux_reset (mux, TRUE);
g_mutex_unlock (&mux->lock);
}
static void
@ -2404,8 +2457,10 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
}
case PROP_PAT_INTERVAL:
mux->pat_interval = g_value_get_uint (value);
g_mutex_lock (&mux->lock);
if (mux->tsmux)
tsmux_set_pat_interval (mux->tsmux, mux->pat_interval);
g_mutex_unlock (&mux->lock);
break;
case PROP_PMT_INTERVAL:
mux->pmt_interval = g_value_get_uint (value);
@ -2413,7 +2468,9 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) {
GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (l->data);
g_mutex_lock (&mux->lock);
tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
g_mutex_unlock (&mux->lock);
}
GST_OBJECT_UNLOCK (mux);
break;
@ -2422,17 +2479,23 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id,
break;
case PROP_SI_INTERVAL:
mux->si_interval = g_value_get_uint (value);
g_mutex_lock (&mux->lock);
tsmux_set_si_interval (mux->tsmux, mux->si_interval);
g_mutex_unlock (&mux->lock);
break;
case PROP_BITRATE:
mux->bitrate = g_value_get_uint64 (value);
g_mutex_lock (&mux->lock);
if (mux->tsmux)
tsmux_set_bitrate (mux->tsmux, mux->bitrate);
g_mutex_unlock (&mux->lock);
break;
case PROP_PCR_INTERVAL:
mux->pcr_interval = g_value_get_uint (value);
g_mutex_lock (&mux->lock);
if (mux->tsmux)
tsmux_set_pcr_interval (mux->tsmux, mux->pcr_interval);
g_mutex_unlock (&mux->lock);
break;
case PROP_SCTE_35_PID:
mux->scte35_pid = g_value_get_uint (value);
@ -2556,6 +2619,7 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
gobject_class->get_property =
GST_DEBUG_FUNCPTR (gst_base_ts_mux_get_property);
gobject_class->dispose = gst_base_ts_mux_dispose;
gobject_class->finalize = gst_base_ts_mux_finalize;
gobject_class->constructed = gst_base_ts_mux_constructed;
gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
@ -2656,4 +2720,6 @@ gst_base_ts_mux_init (GstBaseTsMux * mux)
mux->packet_size = GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH;
mux->automatic_alignment = 0;
g_mutex_init (&mux->lock);
}

View file

@ -183,6 +183,9 @@ struct GstBaseTsMux {
GstAdapter *out_adapter;
GstBuffer *out_buffer;
GstClockTimeDiff output_ts_offset;
/* protects the tsmux object, the programs hash table, and pad streams */
GMutex lock;
};
/**