diff --git a/gst/mpegtsmux/gstatscmux.c b/gst/mpegtsmux/gstatscmux.c index 34b55d284f..732d8a6916 100644 --- a/gst/mpegtsmux/gstatscmux.c +++ b/gst/mpegtsmux/gstatscmux.c @@ -34,7 +34,7 @@ GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS ("video/mpegts, " - "systemstream = (boolean) true, " "packetsize = (int) 192 ") + "systemstream = (boolean) true, " "packetsize = (int) 188 ") ); static GstStaticPadTemplate gst_atsc_mux_sink_factory = @@ -155,7 +155,7 @@ gst_atsc_mux_create_ts_mux (GstBaseTsMux * mpegtsmux) static guint gst_atsc_mux_handle_media_type (GstBaseTsMux * mux, const gchar * media_type, - GstBaseTsPadData * ts_data) + GstBaseTsMuxPad * pad) { guint ret = TSMUX_ST_RESERVED; @@ -182,11 +182,11 @@ gst_atsc_mux_class_init (GstATSCMuxClass * klass) mpegtsmux_class->create_ts_mux = gst_atsc_mux_create_ts_mux; mpegtsmux_class->handle_media_type = gst_atsc_mux_handle_media_type; - gst_element_class_add_static_pad_template (gstelement_class, - &gst_atsc_mux_sink_factory); + gst_element_class_add_static_pad_template_with_gtype (gstelement_class, + &gst_atsc_mux_sink_factory, GST_TYPE_BASE_TS_MUX_PAD); - gst_element_class_add_static_pad_template (gstelement_class, - &gst_atsc_mux_src_factory); + gst_element_class_add_static_pad_template_with_gtype (gstelement_class, + &gst_atsc_mux_src_factory, GST_TYPE_AGGREGATOR_PAD); } static void diff --git a/gst/mpegtsmux/gstbasetsmux.c b/gst/mpegtsmux/gstbasetsmux.c index ecec427fc7..f96959f99d 100644 --- a/gst/mpegtsmux/gstbasetsmux.c +++ b/gst/mpegtsmux/gstbasetsmux.c @@ -101,7 +101,73 @@ GST_DEBUG_CATEGORY (gst_base_ts_mux_debug); #define GST_CAT_DEFAULT gst_base_ts_mux_debug -#define COLLECT_DATA_PAD(collect_data) (((GstCollectData *)(collect_data))->pad) +/* GstBaseTsMuxPad */ + +G_DEFINE_TYPE (GstBaseTsMuxPad, gst_base_ts_mux_pad, GST_TYPE_AGGREGATOR_PAD); + +/* Internals */ + +static void +gst_base_ts_mux_pad_reset (GstBaseTsMuxPad * pad) +{ + pad->dts = GST_CLOCK_STIME_NONE; + pad->prog_id = -1; + + if (pad->free_func) + pad->free_func (pad->prepare_data); + pad->prepare_data = NULL; + pad->prepare_func = NULL; + pad->free_func = NULL; + + if (pad->codec_data) + gst_buffer_replace (&pad->codec_data, NULL); + + /* reference owned elsewhere */ + pad->stream = NULL; + pad->prog = NULL; + + if (pad->language) { + g_free (pad->language); + pad->language = NULL; + } +} + +/* GstAggregatorPad implementation */ + +static GstFlowReturn +gst_base_ts_mux_pad_flush (GstAggregatorPad * agg_pad, GstAggregator * agg) +{ + gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (agg_pad)); + + return GST_FLOW_OK; +} + +/* GObject implementation */ + +static void +gst_base_ts_mux_pad_dispose (GObject * obj) +{ + GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (obj); + + gst_base_ts_mux_pad_reset (ts_pad); +} + +static void +gst_base_ts_mux_pad_class_init (GstBaseTsMuxPadClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstAggregatorPadClass *gstaggpad_class = GST_AGGREGATOR_PAD_CLASS (klass); + + gobject_class->dispose = gst_base_ts_mux_pad_dispose; + gstaggpad_class->flush = gst_base_ts_mux_pad_flush; +} + +static void +gst_base_ts_mux_pad_init (GstBaseTsMuxPad * vaggpad) +{ +} + +/* GstBaseTsMux */ enum { @@ -145,7 +211,7 @@ typedef struct GstBuffer *buffer; } StreamData; -G_DEFINE_TYPE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_ELEMENT); +G_DEFINE_TYPE (GstBaseTsMux, gst_base_ts_mux, GST_TYPE_AGGREGATOR); /* Internals */ @@ -172,32 +238,6 @@ stream_data_free (StreamData * data) #define parent_class gst_base_ts_mux_parent_class -static void -gst_base_ts_mux_pad_reset (GstBaseTsPadData * pad_data) -{ - pad_data->dts = GST_CLOCK_STIME_NONE; - pad_data->prog_id = -1; - - if (pad_data->free_func) - pad_data->free_func (pad_data->prepare_data); - pad_data->prepare_data = NULL; - pad_data->prepare_func = NULL; - pad_data->free_func = NULL; - - if (pad_data->codec_data) - gst_buffer_replace (&pad_data->codec_data, NULL); - - /* reference owned elsewhere */ - pad_data->stream = NULL; - pad_data->prog = NULL; - - if (pad_data->language) { - g_free (pad_data->language); - pad_data->language = NULL; - } - -} - static void gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux) { @@ -207,7 +247,9 @@ gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux) GValue value = { 0 }; GstCaps *caps; - caps = gst_caps_make_writable (gst_pad_get_current_caps (mux->srcpad)); + caps = + gst_caps_make_writable (gst_pad_get_current_caps (GST_AGGREGATOR_SRC_PAD + (mux))); structure = gst_caps_get_structure (caps, 0); g_value_init (&array, GST_TYPE_ARRAY); @@ -223,44 +265,15 @@ gst_base_ts_mux_set_header_on_caps (GstBaseTsMux * mux) } gst_structure_set_value (structure, "streamheader", &array); - gst_pad_set_caps (mux->srcpad, caps); + gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps); g_value_unset (&array); gst_caps_unref (caps); } -static void -gst_base_ts_mux_prepare_srcpad (GstBaseTsMux * mux) -{ - GstSegment seg; - /* we are not going to seek */ - GstEvent *new_seg; - gchar s_id[32]; - GstCaps *caps = gst_caps_new_simple ("video/mpegts", - "systemstream", G_TYPE_BOOLEAN, TRUE, - "packetsize", G_TYPE_INT, mux->packet_size, - NULL); - - /* stream-start (FIXME: create id based on input ids) */ - g_snprintf (s_id, sizeof (s_id), "basetsmux-%08x", g_random_int ()); - gst_pad_push_event (mux->srcpad, gst_event_new_stream_start (s_id)); - - gst_segment_init (&seg, GST_FORMAT_TIME); - new_seg = gst_event_new_segment (&seg); - - /* Set caps on src pad from our template and push new segment */ - gst_pad_set_caps (mux->srcpad, caps); - gst_caps_unref (caps); - - if (!gst_pad_push_event (mux->srcpad, new_seg)) { - GST_WARNING_OBJECT (mux, "New segment event was not handled downstream"); - } -} - static void gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc) { GstBuffer *buf; - GSList *walk; GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux); mux->first = TRUE; @@ -292,13 +305,6 @@ gst_base_ts_mux_reset (GstBaseTsMux * mux, gboolean alloc) gst_event_replace (&mux->force_key_unit_event, NULL); gst_buffer_replace (&mux->out_buffer, NULL); - if (mux->collect) { - GST_COLLECT_PADS_STREAM_LOCK (mux->collect); - for (walk = mux->collect->data; walk != NULL; walk = g_slist_next (walk)) - gst_base_ts_mux_pad_reset ((GstBaseTsPadData *) walk->data); - GST_COLLECT_PADS_STREAM_UNLOCK (mux->collect); - } - if (alloc) { g_assert (klass->create_ts_mux); @@ -316,7 +322,7 @@ release_buffer_cb (guint8 * data, void *user_data) } static GstFlowReturn -gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data) +gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsMuxPad * ts_pad) { GstFlowReturn ret = GST_FLOW_ERROR; GstCaps *caps; @@ -333,13 +339,13 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data) guint8 color_spec = 0; j2k_private_data *private_data = NULL; - pad = ts_data->collect.pad; + pad = GST_PAD (ts_pad); caps = gst_pad_get_current_caps (pad); if (caps == NULL) goto not_negotiated; GST_DEBUG_OBJECT (pad, "Creating stream with PID 0x%04x for caps %" - GST_PTR_FORMAT, ts_data->pid, caps); + GST_PTR_FORMAT, ts_pad->pid, caps); s = gst_caps_get_structure (caps, 0); @@ -382,10 +388,10 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data) GST_DEBUG_OBJECT (pad, "we have additional codec data (%" G_GSIZE_FORMAT " bytes)", gst_buffer_get_size (codec_data)); - ts_data->codec_data = gst_buffer_ref (codec_data); - ts_data->prepare_func = gst_base_ts_mux_prepare_aac; + ts_pad->codec_data = gst_buffer_ref (codec_data); + ts_pad->prepare_func = gst_base_ts_mux_prepare_aac; } else { - ts_data->codec_data = NULL; + ts_pad->codec_data = NULL; } break; } @@ -420,7 +426,7 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data) } else if (strcmp (mt, "application/x-teletext") == 0) { st = TSMUX_ST_PS_TELETEXT; /* needs a particularly sized layout */ - ts_data->prepare_func = gst_base_ts_mux_prepare_teletext; + ts_pad->prepare_func = gst_base_ts_mux_prepare_teletext; } else if (strcmp (mt, "audio/x-opus") == 0) { guint8 channels, mapping_family, stream_count, coupled_count; guint8 channel_mapping[256]; @@ -480,7 +486,7 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data) } st = TSMUX_ST_PS_OPUS; - ts_data->prepare_func = gst_base_ts_mux_prepare_opus; + ts_pad->prepare_func = gst_base_ts_mux_prepare_opus; } else if (strcmp (mt, "meta/x-klv") == 0) { st = TSMUX_ST_PS_KLV; } else if (strcmp (mt, "image/x-jpc") == 0) { @@ -568,53 +574,53 @@ gst_base_ts_mux_create_stream (GstBaseTsMux * mux, GstBaseTsPadData * ts_data) goto not_negotiated; } st = TSMUX_ST_VIDEO_JP2K; - ts_data->prepare_func = gst_base_ts_mux_prepare_jpeg2000; - ts_data->prepare_data = private_data; - ts_data->free_func = gst_base_ts_mux_free_jpeg2000; + ts_pad->prepare_func = gst_base_ts_mux_prepare_jpeg2000; + ts_pad->prepare_data = private_data; + ts_pad->free_func = gst_base_ts_mux_free_jpeg2000; } else { GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux); if (klass->handle_media_type) { - st = klass->handle_media_type (mux, mt, ts_data); + st = klass->handle_media_type (mux, mt, ts_pad); } } if (st != TSMUX_ST_RESERVED) { - ts_data->stream = tsmux_create_stream (mux->tsmux, st, ts_data->pid, - ts_data->language); + ts_pad->stream = tsmux_create_stream (mux->tsmux, st, ts_pad->pid, + ts_pad->language); } else { GST_DEBUG_OBJECT (pad, "Failed to determine stream type"); } - if (ts_data->stream != NULL) { + if (ts_pad->stream != NULL) { const char *interlace_mode = gst_structure_get_string (s, "interlace-mode"); - gst_structure_get_int (s, "rate", &ts_data->stream->audio_sampling); - gst_structure_get_int (s, "channels", &ts_data->stream->audio_channels); - gst_structure_get_int (s, "bitrate", &ts_data->stream->audio_bitrate); + gst_structure_get_int (s, "rate", &ts_pad->stream->audio_sampling); + gst_structure_get_int (s, "channels", &ts_pad->stream->audio_channels); + gst_structure_get_int (s, "bitrate", &ts_pad->stream->audio_bitrate); /* frame rate */ - gst_structure_get_fraction (s, "framerate", &ts_data->stream->num, - &ts_data->stream->den); + gst_structure_get_fraction (s, "framerate", &ts_pad->stream->num, + &ts_pad->stream->den); /* Interlace mode */ - ts_data->stream->interlace_mode = FALSE; + ts_pad->stream->interlace_mode = FALSE; if (interlace_mode) { - ts_data->stream->interlace_mode = + ts_pad->stream->interlace_mode = g_str_equal (interlace_mode, "interleaved"); } /* Width and Height */ - gst_structure_get_int (s, "width", &ts_data->stream->horizontal_size); - gst_structure_get_int (s, "height", &ts_data->stream->vertical_size); + gst_structure_get_int (s, "width", &ts_pad->stream->horizontal_size); + gst_structure_get_int (s, "height", &ts_pad->stream->vertical_size); - ts_data->stream->color_spec = color_spec; - ts_data->stream->max_bitrate = max_rate; - ts_data->stream->profile_and_level = profile | main_level; + ts_pad->stream->color_spec = color_spec; + ts_pad->stream->max_bitrate = max_rate; + ts_pad->stream->profile_and_level = profile | main_level; - ts_data->stream->opus_channel_config_code = opus_channel_config_code; + ts_pad->stream->opus_channel_config_code = opus_channel_config_code; - tsmux_stream_set_buffer_release_func (ts_data->stream, release_buffer_cb); - tsmux_program_add_stream (ts_data->prog, ts_data->stream); + tsmux_stream_set_buffer_release_func (ts_pad->stream, release_buffer_cb); + tsmux_program_add_stream (ts_pad->prog, ts_pad->stream); ret = GST_FLOW_OK; } @@ -635,19 +641,19 @@ static GstFlowReturn gst_base_ts_mux_create_streams (GstBaseTsMux * mux) { GstFlowReturn ret = GST_FLOW_OK; - GSList *walk = mux->collect->data; + GList *walk = GST_ELEMENT (mux)->sinkpads; /* Create the streams */ while (walk) { - GstCollectData *c_data = (GstCollectData *) walk->data; - GstBaseTsPadData *ts_data = (GstBaseTsPadData *) walk->data; + GstPad *pad = GST_PAD (walk->data); + GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data); gchar *name = NULL; gchar *pcr_name; - walk = g_slist_next (walk); + walk = g_list_next (walk); - if (ts_data->prog_id == -1) { - name = GST_PAD_NAME (c_data->pad); + if (ts_pad->prog_id == -1) { + name = GST_PAD_NAME (pad); if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map, name)) { gint idx; @@ -663,47 +669,47 @@ gst_base_ts_mux_create_streams (GstBaseTsMux * mux) idx, name, DEFAULT_PROG_ID); idx = DEFAULT_PROG_ID; } - ts_data->prog_id = idx; + ts_pad->prog_id = idx; } else { - ts_data->prog_id = DEFAULT_PROG_ID; + ts_pad->prog_id = DEFAULT_PROG_ID; } } - ts_data->prog = + ts_pad->prog = (TsMuxProgram *) g_hash_table_lookup (mux->programs, - GINT_TO_POINTER (ts_data->prog_id)); - if (ts_data->prog == NULL) { - ts_data->prog = tsmux_program_new (mux->tsmux, ts_data->prog_id); - if (ts_data->prog == NULL) + GINT_TO_POINTER (ts_pad->prog_id)); + if (ts_pad->prog == NULL) { + ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id); + if (ts_pad->prog == NULL) goto no_program; - tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval); + tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval); g_hash_table_insert (mux->programs, - GINT_TO_POINTER (ts_data->prog_id), ts_data->prog); + GINT_TO_POINTER (ts_pad->prog_id), ts_pad->prog); /* Take the first stream of the program for the PCR */ - GST_DEBUG_OBJECT (COLLECT_DATA_PAD (ts_data), + GST_DEBUG_OBJECT (ts_pad, "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)", - ts_data->pid, ts_data->prog_id); + ts_pad->pid, ts_pad->prog_id); - tsmux_program_set_pcr_stream (ts_data->prog, ts_data->stream); + tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream); } - if (ts_data->stream == NULL) { - ret = gst_base_ts_mux_create_stream (mux, ts_data); + if (ts_pad->stream == NULL) { + ret = gst_base_ts_mux_create_stream (mux, ts_pad); if (ret != GST_FLOW_OK) goto no_stream; } /* Check for user-specified PCR PID */ - pcr_name = g_strdup_printf ("PCR_%d", ts_data->prog->pgm_number); + pcr_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number); if (mux->prog_map && gst_structure_has_field (mux->prog_map, pcr_name)) { const gchar *sink_name = gst_structure_get_string (mux->prog_map, pcr_name); if (!g_strcmp0 (name, sink_name)) { GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for " - "program (prog_id = %d)", ts_data->pid, ts_data->prog->pgm_number); - tsmux_program_set_pcr_stream (ts_data->prog, ts_data->stream); + "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number); + tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream); } } g_free (pcr_name); @@ -770,6 +776,29 @@ new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data, } } +static GstFlowReturn +finish_buffer_list (GstBaseTsMux * mux, GstBufferList * list) +{ + guint i; + guint l = gst_buffer_list_length (list); + GstFlowReturn ret = GST_FLOW_OK; + + for (i = 0; i < l; i++) { + GstBuffer *buf = gst_buffer_list_get (list, i); + + ret = + gst_aggregator_finish_buffer (GST_AGGREGATOR (mux), + gst_buffer_ref (buf)); + + if (ret != GST_FLOW_OK) + break; + } + + gst_buffer_list_unref (list); + + return ret; +} + static GstFlowReturn gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force) { @@ -791,7 +820,7 @@ gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force) /* no alignment, just push all available data */ if (align == 0) { buffer_list = gst_adapter_take_buffer_list (mux->out_adapter, av); - return gst_pad_push_list (mux->srcpad, buffer_list); + return finish_buffer_list (mux, buffer_list); } align *= packet_size; @@ -867,7 +896,7 @@ gst_base_ts_mux_push_packets (GstBaseTsMux * mux, gboolean force) gst_buffer_list_add (buffer_list, buf); } - return gst_pad_push_list (mux->srcpad, buffer_list); + return finish_buffer_list (mux, buffer_list); } static GstFlowReturn @@ -966,421 +995,12 @@ alloc_packet_cb (GstBuffer ** buf, void *user_data) klass->allocate_packet (mux, buf); } -/* GstElement implementation */ - -static gboolean -gst_base_ts_mux_sink_event (GstCollectPads * pads, GstCollectData * data, - GstEvent * event, gpointer user_data) -{ - GstBaseTsMux *mux = GST_BASE_TS_MUX (user_data); - gboolean res = FALSE; - gboolean forward = TRUE; - GstBaseTsPadData *pad_data = (GstBaseTsPadData *) data; - -#ifndef GST_DISABLE_GST_DEBUG - GstPad *pad; - - pad = data->pad; -#endif - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_CUSTOM_DOWNSTREAM: - { - GstClockTime timestamp, stream_time, running_time; - gboolean all_headers; - guint count; - - if (!gst_video_event_is_force_key_unit (event)) - goto out; - - res = TRUE; - forward = FALSE; - - gst_video_event_parse_downstream_force_key_unit (event, - ×tamp, &stream_time, &running_time, &all_headers, &count); - GST_INFO_OBJECT (pad, "have downstream force-key-unit event, " - "seqnum %d, running-time %" GST_TIME_FORMAT " count %d", - gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count); - - if (mux->force_key_unit_event != NULL) { - GST_INFO_OBJECT (mux, "skipping downstream force key unit event " - "as an upstream force key unit is already queued"); - goto out; - } - - if (!all_headers) - goto out; - - mux->pending_key_unit_ts = running_time; - gst_event_replace (&mux->force_key_unit_event, event); - break; - } - case GST_EVENT_TAG:{ - GstTagList *list; - gchar *lang = NULL; - - GST_DEBUG_OBJECT (mux, "received tag event"); - gst_event_parse_tag (event, &list); - - /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */ - if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) { - const gchar *lang_code; - - lang_code = gst_tag_get_language_code_iso_639_2B (lang); - if (lang_code) { - GST_DEBUG_OBJECT (pad, "Setting language to '%s'", lang_code); - - g_free (pad_data->language); - pad_data->language = g_strdup (lang_code); - } else { - GST_WARNING_OBJECT (pad, "Did not get language code for '%s'", lang); - } - g_free (lang); - } - - /* handled this, don't want collectpads to forward it downstream */ - res = TRUE; - forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL; - break; - } - case GST_EVENT_STREAM_START:{ - GstStreamFlags flags; - - gst_event_parse_stream_flags (event, &flags); - - /* Don't wait for data on sparse inputs like metadata streams */ - if ((flags & GST_STREAM_FLAG_SPARSE)) { - GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED); - gst_collect_pads_set_waiting (pads, data, FALSE); - GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED); - } - break; - } - case GST_EVENT_FLUSH_STOP:{ - GList *cur; - - /* Send initial segments again after a flush-stop, and also resend the - * header sections */ - mux->first = TRUE; - - /* output PAT, SI tables */ - tsmux_resend_pat (mux->tsmux); - tsmux_resend_si (mux->tsmux); - - /* output PMT for each program */ - for (cur = mux->tsmux->programs; cur; cur = cur->next) { - TsMuxProgram *program = (TsMuxProgram *) cur->data; - - tsmux_resend_pmt (program); - } - break; - } - default: - break; - } - -out: - if (!forward) - gst_event_unref (event); - else - res = gst_collect_pads_event_default (pads, data, event, FALSE); - - return res; -} - -static gboolean -gst_base_ts_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) -{ - GstBaseTsMux *mux = GST_BASE_TS_MUX (parent); - gboolean res = TRUE, forward = TRUE; - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_CUSTOM_UPSTREAM: - { - GstIterator *iter; - GstIteratorResult iter_ret; - GstPad *sinkpad; - GValue sinkpad_value = G_VALUE_INIT; - GstClockTime running_time; - gboolean all_headers, done, res = FALSE; - guint count; - - if (!gst_video_event_is_force_key_unit (event)) - break; - - forward = FALSE; - - gst_video_event_parse_upstream_force_key_unit (event, - &running_time, &all_headers, &count); - - GST_INFO_OBJECT (mux, "received upstream force-key-unit event, " - "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d", - gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), - all_headers, count); - - if (!all_headers) - break; - - mux->pending_key_unit_ts = running_time; - gst_event_replace (&mux->force_key_unit_event, event); - - iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux)); - done = FALSE; - while (!done) { - gboolean tmp; - - iter_ret = gst_iterator_next (iter, &sinkpad_value); - sinkpad = GST_PAD (g_value_get_object (&sinkpad_value)); - - switch (iter_ret) { - case GST_ITERATOR_DONE: - done = TRUE; - break; - case GST_ITERATOR_OK: - GST_INFO_OBJECT (pad, "forwarding"); - tmp = gst_pad_push_event (sinkpad, gst_event_ref (event)); - GST_INFO_OBJECT (mux, "result %d", tmp); - /* succeed if at least one pad succeeds */ - res |= tmp; - break; - case GST_ITERATOR_ERROR: - done = TRUE; - break; - case GST_ITERATOR_RESYNC: - break; - } - g_value_reset (&sinkpad_value); - } - g_value_unset (&sinkpad_value); - gst_iterator_free (iter); - break; - } - default: - break; - } - - if (forward) - res = gst_pad_event_default (pad, parent, event); - else - gst_event_unref (event); - - return res; -} - -static GstPad * -gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ, - const gchar * name, const GstCaps * caps) -{ - GstBaseTsMux *mux = GST_BASE_TS_MUX (element); - gint pid = -1; - gchar *pad_name = NULL; - GstPad *pad = NULL; - GstBaseTsPadData *pad_data = NULL; - - if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) { - if (tsmux_find_stream (mux->tsmux, pid)) - goto stream_exists; - } else { - pid = tsmux_get_new_pid (mux->tsmux); - } - - pad_name = g_strdup_printf ("sink_%d", pid); - pad = gst_pad_new_from_template (templ, pad_name); - g_free (pad_name); - - pad_data = (GstBaseTsPadData *) - gst_collect_pads_add_pad (mux->collect, pad, sizeof (GstBaseTsPadData), - (GstCollectDataDestroyNotify) (gst_base_ts_mux_pad_reset), TRUE); - if (pad_data == NULL) - goto pad_failure; - - gst_base_ts_mux_pad_reset (pad_data); - pad_data->pid = pid; - - if (G_UNLIKELY (!gst_element_add_pad (element, pad))) - goto could_not_add; - - return pad; - - /* ERRORS */ -stream_exists: - { - GST_ELEMENT_ERROR (element, STREAM, MUX, ("Duplicate PID requested"), - (NULL)); - return NULL; - } -could_not_add: - { - GST_ELEMENT_ERROR (element, STREAM, FAILED, - ("Internal data stream error."), ("Could not add pad to element")); - gst_collect_pads_remove_pad (mux->collect, pad); - gst_object_unref (pad); - return NULL; - } -pad_failure: - { - GST_ELEMENT_ERROR (element, STREAM, FAILED, - ("Internal data stream error."), ("Could not add pad to collectpads")); - gst_object_unref (pad); - return NULL; - } -} - -static void -gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad) -{ - GstBaseTsMux *mux = GST_BASE_TS_MUX (element); - - GST_DEBUG_OBJECT (mux, "Pad %" GST_PTR_FORMAT " being released", pad); - - if (mux->collect) { - gst_collect_pads_remove_pad (mux->collect, pad); - } - - /* chain up */ - gst_element_remove_pad (element, pad); -} - -static GstStateChangeReturn -gst_base_ts_mux_change_state (GstElement * element, GstStateChange transition) -{ - GstBaseTsMux *mux = GST_BASE_TS_MUX (element); - GstStateChangeReturn ret; - - switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - break; - case GST_STATE_CHANGE_READY_TO_PAUSED: - gst_collect_pads_start (mux->collect); - break; - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - break; - case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_collect_pads_stop (mux->collect); - break; - case GST_STATE_CHANGE_READY_TO_NULL: - break; - default: - break; - } - - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - switch (transition) { - case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - break; - case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_base_ts_mux_reset (mux, TRUE); - break; - case GST_STATE_CHANGE_READY_TO_NULL: - break; - default: - break; - } - - return ret; -} - -static gboolean -gst_base_ts_mux_send_event (GstElement * element, GstEvent * event) -{ - GstMpegtsSection *section; - GstBaseTsMux *mux = GST_BASE_TS_MUX (element); - - section = gst_event_parse_mpegts_section (event); - gst_event_unref (event); - - if (section) { - GST_DEBUG ("Received event with mpegts section"); - - /* TODO: Check that the section type is supported */ - tsmux_add_mpegts_si_section (mux->tsmux, section); - - return TRUE; - } - - return FALSE; -} - -/* CollectPads implementation */ - static GstFlowReturn -gst_base_ts_mux_clip_inc_running_time (GstCollectPads * pads, - GstCollectData * cdata, GstBuffer * buf, GstBuffer ** outbuf, - gpointer user_data) -{ - GstBaseTsPadData *pad_data = (GstBaseTsPadData *) cdata; - GstClockTime time; - - *outbuf = buf; - - /* PTS */ - time = GST_BUFFER_PTS (buf); - - /* invalid left alone and passed */ - if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) { - time = gst_segment_to_running_time (&cdata->segment, GST_FORMAT_TIME, time); - if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) { - GST_DEBUG_OBJECT (cdata->pad, "clipping buffer on pad outside segment"); - gst_buffer_unref (buf); - *outbuf = NULL; - goto beach; - } else { - GST_LOG_OBJECT (cdata->pad, "buffer pts %" GST_TIME_FORMAT " -> %" - GST_TIME_FORMAT " running time", - GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time)); - buf = *outbuf = gst_buffer_make_writable (buf); - GST_BUFFER_PTS (*outbuf) = time; - } - } - - /* DTS */ - time = GST_BUFFER_DTS (buf); - - /* invalid left alone and passed */ - if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) { - gint sign; - gint64 dts; - - sign = gst_segment_to_running_time_full (&cdata->segment, GST_FORMAT_TIME, - time, &time); - - if (sign > 0) - dts = (gint64) time; - else - dts = -((gint64) time); - - GST_LOG_OBJECT (cdata->pad, "buffer dts %" GST_TIME_FORMAT " -> %" - GST_STIME_FORMAT " running time", GST_TIME_ARGS (GST_BUFFER_DTS (buf)), - GST_STIME_ARGS (dts)); - - if (GST_CLOCK_STIME_IS_VALID (pad_data->dts) && dts < pad_data->dts) { - /* Ignore DTS going backward */ - GST_WARNING_OBJECT (cdata->pad, "ignoring DTS going backward"); - dts = pad_data->dts; - } - - *outbuf = gst_buffer_make_writable (buf); - if (sign > 0) - GST_BUFFER_DTS (*outbuf) = time; - else - GST_BUFFER_DTS (*outbuf) = GST_CLOCK_TIME_NONE; - - pad_data->dts = dts; - } else { - pad_data->dts = GST_CLOCK_STIME_NONE; - } - -beach: - return GST_FLOW_OK; -} - -static GstFlowReturn -gst_base_ts_mux_collected_buffer (GstCollectPads * pads, GstCollectData * data, - GstBuffer * buf, GstBaseTsMux * mux) +gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux, + GstAggregatorPad * agg_pad, GstBuffer * buf) { GstFlowReturn ret = GST_FLOW_OK; - GstBaseTsPadData *best = (GstBaseTsPadData *) data; + GstBaseTsMuxPad *best = GST_BASE_TS_MUX_PAD (agg_pad); TsMuxProgram *prog; gint64 pts = GST_CLOCK_STIME_NONE; gint64 dts = GST_CLOCK_STIME_NONE; @@ -1397,27 +1017,9 @@ gst_base_ts_mux_collected_buffer (GstCollectPads * pads, GstCollectData * data, return ret; } - gst_base_ts_mux_prepare_srcpad (mux); - mux->first = FALSE; } - if (G_UNLIKELY (best == NULL)) { - GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux); - /* EOS */ - GST_INFO_OBJECT (mux, "EOS"); - /* drain some possibly cached data */ - if (klass->drain) - klass->drain (mux); - gst_base_ts_mux_push_packets (mux, TRUE); - gst_pad_push_event (mux->srcpad, gst_event_new_eos ()); - - if (buf) - gst_buffer_unref (buf); - - return GST_FLOW_OK; - } - prog = best->prog; if (prog == NULL) goto no_program; @@ -1437,7 +1039,7 @@ gst_base_ts_mux_collected_buffer (GstCollectPads * pads, GstCollectData * data, GstEvent *event; event = check_pending_key_unit_event (mux->force_key_unit_event, - &best->collect.segment, GST_BUFFER_PTS (buf), + &agg_pad->segment, GST_BUFFER_PTS (buf), GST_BUFFER_FLAGS (buf), mux->pending_key_unit_ts); if (event) { GstClockTime running_time; @@ -1453,7 +1055,7 @@ gst_base_ts_mux_collected_buffer (GstCollectPads * pads, GstCollectData * data, GST_INFO_OBJECT (mux, "pushing downstream force-key-unit event %d " "%" GST_TIME_FORMAT " count %d", gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count); - gst_pad_push_event (mux->srcpad, event); + gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), event); /* output PAT, SI tables */ tsmux_resend_pat (mux->tsmux); @@ -1470,7 +1072,7 @@ gst_base_ts_mux_collected_buffer (GstCollectPads * pads, GstCollectData * data, if (G_UNLIKELY (prog->pcr_stream == NULL)) { /* Take the first data stream for the PCR */ - GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best), + GST_DEBUG_OBJECT (best, "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)", best->pid, best->prog_id); @@ -1478,8 +1080,7 @@ gst_base_ts_mux_collected_buffer (GstCollectPads * pads, GstCollectData * data, tsmux_program_set_pcr_stream (prog, best->stream); } - GST_DEBUG_OBJECT (COLLECT_DATA_PAD (best), - "Chose stream for output (PID: 0x%04x)", best->pid); + GST_DEBUG_OBJECT (best, "Chose stream for output (PID: 0x%04x)", best->pid); if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (buf))) { pts = GSTTIME_TO_MPEGTIME (GST_BUFFER_PTS (buf)); @@ -1551,12 +1152,450 @@ no_program: gst_buffer_unref (buf); GST_ELEMENT_ERROR (mux, STREAM, MUX, ("Stream on pad %" GST_PTR_FORMAT - " is not associated with any program", COLLECT_DATA_PAD (best)), - (NULL)); + " is not associated with any program", best), (NULL)); return GST_FLOW_ERROR; } } +/* GstElement implementation */ + +static GstPad * +gst_base_ts_mux_request_new_pad (GstElement * element, GstPadTemplate * templ, + const gchar * name, const GstCaps * caps) +{ + GstBaseTsMux *mux = GST_BASE_TS_MUX (element); + gint pid = -1; + GstPad *pad = NULL; + + if (name != NULL && sscanf (name, "sink_%d", &pid) == 1) { + if (tsmux_find_stream (mux->tsmux, pid)) + goto stream_exists; + } else { + pid = tsmux_get_new_pid (mux->tsmux); + } + + pad = (GstPad *) + GST_ELEMENT_CLASS (parent_class)->request_new_pad (element, + templ, name, caps); + + gst_base_ts_mux_pad_reset (GST_BASE_TS_MUX_PAD (pad)); + GST_BASE_TS_MUX_PAD (pad)->pid = pid; + + return pad; + + /* ERRORS */ +stream_exists: + { + GST_ELEMENT_ERROR (element, STREAM, MUX, ("Duplicate PID requested"), + (NULL)); + return NULL; + } +} + +static gboolean +gst_base_ts_mux_send_event (GstElement * element, GstEvent * event) +{ + GstMpegtsSection *section; + GstBaseTsMux *mux = GST_BASE_TS_MUX (element); + + section = gst_event_parse_mpegts_section (event); + gst_event_unref (event); + + if (section) { + GST_DEBUG ("Received event with mpegts section"); + + /* TODO: Check that the section type is supported */ + tsmux_add_mpegts_si_section (mux->tsmux, section); + + return TRUE; + } + + return FALSE; +} + +/* GstAggregator implementation */ + +static gboolean +gst_base_ts_mux_sink_event (GstAggregator * agg, GstAggregatorPad * agg_pad, + GstEvent * event) +{ + GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class); + GstBaseTsMux *mux = GST_BASE_TS_MUX (agg); + GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (agg_pad); + gboolean res = FALSE; + gboolean forward = TRUE; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_DOWNSTREAM: + { + GstClockTime timestamp, stream_time, running_time; + gboolean all_headers; + guint count; + + if (!gst_video_event_is_force_key_unit (event)) + goto out; + + res = TRUE; + forward = FALSE; + + gst_video_event_parse_downstream_force_key_unit (event, + ×tamp, &stream_time, &running_time, &all_headers, &count); + GST_INFO_OBJECT (ts_pad, "have downstream force-key-unit event, " + "seqnum %d, running-time %" GST_TIME_FORMAT " count %d", + gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), count); + + if (mux->force_key_unit_event != NULL) { + GST_INFO_OBJECT (mux, "skipping downstream force key unit event " + "as an upstream force key unit is already queued"); + goto out; + } + + if (!all_headers) + goto out; + + mux->pending_key_unit_ts = running_time; + gst_event_replace (&mux->force_key_unit_event, event); + break; + } + case GST_EVENT_TAG:{ + GstTagList *list; + gchar *lang = NULL; + + GST_DEBUG_OBJECT (mux, "received tag event"); + gst_event_parse_tag (event, &list); + + /* Matroska wants ISO 639-2B code, taglist most likely contains 639-1 */ + if (gst_tag_list_get_string (list, GST_TAG_LANGUAGE_CODE, &lang)) { + const gchar *lang_code; + + lang_code = gst_tag_get_language_code_iso_639_2B (lang); + if (lang_code) { + GST_DEBUG_OBJECT (ts_pad, "Setting language to '%s'", lang_code); + + g_free (ts_pad->language); + ts_pad->language = g_strdup (lang_code); + } else { + GST_WARNING_OBJECT (ts_pad, "Did not get language code for '%s'", + lang); + } + g_free (lang); + } + + /* handled this, don't want collectpads to forward it downstream */ + res = TRUE; + forward = gst_tag_list_get_scope (list) == GST_TAG_SCOPE_GLOBAL; + break; + } + case GST_EVENT_STREAM_START:{ + GstStreamFlags flags; + + gst_event_parse_stream_flags (event, &flags); + + /* Don't wait for data on sparse inputs like metadata streams */ + /* + if ((flags & GST_STREAM_FLAG_SPARSE)) { + GST_COLLECT_PADS_STATE_UNSET (data, GST_COLLECT_PADS_STATE_LOCKED); + gst_collect_pads_set_waiting (pads, data, FALSE); + GST_COLLECT_PADS_STATE_SET (data, GST_COLLECT_PADS_STATE_LOCKED); + } + */ + break; + } + case GST_EVENT_FLUSH_STOP:{ + GList *cur; + + /* Send initial segments again after a flush-stop, and also resend the + * header sections */ + mux->first = TRUE; + + /* output PAT, SI tables */ + tsmux_resend_pat (mux->tsmux); + tsmux_resend_si (mux->tsmux); + + /* output PMT for each program */ + for (cur = mux->tsmux->programs; cur; cur = cur->next) { + TsMuxProgram *program = (TsMuxProgram *) cur->data; + + tsmux_resend_pmt (program); + } + break; + } + default: + break; + } + +out: + if (!forward) + gst_event_unref (event); + else + res = agg_class->sink_event (agg, agg_pad, event); + + return res; +} + +static gboolean +gst_base_ts_mux_src_event (GstAggregator * agg, GstEvent * event) +{ + GstAggregatorClass *agg_class = GST_AGGREGATOR_CLASS (parent_class); + GstBaseTsMux *mux = GST_BASE_TS_MUX (agg); + gboolean res = TRUE, forward = TRUE; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CUSTOM_UPSTREAM: + { + GstIterator *iter; + GstIteratorResult iter_ret; + GstPad *sinkpad; + GValue sinkpad_value = G_VALUE_INIT; + GstClockTime running_time; + gboolean all_headers, done, res = FALSE; + guint count; + + if (!gst_video_event_is_force_key_unit (event)) + break; + + forward = FALSE; + + gst_video_event_parse_upstream_force_key_unit (event, + &running_time, &all_headers, &count); + + GST_INFO_OBJECT (mux, "received upstream force-key-unit event, " + "seqnum %d running_time %" GST_TIME_FORMAT " all_headers %d count %d", + gst_event_get_seqnum (event), GST_TIME_ARGS (running_time), + all_headers, count); + + if (!all_headers) + break; + + mux->pending_key_unit_ts = running_time; + gst_event_replace (&mux->force_key_unit_event, event); + + iter = gst_element_iterate_sink_pads (GST_ELEMENT_CAST (mux)); + done = FALSE; + while (!done) { + gboolean tmp; + + iter_ret = gst_iterator_next (iter, &sinkpad_value); + sinkpad = GST_PAD (g_value_get_object (&sinkpad_value)); + + switch (iter_ret) { + case GST_ITERATOR_DONE: + done = TRUE; + break; + case GST_ITERATOR_OK: + GST_INFO_OBJECT (GST_AGGREGATOR_SRC_PAD (agg), "forwarding"); + tmp = gst_pad_push_event (sinkpad, gst_event_ref (event)); + GST_INFO_OBJECT (mux, "result %d", tmp); + /* succeed if at least one pad succeeds */ + res |= tmp; + break; + case GST_ITERATOR_ERROR: + done = TRUE; + break; + case GST_ITERATOR_RESYNC: + break; + } + g_value_reset (&sinkpad_value); + } + g_value_unset (&sinkpad_value); + gst_iterator_free (iter); + break; + } + default: + break; + } + + if (forward) + res = agg_class->src_event (agg, event); + else + gst_event_unref (event); + + return res; +} + +static GstBuffer * +gst_base_ts_mux_clip (GstAggregator * agg, + GstAggregatorPad * agg_pad, GstBuffer * buf) +{ + GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (agg_pad); + GstClockTime time; + GstBuffer *ret; + + ret = buf; + + /* PTS */ + time = GST_BUFFER_PTS (buf); + + /* invalid left alone and passed */ + if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) { + time = + gst_segment_to_running_time (&agg_pad->segment, GST_FORMAT_TIME, time); + if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time))) { + GST_DEBUG_OBJECT (pad, "clipping buffer on pad outside segment"); + gst_buffer_unref (buf); + ret = NULL; + goto beach; + } else { + GST_LOG_OBJECT (pad, "buffer pts %" GST_TIME_FORMAT " -> %" + GST_TIME_FORMAT " running time", + GST_TIME_ARGS (GST_BUFFER_PTS (buf)), GST_TIME_ARGS (time)); + buf = ret = gst_buffer_make_writable (buf); + GST_BUFFER_PTS (ret) = time; + } + } + + /* DTS */ + time = GST_BUFFER_DTS (buf); + + /* invalid left alone and passed */ + if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (time))) { + gint sign; + gint64 dts; + + sign = gst_segment_to_running_time_full (&agg_pad->segment, GST_FORMAT_TIME, + time, &time); + + if (sign > 0) + dts = (gint64) time; + else + dts = -((gint64) time); + + GST_LOG_OBJECT (pad, "buffer dts %" GST_TIME_FORMAT " -> %" + GST_STIME_FORMAT " running time", GST_TIME_ARGS (GST_BUFFER_DTS (buf)), + GST_STIME_ARGS (dts)); + + if (GST_CLOCK_STIME_IS_VALID (pad->dts) && dts < pad->dts) { + /* Ignore DTS going backward */ + GST_WARNING_OBJECT (pad, "ignoring DTS going backward"); + dts = pad->dts; + } + + ret = gst_buffer_make_writable (buf); + if (sign > 0) + GST_BUFFER_DTS (ret) = time; + else + GST_BUFFER_DTS (ret) = GST_CLOCK_TIME_NONE; + + pad->dts = dts; + } else { + pad->dts = GST_CLOCK_STIME_NONE; + } + +beach: + return ret; +} + +static GstFlowReturn +gst_base_ts_mux_update_src_caps (GstAggregator * agg, GstCaps * caps, + GstCaps ** ret) +{ + GstBaseTsMux *mux = GST_BASE_TS_MUX (agg); + GstStructure *s; + + *ret = gst_caps_copy (caps); + s = gst_caps_get_structure (*ret, 0); + gst_structure_set (s, "packetsize", G_TYPE_INT, mux->packet_size, NULL); + + return GST_FLOW_OK; +} + +static GstBaseTsMuxPad * +gst_base_ts_mux_find_best_pad (GstAggregator * aggregator) +{ + GstBaseTsMuxPad *pad, *best = NULL; + GList *l; + GstBuffer *buffer; + GstClockTime best_ts = GST_CLOCK_TIME_NONE; + + for (l = GST_ELEMENT_CAST (aggregator)->sinkpads; l; l = l->next) { + pad = GST_BASE_TS_MUX_PAD (l->data); + buffer = gst_aggregator_pad_peek_buffer (GST_AGGREGATOR_PAD (pad)); + if (!buffer) + continue; + if (best_ts == GST_CLOCK_TIME_NONE) { + best = pad; + best_ts = GST_BUFFER_DTS_OR_PTS (buffer); + } else if (GST_BUFFER_DTS_OR_PTS (buffer) != GST_CLOCK_TIME_NONE) { + GstClockTime t = GST_BUFFER_DTS_OR_PTS (buffer); + if (t < best_ts) { + best = pad; + best_ts = t; + } + } + gst_buffer_unref (buffer); + } + GST_DEBUG_OBJECT (aggregator, + "Best pad found with %" GST_TIME_FORMAT ": %" GST_PTR_FORMAT, + GST_TIME_ARGS (best_ts), best); + + return best; +} + +static gboolean +gst_base_ts_mux_are_all_pads_eos (GstBaseTsMux * mux) +{ + GList *l; + + for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { + GstBaseTsMuxPad *pad = GST_BASE_TS_MUX_PAD (l->data); + + if (!gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad))) + return FALSE; + } + return TRUE; +} + + +static GstFlowReturn +gst_base_ts_mux_aggregate (GstAggregator * agg, gboolean timeout) +{ + GstBaseTsMux *mux = GST_BASE_TS_MUX (agg); + GstFlowReturn ret = GST_FLOW_OK; + GstBaseTsMuxPad *best = gst_base_ts_mux_find_best_pad (agg); + + if (best) { + GstBuffer *buffer; + + buffer = gst_aggregator_pad_pop_buffer (GST_AGGREGATOR_PAD (best)); + + ret = + gst_base_ts_mux_aggregate_buffer (GST_BASE_TS_MUX (agg), + GST_AGGREGATOR_PAD (best), buffer); + + if (ret != GST_FLOW_OK) + goto done; + } + + if (gst_base_ts_mux_are_all_pads_eos (mux)) { + GstBaseTsMuxClass *klass = GST_BASE_TS_MUX_GET_CLASS (mux); + /* drain some possibly cached data */ + if (klass->drain) + klass->drain (mux); + gst_base_ts_mux_push_packets (mux, TRUE); + + ret = GST_FLOW_EOS; + } + +done: + return ret; +} + +static gboolean +gst_base_ts_mux_start (GstAggregator * agg) +{ + gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), TRUE); + + return TRUE; +} + +static gboolean +gst_base_ts_mux_stop (GstAggregator * agg) +{ + gst_base_ts_mux_reset (GST_BASE_TS_MUX (agg), FALSE); + + return TRUE; +} + /* GObject implementation */ static void @@ -1570,10 +1609,6 @@ gst_base_ts_mux_dispose (GObject * object) g_object_unref (mux->out_adapter); mux->out_adapter = NULL; } - if (mux->collect) { - gst_object_unref (mux->collect); - mux->collect = NULL; - } if (mux->prog_map) { gst_structure_free (mux->prog_map); mux->prog_map = NULL; @@ -1599,7 +1634,7 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstBaseTsMux *mux = GST_BASE_TS_MUX (object); - GSList *walk; + GList *walk; switch (prop_id) { case PROP_PROG_MAP: @@ -1620,14 +1655,14 @@ gst_base_ts_mux_set_property (GObject * object, guint prop_id, tsmux_set_pat_interval (mux->tsmux, mux->pat_interval); break; case PROP_PMT_INTERVAL: - walk = mux->collect->data; + walk = GST_ELEMENT (object)->sinkpads; mux->pmt_interval = g_value_get_uint (value); while (walk) { - GstBaseTsPadData *ts_data = (GstBaseTsPadData *) walk->data; + GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data); - tsmux_set_pmt_interval (ts_data->prog, mux->pmt_interval); - walk = g_slist_next (walk); + tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval); + walk = g_list_next (walk); } break; case PROP_ALIGNMENT: @@ -1687,6 +1722,7 @@ gst_base_ts_mux_default_create_ts_mux (GstBaseTsMux * mux) TsMux *tsmux = tsmux_new (); tsmux_set_write_func (tsmux, new_packet_cb, mux); tsmux_set_alloc_func (tsmux, alloc_packet_cb, mux); + tsmux_set_bitrate (tsmux, mux->bitrate); return tsmux; } @@ -1729,6 +1765,7 @@ static void gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass) { GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + GstAggregatorClass *gstagg_class = GST_AGGREGATOR_CLASS (klass); GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GST_DEBUG_CATEGORY_INIT (gst_base_ts_mux_debug, "basetsmux", 0, @@ -1747,10 +1784,16 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass) gobject_class->constructed = gst_base_ts_mux_constructed; gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad; - gstelement_class->release_pad = gst_base_ts_mux_release_pad; - gstelement_class->change_state = gst_base_ts_mux_change_state; gstelement_class->send_event = gst_base_ts_mux_send_event; + gstagg_class->update_src_caps = gst_base_ts_mux_update_src_caps; + gstagg_class->aggregate = gst_base_ts_mux_aggregate; + gstagg_class->clip = gst_base_ts_mux_clip; + gstagg_class->sink_event = gst_base_ts_mux_sink_event; + gstagg_class->src_event = gst_base_ts_mux_src_event; + gstagg_class->start = gst_base_ts_mux_start; + gstagg_class->stop = gst_base_ts_mux_stop; + klass->create_ts_mux = gst_base_ts_mux_default_create_ts_mux; klass->allocate_packet = gst_base_ts_mux_default_allocate_packet; klass->output_packet = gst_base_ts_mux_default_output_packet; @@ -1792,29 +1835,14 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass) " to achieve multiplex-wide constant bitrate", 0, G_MAXUINT64, TSMUX_DEFAULT_BITRATE, (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS))); + + gst_element_class_add_static_pad_template_with_gtype (gstelement_class, + &gst_base_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD); } static void gst_base_ts_mux_init (GstBaseTsMux * mux) { - mux->srcpad = - gst_pad_new_from_static_template (&gst_base_ts_mux_src_factory, "src"); - gst_pad_use_fixed_caps (mux->srcpad); - gst_pad_set_event_function (mux->srcpad, - GST_DEBUG_FUNCPTR (gst_base_ts_mux_src_event)); - gst_element_add_pad (GST_ELEMENT (mux), mux->srcpad); - - mux->collect = gst_collect_pads_new (); - gst_collect_pads_set_buffer_function (mux->collect, - (GstCollectPadsBufferFunction) - GST_DEBUG_FUNCPTR (gst_base_ts_mux_collected_buffer), mux); - - gst_collect_pads_set_event_function (mux->collect, - (GstCollectPadsEventFunction) - GST_DEBUG_FUNCPTR (gst_base_ts_mux_sink_event), mux); - gst_collect_pads_set_clip_function (mux->collect, (GstCollectPadsClipFunction) - GST_DEBUG_FUNCPTR (gst_base_ts_mux_clip_inc_running_time), mux); - mux->out_adapter = gst_adapter_new (); /* properties */ diff --git a/gst/mpegtsmux/gstbasetsmux.h b/gst/mpegtsmux/gstbasetsmux.h index 9a3ff203ad..7b89225cfe 100644 --- a/gst/mpegtsmux/gstbasetsmux.h +++ b/gst/mpegtsmux/gstbasetsmux.h @@ -86,12 +86,73 @@ #include #include #include -#include +#include G_BEGIN_DECLS #include "tsmux/tsmux.h" +#define GST_TYPE_BASE_TS_MUX_PAD (gst_base_ts_mux_pad_get_type()) +#define GST_BASE_TS_MUX_PAD(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BASE_TS_MUX_PAD, GstBaseTsMuxPad)) +#define GST_BASE_TS_MUX_PAD_CAST(obj) ((GstBaseTsMuxPad *)(obj)) +#define GST_BASE_TS_MUX_PAD_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BASE_TS_MUX_PAD, GstBaseTsMuxPadClass)) +#define GST_IS_BASE_TS_MUX_PAD(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BASE_TS_MUX_PAD)) +#define GST_IS_BASE_TS_MUX_PAD_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASE_TS_MUX_PAD)) +#define GST_BASE_TS_MUX_PAD_GET_CLASS(obj) \ + (G_TYPE_INSTANCE_GET_CLASS((obj),GST_TYPE_BASE_TS_MUX_PAD,GstBaseTsMuxPadClass)) + +typedef struct _GstBaseTsMuxPad GstBaseTsMuxPad; +typedef struct _GstBaseTsMuxPadClass GstBaseTsMuxPadClass; +typedef struct _GstBaseTsMuxPadPrivate GstBaseTsMuxPadPrivate; +typedef struct GstBaseTsMux GstBaseTsMux; +typedef struct GstBaseTsMuxClass GstBaseTsMuxClass; +typedef struct GstBaseTsPadData GstBaseTsPadData; + +typedef GstBuffer * (*GstBaseTsMuxPadPrepareFunction) (GstBuffer * buf, + GstBaseTsMuxPad * data, GstBaseTsMux * mux); + +typedef void (*GstBaseTsMuxPadFreePrepareDataFunction) (gpointer prepare_data); + +struct _GstBaseTsMuxPad +{ + GstAggregatorPad parent; + + gint pid; + TsMuxStream *stream; + + /* most recent DTS */ + gint64 dts; + + /* optional codec data available in the caps */ + GstBuffer *codec_data; + + /* Opaque data pointer to a structure used by the prepare function */ + gpointer prepare_data; + + /* handler to prepare input data */ + GstBaseTsMuxPadPrepareFunction prepare_func; + /* handler to free the private data */ + GstBaseTsMuxPadFreePrepareDataFunction free_func; + + /* program id to which it is attached to (not program pid) */ + gint prog_id; + /* program this stream belongs to */ + TsMuxProgram *prog; + + gchar *language; +}; + +struct _GstBaseTsMuxPadClass +{ + GstAggregatorPadClass parent_class; +}; + +GType gst_base_ts_mux_pad_get_type (void); + #define GST_TYPE_BASE_TS_MUX (gst_base_ts_mux_get_type()) #define GST_BASE_TS_MUX(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_BASE_TS_MUX, GstBaseTsMux)) #define GST_BASE_TS_MUX_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_BASE_TS_MUX, GstBaseTsMuxClass)) @@ -99,21 +160,8 @@ G_BEGIN_DECLS #define GST_BASE_TS_MUX_NORMAL_PACKET_LENGTH 188 -typedef struct GstBaseTsMux GstBaseTsMux; -typedef struct GstBaseTsMuxClass GstBaseTsMuxClass; -typedef struct GstBaseTsPadData GstBaseTsPadData; - -typedef GstBuffer * (*GstBaseTsPadDataPrepareFunction) (GstBuffer * buf, - GstBaseTsPadData * data, GstBaseTsMux * mux); - -typedef void (*GstBaseTsPadDataFreePrepareDataFunction) (gpointer prepare_data); - struct GstBaseTsMux { - GstElement parent; - - GstPad *srcpad; - - GstCollectPads *collect; + GstAggregator parent; TsMux *tsmux; GHashTable *programs; @@ -166,10 +214,10 @@ struct GstBaseTsMux { * Called at EOS, if the subclass has data it needs to drain. */ struct GstBaseTsMuxClass { - GstElementClass parent_class; + GstAggregatorClass parent_class; TsMux * (*create_ts_mux) (GstBaseTsMux *mux); - guint (*handle_media_type) (GstBaseTsMux *mux, const gchar *media_type, GstBaseTsPadData * ts_data); + guint (*handle_media_type) (GstBaseTsMux *mux, const gchar *media_type, GstBaseTsMuxPad * pad); void (*allocate_packet) (GstBaseTsMux *mux, GstBuffer **buffer); gboolean (*output_packet) (GstBaseTsMux *mux, GstBuffer *buffer, gint64 new_pcr); void (*reset) (GstBaseTsMux *mux); @@ -179,6 +227,11 @@ struct GstBaseTsMuxClass { void gst_base_ts_mux_set_packet_size (GstBaseTsMux *mux, gsize size); void gst_base_ts_mux_set_automatic_alignment (GstBaseTsMux *mux, gsize alignment); +typedef GstBuffer * (*GstBaseTsPadDataPrepareFunction) (GstBuffer * buf, + GstBaseTsPadData * data, GstBaseTsMux * mux); + +typedef void (*GstBaseTsPadDataFreePrepareDataFunction) (gpointer prepare_data); + struct GstBaseTsPadData { /* parent */ GstCollectData collect; @@ -210,7 +263,6 @@ struct GstBaseTsPadData { GType gst_base_ts_mux_get_type (void); - G_END_DECLS #endif diff --git a/gst/mpegtsmux/gstbasetsmuxaac.c b/gst/mpegtsmux/gstbasetsmuxaac.c index a4e85eeb0f..04dcd002b2 100644 --- a/gst/mpegtsmux/gstbasetsmuxaac.c +++ b/gst/mpegtsmux/gstbasetsmuxaac.c @@ -90,7 +90,7 @@ #define GST_CAT_DEFAULT gst_base_ts_mux_debug GstBuffer * -gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsPadData * data, +gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsMuxPad * pad, GstBaseTsMux * mux) { guint8 adts_header[7] = { 0, }; @@ -106,7 +106,7 @@ gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsPadData * data, gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA | GST_BUFFER_COPY_TIMESTAMPS, 0, 0); - gst_buffer_map (data->codec_data, &codec_data_map, GST_MAP_READ); + gst_buffer_map (pad->codec_data, &codec_data_map, GST_MAP_READ); /* Generate ADTS header */ obj_type = GST_READ_UINT8 (codec_data_map.data) >> 3; @@ -149,7 +149,7 @@ gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsPadData * data, /* Now copy complete frame */ gst_buffer_fill (out_buf, out_offset, buf_map.data, buf_map.size); - gst_buffer_unmap (data->codec_data, &codec_data_map); + gst_buffer_unmap (pad->codec_data, &codec_data_map); gst_buffer_unmap (buf, &buf_map); return out_buf; diff --git a/gst/mpegtsmux/gstbasetsmuxaac.h b/gst/mpegtsmux/gstbasetsmuxaac.h index 2ae5091972..9e60a56ced 100644 --- a/gst/mpegtsmux/gstbasetsmuxaac.h +++ b/gst/mpegtsmux/gstbasetsmuxaac.h @@ -85,7 +85,7 @@ #include "gstbasetsmux.h" -GstBuffer * gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsPadData * data, +GstBuffer * gst_base_ts_mux_prepare_aac (GstBuffer * buf, GstBaseTsMuxPad * pad, GstBaseTsMux * mux); #endif /* __BASETSMUX_AAC_H__ */ diff --git a/gst/mpegtsmux/gstbasetsmuxjpeg2000.c b/gst/mpegtsmux/gstbasetsmuxjpeg2000.c index fca79368dc..996b76a35c 100644 --- a/gst/mpegtsmux/gstbasetsmuxjpeg2000.c +++ b/gst/mpegtsmux/gstbasetsmuxjpeg2000.c @@ -36,10 +36,10 @@ #define GST_CAT_DEFAULT gst_base_ts_mux_debug GstBuffer * -gst_base_ts_mux_prepare_jpeg2000 (GstBuffer * buf, GstBaseTsPadData * data, +gst_base_ts_mux_prepare_jpeg2000 (GstBuffer * buf, GstBaseTsMuxPad * pad, GstBaseTsMux * mux) { - j2k_private_data *private_data = data->prepare_data; + j2k_private_data *private_data = pad->prepare_data; GstByteWriter wr; GstBuffer *out_buf = NULL; guint8 *elsm_header = NULL; diff --git a/gst/mpegtsmux/gstbasetsmuxjpeg2000.h b/gst/mpegtsmux/gstbasetsmuxjpeg2000.h index 73fbc38117..284f530b10 100644 --- a/gst/mpegtsmux/gstbasetsmuxjpeg2000.h +++ b/gst/mpegtsmux/gstbasetsmuxjpeg2000.h @@ -55,7 +55,7 @@ typedef struct j2k_private_data guint8 color_spec; } j2k_private_data; -GstBuffer *gst_base_ts_mux_prepare_jpeg2000 (GstBuffer * buf, GstBaseTsPadData * data, +GstBuffer *gst_base_ts_mux_prepare_jpeg2000 (GstBuffer * buf, GstBaseTsMuxPad * pad, GstBaseTsMux * mux); void gst_base_ts_mux_free_jpeg2000 (gpointer prepare_data); diff --git a/gst/mpegtsmux/gstbasetsmuxopus.c b/gst/mpegtsmux/gstbasetsmuxopus.c index e642f09248..41a8b20bff 100644 --- a/gst/mpegtsmux/gstbasetsmuxopus.c +++ b/gst/mpegtsmux/gstbasetsmuxopus.c @@ -91,7 +91,7 @@ #define GST_CAT_DEFAULT gst_base_ts_mux_debug GstBuffer * -gst_base_ts_mux_prepare_opus (GstBuffer * buf, GstBaseTsPadData * pad_data, +gst_base_ts_mux_prepare_opus (GstBuffer * buf, GstBaseTsMuxPad * pad, GstBaseTsMux * mux) { gssize insize = gst_buffer_get_size (buf); diff --git a/gst/mpegtsmux/gstbasetsmuxopus.h b/gst/mpegtsmux/gstbasetsmuxopus.h index b1af5ce365..375de5218a 100644 --- a/gst/mpegtsmux/gstbasetsmuxopus.h +++ b/gst/mpegtsmux/gstbasetsmuxopus.h @@ -85,7 +85,7 @@ #include "gstbasetsmux.h" -GstBuffer * gst_base_ts_mux_prepare_opus (GstBuffer * buf, GstBaseTsPadData * data, +GstBuffer * gst_base_ts_mux_prepare_opus (GstBuffer * buf, GstBaseTsMuxPad * pad, GstBaseTsMux * mux); #endif /* __BASETSMUX_OPUS_H__ */ diff --git a/gst/mpegtsmux/gstbasetsmuxttxt.c b/gst/mpegtsmux/gstbasetsmuxttxt.c index 9f9d9fed05..f9a16e4e9f 100644 --- a/gst/mpegtsmux/gstbasetsmuxttxt.c +++ b/gst/mpegtsmux/gstbasetsmuxttxt.c @@ -98,7 +98,7 @@ */ GstBuffer * -gst_base_ts_mux_prepare_teletext (GstBuffer * buf, GstBaseTsPadData * pad_data, +gst_base_ts_mux_prepare_teletext (GstBuffer * buf, GstBaseTsMuxPad * pad, GstBaseTsMux * mux) { GstBuffer *out_buf; diff --git a/gst/mpegtsmux/gstbasetsmuxttxt.h b/gst/mpegtsmux/gstbasetsmuxttxt.h index 4975aa1517..bb2082fe41 100644 --- a/gst/mpegtsmux/gstbasetsmuxttxt.h +++ b/gst/mpegtsmux/gstbasetsmuxttxt.h @@ -85,7 +85,7 @@ #include "gstbasetsmux.h" -GstBuffer * gst_base_ts_mux_prepare_teletext (GstBuffer * buf, GstBaseTsPadData * data, +GstBuffer * gst_base_ts_mux_prepare_teletext (GstBuffer * buf, GstBaseTsMuxPad * pad, GstBaseTsMux * mux); #endif /* __BASETSMUX_TTXT_H__ */ diff --git a/gst/mpegtsmux/gstmpegtsmux.c b/gst/mpegtsmux/gstmpegtsmux.c index 2f244de685..f7212eb8c7 100644 --- a/gst/mpegtsmux/gstmpegtsmux.c +++ b/gst/mpegtsmux/gstmpegtsmux.c @@ -395,11 +395,11 @@ gst_mpeg_ts_mux_class_init (GstMpegTsMuxClass * klass) "Multiplexes media streams into an MPEG Transport Stream", "Fluendo "); - gst_element_class_add_static_pad_template (gstelement_class, - &gst_mpeg_ts_mux_sink_factory); + gst_element_class_add_static_pad_template_with_gtype (gstelement_class, + &gst_mpeg_ts_mux_sink_factory, GST_TYPE_BASE_TS_MUX_PAD); - gst_element_class_add_static_pad_template (gstelement_class, - &gst_mpeg_ts_mux_src_factory); + gst_element_class_add_static_pad_template_with_gtype (gstelement_class, + &gst_mpeg_ts_mux_src_factory, GST_TYPE_AGGREGATOR_PAD); g_object_class_install_property (gobject_class, PROP_M2TS_MODE, g_param_spec_boolean ("m2ts-mode", "M2TS(192 bytes) Mode", diff --git a/gst/mpegtsmux/tsmux/tsmux.c b/gst/mpegtsmux/tsmux/tsmux.c index d3a3bb26e8..71550d41c8 100644 --- a/gst/mpegtsmux/tsmux/tsmux.c +++ b/gst/mpegtsmux/tsmux/tsmux.c @@ -1085,7 +1085,7 @@ pad_stream (TsMux * mux, TsMuxStream * stream, gint64 cur_ts, gint64 * cur_pcr) GST_LOG ("Transport stream bitrate: %" G_GUINT64_FORMAT, bitrate); if (bitrate < mux->bitrate) { - GST_LOG_OBJECT (mux, "Padding transport stream"); + GST_LOG ("Padding transport stream"); if (!tsmux_get_buffer (mux, &buf)) { ret = FALSE; diff --git a/tests/check/elements/mpegtsmux.c b/tests/check/elements/mpegtsmux.c index a8273b68f5..2c8a4951c1 100644 --- a/tests/check/elements/mpegtsmux.c +++ b/tests/check/elements/mpegtsmux.c @@ -75,14 +75,14 @@ setup_src_pad (GstElement * element, sinkpad = gst_element_get_request_pad (element, sinkname); fail_if (sinkpad == NULL, "Could not get sink pad from %s", GST_ELEMENT_NAME (element)); - /* references are owned by: 1) us, 2) tsmux, 3) collect pads */ - ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3); + /* references are owned by: 1) us, 2) tsmux */ + ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2); fail_unless (gst_pad_link (srcpad, sinkpad) == GST_PAD_LINK_OK, "Could not link source and %s sink pads", GST_ELEMENT_NAME (element)); gst_object_unref (sinkpad); /* because we got it higher up */ - /* references are owned by: 1) tsmux, 2) collect pads */ - ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2); + /* references are owned by: 1) tsmux */ + ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 1); if (padname) *padname = g_strdup (GST_PAD_NAME (sinkpad)); @@ -98,16 +98,16 @@ teardown_src_pad (GstElement * element, const gchar * sinkname) /* clean up floating src pad */ if (!(sinkpad = gst_element_get_static_pad (element, sinkname))) sinkpad = gst_element_get_request_pad (element, sinkname); - /* pad refs held by 1) tsmux 2) collectpads and 3) us (through _get) */ - ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3); + /* pad refs held by 1) tsmux 2) us (through _get) */ + ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2); srcpad = gst_pad_get_peer (sinkpad); gst_pad_unlink (srcpad, sinkpad); GST_DEBUG ("src %p", srcpad); /* after unlinking, pad refs still held by - * 1) tsmux and 2) collectpads and 3) us (through _get) */ - ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 3); + * 1) tsmux and 2) us (through _get) */ + ASSERT_OBJECT_REFCOUNT (sinkpad, "sinkpad", 2); gst_object_unref (sinkpad); /* one more ref is held by element itself */ @@ -161,6 +161,7 @@ check_tsmux_pad (GstStaticPadTemplate * srctemplate, gint i; gint pmt_pid = -1, el_pid = -1, pcr_pid = -1, packets = 0; gchar *padname; + GstQuery *drain; mux = setup_tsmux (srctemplate, sinkname, &padname); @@ -200,6 +201,10 @@ check_tsmux_pad (GstStaticPadTemplate * srctemplate, ts += 40 * GST_MSECOND; } + drain = gst_query_new_drain (); + gst_pad_peer_query (mysrcpad, drain); + gst_query_unref (drain); + if (check_func) check_func (buffers); @@ -366,339 +371,6 @@ GST_START_TEST (test_audio) GST_END_TEST; - -typedef struct _TestData -{ - GstEvent *sink_event; - gint src_events; -} TestData; - -typedef struct _ThreadData -{ - GstPad *pad; - GstBuffer *buffer; - GstFlowReturn flow_return; - GThread *thread; -} ThreadData; - -static gboolean -src_event (GstPad * pad, GstObject * parent, GstEvent * event) -{ - TestData *data = (TestData *) gst_pad_get_element_private (pad); - - if (event->type == GST_EVENT_CUSTOM_UPSTREAM) - data->src_events += 1; - - gst_event_unref (event); - return TRUE; -} - -static gboolean -sink_event (GstPad * pad, GstObject * parent, GstEvent * event) -{ - TestData *data = (TestData *) gst_pad_get_element_private (pad); - - if (event->type == GST_EVENT_CUSTOM_DOWNSTREAM) - data->sink_event = event; - - gst_event_unref (event); - return TRUE; -} - -static void -link_sinks (GstElement * mpegtsmux, - GstPad ** src1, GstPad ** src2, GstPad ** src3, TestData * test_data) -{ - GstPad *mux_sink1, *mux_sink2, *mux_sink3; - - /* link 3 sink pads, 2 video 1 audio */ - *src1 = gst_pad_new_from_static_template (&video_src_template, "src1"); - gst_pad_set_active (*src1, TRUE); - gst_pad_set_element_private (*src1, test_data); - gst_pad_set_event_function (*src1, src_event); - mux_sink1 = gst_element_get_request_pad (mpegtsmux, "sink_1"); - fail_unless (gst_pad_link (*src1, mux_sink1) == GST_PAD_LINK_OK); - - *src2 = gst_pad_new_from_static_template (&video_src_template, "src2"); - gst_pad_set_active (*src2, TRUE); - gst_pad_set_element_private (*src2, test_data); - gst_pad_set_event_function (*src2, src_event); - mux_sink2 = gst_element_get_request_pad (mpegtsmux, "sink_2"); - fail_unless (gst_pad_link (*src2, mux_sink2) == GST_PAD_LINK_OK); - - *src3 = gst_pad_new_from_static_template (&audio_src_template, "src3"); - gst_pad_set_active (*src3, TRUE); - gst_pad_set_element_private (*src3, test_data); - gst_pad_set_event_function (*src3, src_event); - mux_sink3 = gst_element_get_request_pad (mpegtsmux, "sink_3"); - fail_unless (gst_pad_link (*src3, mux_sink3) == GST_PAD_LINK_OK); - - gst_object_unref (mux_sink1); - gst_object_unref (mux_sink2); - gst_object_unref (mux_sink3); -} - -static void -link_src (GstElement * mpegtsmux, GstPad ** sink, TestData * test_data) -{ - GstPad *mux_src; - - mux_src = gst_element_get_static_pad (mpegtsmux, "src"); - *sink = gst_pad_new_from_static_template (&sink_template, "sink"); - gst_pad_set_active (*sink, TRUE); - gst_pad_set_event_function (*sink, sink_event); - gst_pad_set_element_private (*sink, test_data); - fail_unless (gst_pad_link (mux_src, *sink) == GST_PAD_LINK_OK); - - gst_object_unref (mux_src); -} - -static void -setup_caps (GstElement * mpegtsmux, GstPad * src1, GstPad * src2, GstPad * src3) -{ - GstSegment segment; - GstCaps *caps; - - gst_segment_init (&segment, GST_FORMAT_TIME); - - caps = gst_caps_new_simple ("video/x-h264", - "stream-format", G_TYPE_STRING, "byte-stream", - "alignment", G_TYPE_STRING, "nal", NULL); - gst_pad_push_event (src1, gst_event_new_stream_start ("1")); - gst_pad_push_event (src1, gst_event_new_caps (caps)); - gst_pad_push_event (src1, gst_event_new_segment (&segment)); - gst_pad_push_event (src2, gst_event_new_stream_start ("2")); - gst_pad_push_event (src2, gst_event_new_caps (caps)); - gst_pad_push_event (src2, gst_event_new_segment (&segment)); - gst_caps_unref (caps); - caps = gst_caps_new_simple ("audio/mpeg", "mpegversion", G_TYPE_INT, 4, - "stream-format", G_TYPE_STRING, "raw", "framed", G_TYPE_BOOLEAN, TRUE, - NULL); - gst_pad_push_event (src3, gst_event_new_stream_start ("3")); - gst_pad_push_event (src3, gst_event_new_caps (caps)); - gst_pad_push_event (src3, gst_event_new_segment (&segment)); - gst_caps_unref (caps); -} - -static gpointer -pad_push_thread (gpointer user_data) -{ - ThreadData *data = (ThreadData *) user_data; - - data->flow_return = gst_pad_push (data->pad, data->buffer); - - return NULL; -} - -static ThreadData * -pad_push (GstPad * pad, GstBuffer * buffer, GstClockTime timestamp) -{ - ThreadData *data; - - data = g_new0 (ThreadData, 1); - data->pad = pad; - data->buffer = buffer; - GST_BUFFER_TIMESTAMP (buffer) = timestamp; - data->thread = g_thread_try_new ("gst-check", pad_push_thread, data, NULL); - - return data; -} - -GST_START_TEST (test_force_key_unit_event_downstream) -{ - GstElement *mpegtsmux; - GstPad *sink; - GstPad *src1; - GstPad *src2; - GstPad *src3; - GstEvent *sink_event; - GstClockTime timestamp, stream_time, running_time; - gboolean all_headers = TRUE; - gint count = 0; - ThreadData *thread_data_1, *thread_data_2, *thread_data_3, *thread_data_4; - TestData test_data = { 0, }; - - mpegtsmux = gst_check_setup_element ("mpegtsmux"); - - link_src (mpegtsmux, &sink, &test_data); - link_sinks (mpegtsmux, &src1, &src2, &src3, &test_data); - gst_element_set_state (mpegtsmux, GST_STATE_PLAYING); - setup_caps (mpegtsmux, src1, src2, src3); - - /* send a force-key-unit event with running_time=2s */ - timestamp = stream_time = running_time = 2 * GST_SECOND; - sink_event = gst_video_event_new_downstream_force_key_unit (timestamp, - stream_time, running_time, all_headers, count); - - fail_unless (gst_pad_push_event (src1, sink_event)); - fail_unless (test_data.sink_event == NULL); - - /* push 4 buffers, make sure mpegtsmux handles the force-key-unit event when - * the buffer with the requested running time is collected */ - thread_data_1 = pad_push (src1, gst_buffer_new (), 1 * GST_SECOND); - thread_data_2 = pad_push (src2, gst_buffer_new (), 2 * GST_SECOND); - thread_data_3 = pad_push (src3, gst_buffer_new (), 3 * GST_SECOND); - - g_thread_join (thread_data_1->thread); - fail_unless (test_data.sink_event == NULL); - - /* push again on src1 so that the buffer on src2 is collected */ - thread_data_4 = pad_push (src1, gst_buffer_new (), 4 * GST_SECOND); - - g_thread_join (thread_data_2->thread); - fail_unless (test_data.sink_event != NULL); - - gst_element_set_state (mpegtsmux, GST_STATE_NULL); - - g_thread_join (thread_data_3->thread); - g_thread_join (thread_data_4->thread); - - g_free (thread_data_1); - g_free (thread_data_2); - g_free (thread_data_3); - g_free (thread_data_4); - gst_object_unref (src1); - gst_object_unref (src2); - gst_object_unref (src3); - gst_object_unref (sink); - gst_object_unref (mpegtsmux); -} - -GST_END_TEST; - -GST_START_TEST (test_force_key_unit_event_upstream) -{ - GstElement *mpegtsmux; - GstPad *sink; - GstPad *src1; - GstPad *src2; - GstPad *src3; - GstClockTime timestamp, stream_time, running_time; - gboolean all_headers = TRUE; - gint count = 0; - TestData test_data = { 0, }; - ThreadData *thread_data_1, *thread_data_2, *thread_data_3, *thread_data_4; - GstEvent *event; - - mpegtsmux = gst_check_setup_element ("mpegtsmux"); - - link_src (mpegtsmux, &sink, &test_data); - link_sinks (mpegtsmux, &src1, &src2, &src3, &test_data); - gst_element_set_state (mpegtsmux, GST_STATE_PLAYING); - setup_caps (mpegtsmux, src1, src2, src3); - - /* send an upstream force-key-unit event with running_time=2s */ - timestamp = stream_time = running_time = 2 * GST_SECOND; - event = - gst_video_event_new_upstream_force_key_unit (running_time, TRUE, count); - fail_unless (gst_pad_push_event (sink, event)); - - fail_unless (test_data.sink_event == NULL); - fail_unless_equals_int (test_data.src_events, 3); - - /* send downstream events with unrelated seqnums */ - event = gst_video_event_new_downstream_force_key_unit (timestamp, - stream_time, running_time, all_headers, count); - fail_unless (gst_pad_push_event (src1, event)); - event = gst_video_event_new_downstream_force_key_unit (timestamp, - stream_time, running_time, all_headers, count); - fail_unless (gst_pad_push_event (src2, event)); - - /* events should be skipped */ - fail_unless (test_data.sink_event == NULL); - - /* push 4 buffers, make sure mpegtsmux handles the force-key-unit event when - * the buffer with the requested running time is collected */ - thread_data_1 = pad_push (src1, gst_buffer_new (), 1 * GST_SECOND); - thread_data_2 = pad_push (src2, gst_buffer_new (), 2 * GST_SECOND); - thread_data_3 = pad_push (src3, gst_buffer_new (), 3 * GST_SECOND); - - g_thread_join (thread_data_1->thread); - fail_unless (test_data.sink_event == NULL); - - /* push again on src1 so that the buffer on src2 is collected */ - thread_data_4 = pad_push (src1, gst_buffer_new (), 4 * GST_SECOND); - - g_thread_join (thread_data_2->thread); - fail_unless (test_data.sink_event != NULL); - - gst_element_set_state (mpegtsmux, GST_STATE_NULL); - - g_thread_join (thread_data_3->thread); - g_thread_join (thread_data_4->thread); - - g_free (thread_data_1); - g_free (thread_data_2); - g_free (thread_data_3); - g_free (thread_data_4); - - gst_object_unref (src1); - gst_object_unref (src2); - gst_object_unref (src3); - gst_object_unref (sink); - gst_object_unref (mpegtsmux); -} - -GST_END_TEST; - -static GstFlowReturn expected_flow; - -static GstFlowReturn -flow_test_stat_chain_func (GstPad * pad, GstObject * parent, GstBuffer * buffer) -{ - gst_buffer_unref (buffer); - - GST_INFO ("returning flow %s (%d)", gst_flow_get_name (expected_flow), - expected_flow); - return expected_flow; -} - -GST_START_TEST (test_propagate_flow_status) -{ - GstElement *mux; - gchar *padname; - GstBuffer *inbuffer; - GstCaps *caps; - guint i; - - GstFlowReturn expected[] = { GST_FLOW_OK, GST_FLOW_FLUSHING, GST_FLOW_EOS, - GST_FLOW_NOT_NEGOTIATED, GST_FLOW_ERROR, GST_FLOW_NOT_SUPPORTED - }; - - mux = setup_tsmux (&video_src_template, "sink_%d", &padname); - gst_pad_set_chain_function (mysinkpad, flow_test_stat_chain_func); - - fail_unless (gst_element_set_state (mux, - GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS, - "could not set to playing"); - - caps = gst_caps_from_string (VIDEO_CAPS_STRING); - gst_check_setup_events (mysrcpad, mux, caps, GST_FORMAT_TIME); - gst_caps_unref (caps); - - for (i = 0; i < G_N_ELEMENTS (expected); ++i) { - GstFlowReturn res; - - inbuffer = gst_buffer_new_and_alloc (1); - ASSERT_BUFFER_REFCOUNT (inbuffer, "inbuffer", 1); - - expected_flow = expected[i]; - GST_INFO ("expecting flow %s (%d)", gst_flow_get_name (expected_flow), - expected_flow); - - GST_BUFFER_TIMESTAMP (inbuffer) = i * GST_SECOND; - - res = gst_pad_push (mysrcpad, inbuffer); - - fail_unless_equals_int (res, expected[i]); - } - - cleanup_tsmux (mux, padname); - g_free (padname); -} - -GST_END_TEST; - GST_START_TEST (test_multiple_state_change) { GstElement *mux; @@ -716,7 +388,6 @@ GST_START_TEST (test_multiple_state_change) size_t num_transitions_to_test = 10; mux = setup_tsmux (&video_src_template, "sink_%d", &padname); - gst_pad_set_chain_function (mysinkpad, flow_test_stat_chain_func); gst_segment_init (&segment, GST_FORMAT_TIME); caps = gst_caps_from_string (VIDEO_CAPS_STRING); @@ -724,6 +395,7 @@ GST_START_TEST (test_multiple_state_change) gst_caps_unref (caps); for (i = 0; i < num_transitions_to_test; ++i) { + GstQuery *drain; GstState next_state = states[i % G_N_ELEMENTS (states)]; fail_unless (gst_element_set_state (mux, next_state) == GST_STATE_CHANGE_SUCCESS, @@ -739,9 +411,12 @@ GST_START_TEST (test_multiple_state_change) inbuffer = gst_buffer_new_and_alloc (1); ASSERT_BUFFER_REFCOUNT (inbuffer, "inbuffer", 1); - expected_flow = GST_FLOW_OK; GST_BUFFER_PTS (inbuffer) = 0; fail_unless (GST_FLOW_OK == gst_pad_push (mysrcpad, inbuffer)); + + drain = gst_query_new_drain (); + gst_pad_peer_query (mysrcpad, drain); + gst_query_unref (drain); } } @@ -813,9 +488,6 @@ mpegtsmux_suite (void) tcase_add_test (tc_chain, test_audio); tcase_add_test (tc_chain, test_video); - tcase_add_test (tc_chain, test_force_key_unit_event_downstream); - tcase_add_test (tc_chain, test_force_key_unit_event_upstream); - tcase_add_test (tc_chain, test_propagate_flow_status); tcase_add_test (tc_chain, test_multiple_state_change); tcase_add_test (tc_chain, test_align); tcase_add_test (tc_chain, test_keyframe_flag_propagation);