From d6a5e4ff168a17d621e2b703a6ef0b8844b85bd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 23 Oct 2015 18:38:33 +0300 Subject: [PATCH] mxfmux: Port to GstAggregator --- gst/mxf/Makefile.am | 3 + gst/mxf/mxfmux.c | 581 ++++++++++++++++++++++++-------------------- gst/mxf/mxfmux.h | 33 +-- 3 files changed, 328 insertions(+), 289 deletions(-) diff --git a/gst/mxf/Makefile.am b/gst/mxf/Makefile.am index 852c976a37..2d87907fb1 100644 --- a/gst/mxf/Makefile.am +++ b/gst/mxf/Makefile.am @@ -20,10 +20,13 @@ libgstmxf_la_SOURCES = \ mxfdms1.c libgstmxf_la_CFLAGS = \ + -I$(top_srcdir)/gst-libs \ + -I$(top_builddir)/gst-libs \ $(GST_PLUGINS_BASE_CFLAGS) \ $(GST_BASE_CFLAGS) \ $(GST_CFLAGS) libgstmxf_la_LIBADD = \ + $(top_builddir)/gst-libs/gst/base/libgstbadbase-$(GST_API_VERSION).la \ $(GST_PLUGINS_BASE_LIBS) \ -lgstvideo-@GST_API_VERSION@ \ -lgstaudio-@GST_API_VERSION@ \ diff --git a/gst/mxf/mxfmux.c b/gst/mxf/mxfmux.c index c5d7f803e8..21c37cf6a6 100644 --- a/gst/mxf/mxfmux.c +++ b/gst/mxf/mxfmux.c @@ -46,6 +46,66 @@ GST_DEBUG_CATEGORY_STATIC (mxfmux_debug); #define GST_CAT_DEFAULT mxfmux_debug +#define GST_TYPE_MXF_MUX_PAD (gst_mxf_mux_pad_get_type()) +#define GST_MXF_MUX_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MXF_MUX_PAD, GstMXFMuxPad)) +#define GST_MXF_MUX_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MXF_MUX_PAD, GstMXFMuxPadClass)) +#define GST_MXF_MUX_PAD_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj),GST_TYPE_MXF_MUX_PAD, GstMXFMuxPadClass)) +#define GST_IS_MXF_MUX_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MXF_MUX_PAD)) +#define GST_IS_MXF_MUX_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MXF_MUX_PAD)) + +typedef struct +{ + GstAggregatorPad parent; + + guint64 pos; + GstClockTime last_timestamp; + + MXFMetadataFileDescriptor *descriptor; + + GstAdapter *adapter; + gboolean have_complete_edit_unit; + + gpointer mapping_data; + const MXFEssenceElementWriter *writer; + MXFEssenceElementWriteFunc write_func; + + MXFMetadataSourcePackage *source_package; + MXFMetadataTimelineTrack *source_track; +} GstMXFMuxPad; + +typedef struct +{ + GstAggregatorPadClass parent_class; +} GstMXFMuxPadClass; + +GType gst_mxf_mux_pad_get_type (void); + +G_DEFINE_TYPE (GstMXFMuxPad, gst_mxf_mux_pad, GST_TYPE_AGGREGATOR_PAD); + +static void +gst_mxf_mux_pad_finalize (GObject * object) +{ + GstMXFMuxPad *pad = GST_MXF_MUX_PAD (object); + + g_object_unref (pad->adapter); + g_free (pad->mapping_data); + + G_OBJECT_CLASS (gst_mxf_mux_pad_parent_class)->finalize (object); +} + +static void +gst_mxf_mux_pad_class_init (GstMXFMuxPadClass * klass) +{ + GObjectClass *object_class = (GObjectClass *) klass; + + object_class->finalize = gst_mxf_mux_pad_finalize; +} + +static void +gst_mxf_mux_pad_init (GstMXFMuxPad * pad) +{ +} + static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, @@ -58,7 +118,7 @@ enum }; #define gst_mxf_mux_parent_class parent_class -G_DEFINE_TYPE (GstMXFMux, gst_mxf_mux, GST_TYPE_ELEMENT); +G_DEFINE_TYPE (GstMXFMux, gst_mxf_mux, GST_TYPE_AGGREGATOR); static void gst_mxf_mux_finalize (GObject * object); static void gst_mxf_mux_set_property (GObject * object, @@ -66,19 +126,16 @@ static void gst_mxf_mux_set_property (GObject * object, static void gst_mxf_mux_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstFlowReturn gst_mxf_mux_collected (GstCollectPads * pads, - gpointer user_data); +static GstFlowReturn gst_mxf_mux_aggregate (GstAggregator * aggregator, + gboolean timeout); +static gboolean gst_mxf_mux_stop (GstAggregator * aggregator); -static gboolean gst_mxf_mux_handle_src_event (GstPad * pad, GstObject * parent, +static gboolean gst_mxf_mux_src_event (GstAggregator * aggregator, GstEvent * event); -static gboolean gst_mxf_mux_handle_sink_event (GstCollectPads * pads, - GstCollectData * data, GstEvent * event, gpointer user_data); -static GstPad *gst_mxf_mux_request_new_pad (GstElement * element, +static gboolean gst_mxf_mux_sink_event (GstAggregator * aggregator, + GstAggregatorPad * aggpad, GstEvent * event); +static GstAggregatorPad *gst_mxf_mux_create_new_pad (GstAggregator * aggregator, GstPadTemplate * templ, const gchar * name, const GstCaps * caps); -static void gst_mxf_mux_release_pad (GstElement * element, GstPad * pad); - -static GstStateChangeReturn -gst_mxf_mux_change_state (GstElement * element, GstStateChange transition); static void gst_mxf_mux_reset (GstMXFMux * mux); @@ -88,7 +145,7 @@ gst_mxf_mux_push (GstMXFMux * mux, GstBuffer * buf) guint size = gst_buffer_get_size (buf); GstFlowReturn ret; - ret = gst_pad_push (mux->srcpad, buf); + ret = gst_aggregator_finish_buffer (GST_AGGREGATOR (mux), buf); mux->offset += size; return ret; @@ -99,21 +156,26 @@ gst_mxf_mux_class_init (GstMXFMuxClass * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; + GstAggregatorClass *gstaggregator_class; const GstPadTemplate **p; GST_DEBUG_CATEGORY_INIT (mxfmux_debug, "mxfmux", 0, "MXF muxer"); gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; + gstaggregator_class = (GstAggregatorClass *) klass; gobject_class->finalize = gst_mxf_mux_finalize; gobject_class->set_property = gst_mxf_mux_set_property; gobject_class->get_property = gst_mxf_mux_get_property; - gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_mxf_mux_change_state); - gstelement_class->request_new_pad = - GST_DEBUG_FUNCPTR (gst_mxf_mux_request_new_pad); - gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_mxf_mux_release_pad); + gstaggregator_class->create_new_pad = + GST_DEBUG_FUNCPTR (gst_mxf_mux_create_new_pad); + gstaggregator_class->src_event = GST_DEBUG_FUNCPTR (gst_mxf_mux_src_event); + gstaggregator_class->sink_event = GST_DEBUG_FUNCPTR (gst_mxf_mux_sink_event); + gstaggregator_class->stop = GST_DEBUG_FUNCPTR (gst_mxf_mux_stop); + gstaggregator_class->aggregate = GST_DEBUG_FUNCPTR (gst_mxf_mux_aggregate); + gstaggregator_class->sinkpads_type = GST_TYPE_MXF_MUX_PAD; gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&src_templ)); @@ -134,16 +196,6 @@ gst_mxf_mux_class_init (GstMXFMuxClass * klass) static void gst_mxf_mux_init (GstMXFMux * mux) { - mux->srcpad = gst_pad_new_from_static_template (&src_templ, "src"); - gst_pad_set_event_function (mux->srcpad, gst_mxf_mux_handle_src_event); - gst_element_add_pad (GST_ELEMENT (mux), mux->srcpad); - - mux->collect = gst_collect_pads_new (); - gst_collect_pads_set_event_function (mux->collect, - GST_DEBUG_FUNCPTR (gst_mxf_mux_handle_sink_event), mux); - gst_collect_pads_set_function (mux->collect, - GST_DEBUG_FUNCPTR (gst_mxf_mux_collected), mux); - gst_mxf_mux_reset (mux); } @@ -161,8 +213,6 @@ gst_mxf_mux_finalize (GObject * object) mux->metadata_list = NULL; } - gst_object_unref (mux->collect); - G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -195,16 +245,19 @@ gst_mxf_mux_get_property (GObject * object, static void gst_mxf_mux_reset (GstMXFMux * mux) { - GSList *sl; + GList *l; - while ((sl = mux->collect->data) != NULL) { - GstMXFMuxPad *cpad = (GstMXFMuxPad *) sl->data; + GST_OBJECT_LOCK (mux); + while ((l = GST_ELEMENT_CAST (mux)->sinkpads) != NULL) { + GstPad *pad = (GstPad *) l->data; - g_object_unref (cpad->adapter); - g_free (cpad->mapping_data); - - gst_collect_pads_remove_pad (mux->collect, cpad->collect.pad); + gst_object_ref (pad); + GST_OBJECT_UNLOCK (mux); + gst_element_release_request_pad (GST_ELEMENT_CAST (mux), pad); + gst_object_unref (pad); + GST_OBJECT_LOCK (mux); } + GST_OBJECT_UNLOCK (mux); mux->state = GST_MXF_MUX_STATE_HEADER; mux->n_pads = 0; @@ -226,33 +279,27 @@ gst_mxf_mux_reset (GstMXFMux * mux) } static gboolean -gst_mxf_mux_handle_src_event (GstPad * pad, GstObject * parent, - GstEvent * event) +gst_mxf_mux_src_event (GstAggregator * aggregator, GstEvent * event) { - GstEventType type; - - type = event ? GST_EVENT_TYPE (event) : GST_EVENT_UNKNOWN; - - switch (type) { + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: /* disable seeking for now */ gst_event_unref (event); return FALSE; default: + return GST_AGGREGATOR_CLASS (parent_class)->src_event (aggregator, event); break; } - return gst_pad_event_default (pad, parent, event); + g_assert_not_reached (); } static gboolean -gst_mxf_mux_event_caps (GstPad * pad, GstCaps * caps) +gst_mxf_mux_set_caps (GstMXFMux * mux, GstMXFMuxPad * pad, GstCaps * caps) { - GstMXFMux *mux = GST_MXF_MUX (gst_pad_get_parent (pad)); - GstMXFMuxPad *cpad = (GstMXFMuxPad *) gst_pad_get_element_private (pad); gboolean ret = TRUE; MXFUUID d_instance_uid = { {0,} }; - MXFMetadataFileDescriptor *old_descriptor = cpad->descriptor; + MXFMetadataFileDescriptor *old_descriptor = pad->descriptor; GList *l; GST_DEBUG_OBJECT (pad, "Setting caps %" GST_PTR_FORMAT, caps); @@ -260,27 +307,26 @@ gst_mxf_mux_event_caps (GstPad * pad, GstCaps * caps) if (old_descriptor) { memcpy (&d_instance_uid, &MXF_METADATA_BASE (old_descriptor)->instance_uid, 16); - cpad->descriptor = NULL; - g_free (cpad->mapping_data); - cpad->mapping_data = NULL; + pad->descriptor = NULL; + g_free (pad->mapping_data); + pad->mapping_data = NULL; } - cpad->descriptor = - cpad->writer->get_descriptor (GST_PAD_PAD_TEMPLATE (pad), caps, - &cpad->write_func, &cpad->mapping_data); + pad->descriptor = + pad->writer->get_descriptor (GST_PAD_PAD_TEMPLATE (pad), caps, + &pad->write_func, &pad->mapping_data); - if (!cpad->descriptor) { + if (!pad->descriptor) { GST_ERROR_OBJECT (mux, "Couldn't get descriptor for pad '%s' with caps %" GST_PTR_FORMAT, GST_PAD_NAME (pad), caps); - gst_object_unref (mux); return FALSE; } if (mxf_uuid_is_zero (&d_instance_uid)) mxf_uuid_init (&d_instance_uid, mux->metadata); - memcpy (&MXF_METADATA_BASE (cpad->descriptor)->instance_uid, &d_instance_uid, + memcpy (&MXF_METADATA_BASE (pad->descriptor)->instance_uid, &d_instance_uid, 16); if (old_descriptor) { @@ -288,16 +334,16 @@ gst_mxf_mux_event_caps (GstPad * pad, GstCaps * caps) MXFMetadataBase *tmp = l->data; if (mxf_uuid_is_equal (&d_instance_uid, &tmp->instance_uid)) { - l->data = cpad->descriptor; + l->data = pad->descriptor; break; } } } else { - mux->metadata_list = g_list_prepend (mux->metadata_list, cpad->descriptor); + mux->metadata_list = g_list_prepend (mux->metadata_list, pad->descriptor); } g_hash_table_replace (mux->metadata, - &MXF_METADATA_BASE (cpad->descriptor)->instance_uid, cpad->descriptor); + &MXF_METADATA_BASE (pad->descriptor)->instance_uid, pad->descriptor); if (old_descriptor) { if (mux->preface && mux->preface->content_storage && @@ -326,48 +372,52 @@ gst_mxf_mux_event_caps (GstPad * pad, GstCaps * caps) if (tmp->sub_descriptors[j] == MXF_METADATA_GENERIC_DESCRIPTOR (old_descriptor)) { tmp->sub_descriptors[j] = - MXF_METADATA_GENERIC_DESCRIPTOR (cpad->descriptor); + MXF_METADATA_GENERIC_DESCRIPTOR (pad->descriptor); memcpy (&tmp->sub_descriptors_uids[j], &d_instance_uid, 16); } } } else if (package->descriptor == MXF_METADATA_GENERIC_DESCRIPTOR (old_descriptor)) { package->descriptor = - MXF_METADATA_GENERIC_DESCRIPTOR (cpad->descriptor); + MXF_METADATA_GENERIC_DESCRIPTOR (pad->descriptor); memcpy (&package->descriptor_uid, &d_instance_uid, 16); } } } } - gst_object_unref (mux); - return ret; } static gboolean -gst_mxf_mux_handle_sink_event (GstCollectPads * pads, GstCollectData * data, - GstEvent * event, gpointer user_data) +gst_mxf_mux_sink_event (GstAggregator * aggregator, + GstAggregatorPad * aggpad, GstEvent * event) { - GstCaps *caps; + GstMXFMux *mux = GST_MXF_MUX (aggregator); gboolean ret = TRUE; switch (GST_EVENT_TYPE (event)) { case GST_EVENT_TAG: /* TODO: do something with the tags */ break; - case GST_EVENT_CAPS: + case GST_EVENT_CAPS:{ + GstCaps *caps; + gst_event_parse_caps (event, &caps); - gst_mxf_mux_event_caps (data->pad, caps); + ret = gst_mxf_mux_set_caps (mux, GST_MXF_MUX_PAD (aggpad), caps); + break; + } default: break; } - /* now GstCollectPads can take care of the rest, e.g. EOS */ + /* now GstAggregator can take care of the rest, e.g. EOS */ if (ret) - ret = gst_collect_pads_event_default (pads, data, event, FALSE); + ret = + GST_AGGREGATOR_CLASS (parent_class)->sink_event (aggregator, aggpad, + event); return ret; } @@ -384,13 +434,12 @@ gst_mxf_mux_create_pad_name (GstPadTemplate * templ, guint id) return g_string_free (string, FALSE); } -static GstPad * -gst_mxf_mux_request_new_pad (GstElement * element, +static GstAggregatorPad * +gst_mxf_mux_create_new_pad (GstAggregator * aggregator, GstPadTemplate * templ, const gchar * pad_name, const GstCaps * caps) { - GstMXFMux *mux = GST_MXF_MUX (element); - GstMXFMuxPad *cpad; - GstPad *pad = NULL; + GstMXFMux *mux = GST_MXF_MUX (aggregator); + GstMXFMuxPad *pad; guint pad_number; gchar *name = NULL; const MXFEssenceElementWriter *writer; @@ -409,58 +458,52 @@ gst_mxf_mux_request_new_pad (GstElement * element, name = gst_mxf_mux_create_pad_name (templ, pad_number); GST_DEBUG_OBJECT (mux, "Creating pad '%s'", name); - pad = gst_pad_new_from_template (templ, name); + pad = + g_object_new (GST_TYPE_MXF_MUX_PAD, "name", name, "direction", + GST_PAD_SINK, "template", templ, NULL); g_free (name); - cpad = (GstMXFMuxPad *) - gst_collect_pads_add_pad (mux->collect, pad, sizeof (GstMXFMuxPad), NULL, - TRUE); - cpad->last_timestamp = 0; - cpad->adapter = gst_adapter_new (); - cpad->writer = writer; + pad->last_timestamp = 0; + pad->adapter = gst_adapter_new (); + pad->writer = writer; - gst_pad_use_fixed_caps (pad); - gst_pad_set_active (pad, TRUE); - gst_element_add_pad (element, pad); + gst_pad_use_fixed_caps (GST_PAD_CAST (pad)); - return pad; -} - -static void -gst_mxf_mux_release_pad (GstElement * element, GstPad * pad) -{ - /*GstMXFMux *mux = GST_MXF_MUX (GST_PAD_PARENT (pad)); - GstMXFMuxPad *cpad = (GstMXFMuxPad *) gst_pad_get_element_private (pad); - - g_object_unref (cpad->adapter); - g_free (cpad->mapping_data); - - gst_collect_pads_remove_pad (mux->collect, pad); - gst_element_remove_pad (element, pad); */ + return GST_AGGREGATOR_PAD (pad); } static GstFlowReturn gst_mxf_mux_create_metadata (GstMXFMux * mux) { GstFlowReturn ret = GST_FLOW_OK; - GSList *l; + GList *l; GArray *tmp; GST_DEBUG_OBJECT (mux, "Creating MXF metadata"); - for (l = mux->collect->data; l; l = l->next) { - GstMXFMuxPad *cpad = l->data; + GST_OBJECT_LOCK (mux); + + for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { + GstMXFMuxPad *pad = l->data; GstCaps *caps; + GstBuffer *buffer; - if (!cpad || !cpad->descriptor) + if (!pad || !pad->descriptor) { + GST_OBJECT_UNLOCK (mux); return GST_FLOW_ERROR; + } - caps = gst_pad_get_current_caps (cpad->collect.pad); - if (!caps) + caps = gst_pad_get_current_caps (GST_PAD_CAST (pad)); + if (!caps) { + GST_OBJECT_UNLOCK (mux); return GST_FLOW_ERROR; + } - if (cpad->writer->update_descriptor) - cpad->writer->update_descriptor (cpad->descriptor, - caps, cpad->mapping_data, cpad->collect.buffer); + buffer = gst_aggregator_pad_get_buffer (GST_AGGREGATOR_PAD (pad)); + if (pad->writer->update_descriptor) + pad->writer->update_descriptor (pad->descriptor, + caps, pad->mapping_data, buffer); + if (buffer) + gst_buffer_unref (buffer); gst_caps_unref (caps); } @@ -481,17 +524,19 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) TRUE, FALSE); tmp = g_array_new (FALSE, FALSE, sizeof (MXFUL)); - for (l = mux->collect->data; l; l = l->next) { - GstMXFMuxPad *cpad = l->data; + for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { + GstMXFMuxPad *pad = l->data; guint i; gboolean found = FALSE; - if (!cpad || !cpad->descriptor || - mxf_ul_is_zero (&cpad->descriptor->essence_container)) + if (!pad || !pad->descriptor || + mxf_ul_is_zero (&pad->descriptor->essence_container)) { + GST_OBJECT_UNLOCK (mux); return GST_FLOW_ERROR; + } for (i = 0; i < tmp->len; i++) { - if (mxf_ul_is_equal (&cpad->descriptor->essence_container, + if (mxf_ul_is_equal (&pad->descriptor->essence_container, &g_array_index (tmp, MXFUL, i))) { found = TRUE; break; @@ -501,7 +546,7 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) if (found) continue; - g_array_append_val (tmp, cpad->descriptor->essence_container); + g_array_append_val (tmp, pad->descriptor->essence_container); } mux->preface->n_essence_containers = tmp->len; mux->preface->essence_containers = (MXFUL *) g_array_free (tmp, FALSE); @@ -612,7 +657,7 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) memcpy (&p->parent.package_modified_date, &mux->preface->last_modified_date, sizeof (MXFTimestamp)); - p->parent.n_tracks = g_slist_length (mux->collect->data); + p->parent.n_tracks = GST_ELEMENT_CAST (mux)->numsinkpads; p->parent.tracks = g_new0 (MXFMetadataTrack *, p->parent.n_tracks); if (p->parent.n_tracks > 1) { @@ -636,12 +681,13 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) guint n = 0; /* Essence tracks */ - for (l = mux->collect->data; l; l = l->next) { - GstMXFMuxPad *cpad = l->data; + for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { + GstMXFMuxPad *pad = l->data; MXFMetadataTimelineTrack *track; MXFMetadataSequence *sequence; MXFMetadataSourceClip *clip; GstCaps *caps; + GstBuffer *buffer; p->parent.tracks[n] = (MXFMetadataTrack *) g_object_new (MXF_TYPE_METADATA_TIMELINE_TRACK, NULL); @@ -652,15 +698,17 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) &MXF_METADATA_BASE (track)->instance_uid, track); mux->metadata_list = g_list_prepend (mux->metadata_list, track); - caps = gst_pad_get_current_caps (cpad->collect.pad); + caps = gst_pad_get_current_caps (GST_PAD_CAST (pad)); + buffer = gst_aggregator_pad_get_buffer (GST_AGGREGATOR_PAD (pad)); track->parent.track_id = n + 1; track->parent.track_number = - cpad->writer->get_track_number_template (cpad->descriptor, - caps, cpad->mapping_data); + pad->writer->get_track_number_template (pad->descriptor, + caps, pad->mapping_data); - cpad->writer->get_edit_rate (cpad->descriptor, - caps, cpad->mapping_data, - cpad->collect.buffer, p, track, &track->edit_rate); + pad->writer->get_edit_rate (pad->descriptor, + caps, pad->mapping_data, buffer, p, track, &track->edit_rate); + if (buffer) + gst_buffer_unref (buffer); gst_caps_unref (caps); sequence = track->parent.sequence = (MXFMetadataSequence *) @@ -671,7 +719,7 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) &MXF_METADATA_BASE (sequence)->instance_uid, sequence); mux->metadata_list = g_list_prepend (mux->metadata_list, sequence); - memcpy (&sequence->data_definition, &cpad->writer->data_definition, + memcpy (&sequence->data_definition, &pad->writer->data_definition, 16); sequence->n_structural_components = 1; @@ -692,15 +740,15 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) 16); clip->start_position = 0; - cpad->source_package = p; - cpad->source_track = track; - cpad->descriptor->linked_track_id = n + 1; + pad->source_package = p; + pad->source_track = track; + pad->descriptor->linked_track_id = n + 1; if (p->parent.n_tracks == 1) { - p->descriptor = (MXFMetadataGenericDescriptor *) cpad->descriptor; + p->descriptor = (MXFMetadataGenericDescriptor *) pad->descriptor; } else { MXF_METADATA_MULTIPLE_DESCRIPTOR (p-> descriptor)->sub_descriptors[n] = - (MXFMetadataGenericDescriptor *) cpad->descriptor; + (MXFMetadataGenericDescriptor *) pad->descriptor; } n++; @@ -732,7 +780,7 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) memcpy (&p->package_modified_date, &mux->preface->last_modified_date, sizeof (MXFTimestamp)); - p->n_tracks = g_slist_length (mux->collect->data) + 1; + p->n_tracks = GST_ELEMENT_CAST (mux)->numsinkpads + 1; p->tracks = g_new0 (MXFMetadataTrack *, p->n_tracks); /* Tracks */ @@ -741,9 +789,10 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) n = 1; /* Essence tracks */ - for (l = mux->collect->data; l; l = l->next) { - GstMXFMuxPad *cpad = l->data; + for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { + GstMXFMuxPad *pad = l->data; GstCaps *caps; + GstBuffer *buffer; MXFMetadataSourcePackage *source_package; MXFMetadataTimelineTrack *track, *source_track; MXFMetadataSequence *sequence; @@ -766,11 +815,13 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) track->parent.track_id = n + 1; track->parent.track_number = 0; - caps = gst_pad_get_current_caps (cpad->collect.pad); - cpad->writer->get_edit_rate (cpad->descriptor, - caps, cpad->mapping_data, - cpad->collect.buffer, source_package, source_track, - &track->edit_rate); + caps = gst_pad_get_current_caps (GST_PAD_CAST (pad)); + buffer = gst_aggregator_pad_get_buffer (GST_AGGREGATOR_PAD (pad)); + pad->writer->get_edit_rate (pad->descriptor, + caps, pad->mapping_data, + buffer, source_package, source_track, &track->edit_rate); + if (buffer) + gst_buffer_unref (buffer); gst_caps_unref (caps); if (track->edit_rate.n != source_track->edit_rate.n || @@ -781,6 +832,7 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) if (track->edit_rate.d <= 0 || track->edit_rate.n <= 0) { GST_ERROR_OBJECT (mux, "Invalid edit rate"); + GST_OBJECT_UNLOCK (mux); return GST_FLOW_ERROR; } @@ -799,7 +851,7 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) &MXF_METADATA_BASE (sequence)->instance_uid, sequence); mux->metadata_list = g_list_prepend (mux->metadata_list, sequence); - memcpy (&sequence->data_definition, &cpad->writer->data_definition, + memcpy (&sequence->data_definition, &pad->writer->data_definition, 16); sequence->n_structural_components = 1; sequence->structural_components = @@ -977,6 +1029,8 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) } } + GST_OBJECT_UNLOCK (mux); + mux->metadata_list = g_list_reverse (mux->metadata_list); return ret; @@ -985,7 +1039,7 @@ gst_mxf_mux_create_metadata (GstMXFMux * mux) static GstFlowReturn gst_mxf_mux_init_partition_pack (GstMXFMux * mux) { - GSList *l; + GList *l; guint i = 0; mxf_partition_pack_reset (&mux->partition); @@ -1006,17 +1060,18 @@ gst_mxf_mux_init_partition_pack (GstMXFMux * mux) memcpy (&mux->partition.operational_pattern, &mux->preface->operational_pattern, 16); - mux->partition.n_essence_containers = g_slist_length (mux->collect->data); + GST_OBJECT_LOCK (mux); + mux->partition.n_essence_containers = GST_ELEMENT_CAST (mux)->numsinkpads; mux->partition.essence_containers = g_new0 (MXFUL, mux->partition.n_essence_containers); - for (l = mux->collect->data; l; l = l->next) { - GstMXFMuxPad *cpad = l->data; + for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { + GstMXFMuxPad *pad = l->data; guint j; gboolean found = FALSE; for (j = 0; j <= i; j++) { - if (mxf_ul_is_equal (&cpad->descriptor->essence_container, + if (mxf_ul_is_equal (&pad->descriptor->essence_container, &mux->partition.essence_containers[j])) { found = TRUE; break; @@ -1027,10 +1082,11 @@ gst_mxf_mux_init_partition_pack (GstMXFMux * mux) continue; memcpy (&mux->partition.essence_containers[i], - &cpad->descriptor->essence_container, 16); + &pad->descriptor->essence_container, 16); i++; } mux->partition.n_essence_containers = i; + GST_OBJECT_UNLOCK (mux); return GST_FLOW_OK; } @@ -1090,53 +1146,56 @@ static const guint8 _gc_essence_element_ul[] = { }; static GstFlowReturn -gst_mxf_mux_handle_buffer (GstMXFMux * mux, GstMXFMuxPad * cpad) +gst_mxf_mux_handle_buffer (GstMXFMux * mux, GstMXFMuxPad * pad) { - GstBuffer *buf = NULL; + GstBuffer *buf = gst_aggregator_pad_get_buffer (GST_AGGREGATOR_PAD (pad)); GstBuffer *outbuf = NULL; GstBuffer *packet; GstMapInfo map; GstMapInfo readmap; GstFlowReturn ret = GST_FLOW_OK; guint8 slen, ber[9]; - gboolean flush = ((cpad->collect.state & GST_COLLECT_PADS_STATE_EOS) - && !cpad->have_complete_edit_unit && cpad->collect.buffer == NULL); + gboolean flush = gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad)) + && !pad->have_complete_edit_unit && buf == NULL; - if (cpad->have_complete_edit_unit) { - GST_DEBUG_OBJECT (cpad->collect.pad, + if (pad->have_complete_edit_unit) { + GST_DEBUG_OBJECT (pad, "Handling remaining buffer for track %u at position %" G_GINT64_FORMAT, - cpad->source_track->parent.track_id, cpad->pos); + pad->source_track->parent.track_id, pad->pos); + if (buf) + gst_buffer_unref (buf); buf = NULL; } else if (!flush) { - buf = gst_collect_pads_pop (mux->collect, &cpad->collect); + if (buf) + gst_buffer_unref (buf); + buf = gst_aggregator_pad_steal_buffer (GST_AGGREGATOR_PAD (pad)); } if (buf) { - GST_DEBUG_OBJECT (cpad->collect.pad, + GST_DEBUG_OBJECT (pad, "Handling buffer of size %" G_GSIZE_FORMAT " for track %u at position %" G_GINT64_FORMAT, gst_buffer_get_size (buf), - cpad->source_track->parent.track_id, cpad->pos); + pad->source_track->parent.track_id, pad->pos); } else { flush = TRUE; - GST_DEBUG_OBJECT (cpad->collect.pad, + GST_DEBUG_OBJECT (pad, "Flushing for track %u at position %" G_GINT64_FORMAT, - cpad->source_track->parent.track_id, cpad->pos); + pad->source_track->parent.track_id, pad->pos); } - ret = cpad->write_func (buf, - cpad->mapping_data, cpad->adapter, &outbuf, flush); + ret = pad->write_func (buf, pad->mapping_data, pad->adapter, &outbuf, flush); if (ret != GST_FLOW_OK && ret != GST_FLOW_CUSTOM_SUCCESS) { - GST_ERROR_OBJECT (cpad->collect.pad, + GST_ERROR_OBJECT (pad, "Failed handling buffer for track %u, reason %s", - cpad->source_track->parent.track_id, gst_flow_get_name (ret)); + pad->source_track->parent.track_id, gst_flow_get_name (ret)); return ret; } if (ret == GST_FLOW_CUSTOM_SUCCESS) { - cpad->have_complete_edit_unit = TRUE; + pad->have_complete_edit_unit = TRUE; ret = GST_FLOW_OK; } else { - cpad->have_complete_edit_unit = FALSE; + pad->have_complete_edit_unit = FALSE; } buf = outbuf; @@ -1148,29 +1207,29 @@ gst_mxf_mux_handle_buffer (GstMXFMux * mux, GstMXFMuxPad * cpad) packet = gst_buffer_new_and_alloc (16 + slen + readmap.size); gst_buffer_map (packet, &map, GST_MAP_WRITE); memcpy (map.data, _gc_essence_element_ul, 16); - GST_WRITE_UINT32_BE (map.data + 12, cpad->source_track->parent.track_number); + GST_WRITE_UINT32_BE (map.data + 12, pad->source_track->parent.track_number); memcpy (map.data + 16, ber, slen); memcpy (map.data + 16 + slen, readmap.data, readmap.size); gst_buffer_unmap (buf, &readmap); gst_buffer_unref (buf); - GST_DEBUG_OBJECT (cpad->collect.pad, + GST_DEBUG_OBJECT (pad, "Pushing buffer of size %" G_GSIZE_FORMAT " for track %u", map.size, - cpad->source_track->parent.track_id); + pad->source_track->parent.track_id); gst_buffer_unmap (packet, &map); if ((ret = gst_mxf_mux_push (mux, packet)) != GST_FLOW_OK) { - GST_ERROR_OBJECT (cpad->collect.pad, + GST_ERROR_OBJECT (pad, "Failed pushing buffer for track %u, reason %s", - cpad->source_track->parent.track_id, gst_flow_get_name (ret)); + pad->source_track->parent.track_id, gst_flow_get_name (ret)); return ret; } - cpad->pos++; - cpad->last_timestamp = - gst_util_uint64_scale (GST_SECOND * cpad->pos, - cpad->source_track->edit_rate.d, cpad->source_track->edit_rate.n); + pad->pos++; + pad->last_timestamp = + gst_util_uint64_scale (GST_SECOND * pad->pos, + pad->source_track->edit_rate.d, pad->source_track->edit_rate.n); return ret; } @@ -1198,7 +1257,7 @@ gst_mxf_mux_write_body_partition (GstMXFMux * mux) static GstFlowReturn gst_mxf_mux_handle_eos (GstMXFMux * mux) { - GSList *l; + GList *l; gboolean have_data = FALSE; GstBuffer *packet; @@ -1207,33 +1266,49 @@ gst_mxf_mux_handle_eos (GstMXFMux * mux) have_data = FALSE; - for (l = mux->collect->data; l; l = l->next) { - GstMXFMuxPad *cpad = l->data; + GST_OBJECT_LOCK (mux); + for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { + GstMXFMuxPad *pad = l->data; + GstBuffer *buffer = + gst_aggregator_pad_get_buffer (GST_AGGREGATOR_PAD (pad)); + GstClockTime next_gc_timestamp = gst_util_uint64_scale ((mux->last_gc_position + 1) * GST_SECOND, mux->min_edit_rate.d, mux->min_edit_rate.n); + if (best) + gst_object_unref (best); best = NULL; - if (cpad->have_complete_edit_unit || - gst_adapter_available (cpad->adapter) > 0 || cpad->collect.buffer) { + if (pad->have_complete_edit_unit || + gst_adapter_available (pad->adapter) > 0 || buffer) { have_data = TRUE; - if (cpad->last_timestamp < next_gc_timestamp) { - best = cpad; + if (pad->last_timestamp < next_gc_timestamp) { + if (best) + gst_object_unref (best); + best = gst_object_ref (pad); + if (buffer) + gst_buffer_unref (buffer); break; } } + if (buffer) + gst_buffer_unref (buffer); if (have_data && !l->next) { mux->last_gc_position++; mux->last_gc_timestamp = next_gc_timestamp; + if (best) + gst_object_unref (best); best = NULL; break; } } + GST_OBJECT_UNLOCK (mux); if (best) { gst_mxf_mux_handle_buffer (mux, best); + gst_object_unref (best); have_data = TRUE; } } while (have_data); @@ -1244,14 +1319,15 @@ gst_mxf_mux_handle_eos (GstMXFMux * mux) mux->min_edit_rate.d, mux->min_edit_rate.n); /* Update essence track durations */ - for (l = mux->collect->data; l; l = l->next) { - GstMXFMuxPad *cpad = l->data; + GST_OBJECT_LOCK (mux); + for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { + GstMXFMuxPad *pad = l->data; guint i; /* Update durations */ - cpad->source_track->parent.sequence->duration = cpad->pos; - MXF_METADATA_SOURCE_CLIP (cpad->source_track->parent. - sequence->structural_components[0])->parent.duration = cpad->pos; + pad->source_track->parent.sequence->duration = pad->pos; + MXF_METADATA_SOURCE_CLIP (pad->source_track->parent. + sequence->structural_components[0])->parent.duration = pad->pos; for (i = 0; i < mux->preface->content_storage->packages[0]->n_tracks; i++) { MXFMetadataTimelineTrack *track; @@ -1267,12 +1343,13 @@ gst_mxf_mux_handle_eos (GstMXFMux * mux) content_storage->packages[0]->tracks[i]); if (MXF_METADATA_SOURCE_CLIP (track->parent. sequence->structural_components[0])->source_track_id == - cpad->source_track->parent.track_id) { - track->parent.sequence->structural_components[0]->duration = cpad->pos; - track->parent.sequence->duration = cpad->pos; + pad->source_track->parent.track_id) { + track->parent.sequence->structural_components[0]->duration = pad->pos; + track->parent.sequence->duration = pad->pos; } } } + GST_OBJECT_UNLOCK (mux); /* Update timecode track duration */ { @@ -1329,7 +1406,8 @@ gst_mxf_mux_handle_eos (GstMXFMux * mux) /* Rewrite header partition with updated values */ gst_segment_init (&segment, GST_FORMAT_BYTES); - if (gst_pad_push_event (mux->srcpad, gst_event_new_segment (&segment))) { + if (gst_pad_push_event (GST_AGGREGATOR_SRC_PAD (mux), + gst_event_new_segment (&segment))) { mux->offset = 0; mux->partition.type = MXF_PARTITION_PACK_HEADER; mux->partition.closed = TRUE; @@ -1372,14 +1450,24 @@ _sort_mux_pads (gconstpointer a, gconstpointer b) pa->source_track->parent.track_number; } -static GstFlowReturn -gst_mxf_mux_collected (GstCollectPads * pads, gpointer user_data) + +static gboolean +gst_mxf_mux_stop (GstAggregator * aggregator) { - GstMXFMux *mux = GST_MXF_MUX (user_data); + GstMXFMux *mux = GST_MXF_MUX (aggregator); + + gst_mxf_mux_reset (mux); + + return TRUE; +} + +static GstFlowReturn +gst_mxf_mux_aggregate (GstAggregator * aggregator, gboolean timeout) +{ + GstMXFMux *mux = GST_MXF_MUX (aggregator); GstMXFMuxPad *best = NULL; GstFlowReturn ret; - GstSegment segment; - GSList *sl; + GList *l; gboolean eos = TRUE; if (mux->state == GST_MXF_MUX_STATE_ERROR) { @@ -1392,41 +1480,32 @@ gst_mxf_mux_collected (GstCollectPads * pads, gpointer user_data) if (mux->state == GST_MXF_MUX_STATE_HEADER) { GstCaps *caps; - gchar s_id[32]; - if (mux->collect->data == NULL) { + if (GST_ELEMENT_CAST (mux)->sinkpads == NULL) { GST_ELEMENT_ERROR (mux, STREAM, MUX, (NULL), ("No input streams configured")); ret = GST_FLOW_ERROR; goto error; } - /* stream-start (FIXME: create id based on input ids) */ - g_snprintf (s_id, sizeof (s_id), "mxfmux-%08x", g_random_int ()); - gst_pad_push_event (mux->srcpad, gst_event_new_stream_start (s_id)); - caps = gst_caps_new_empty_simple ("application/mxf"); - gst_pad_set_caps (mux->srcpad, caps); + gst_aggregator_set_src_caps (GST_AGGREGATOR (mux), caps); gst_caps_unref (caps); - gst_segment_init (&segment, GST_FORMAT_BYTES); - if (gst_pad_push_event (mux->srcpad, gst_event_new_segment (&segment))) { - if ((ret = gst_mxf_mux_create_metadata (mux)) != GST_FLOW_OK) - goto error; + if ((ret = gst_mxf_mux_create_metadata (mux)) != GST_FLOW_OK) + goto error; - if ((ret = gst_mxf_mux_init_partition_pack (mux)) != GST_FLOW_OK) - goto error; + if ((ret = gst_mxf_mux_init_partition_pack (mux)) != GST_FLOW_OK) + goto error; - ret = gst_mxf_mux_write_header_metadata (mux); - } else { - ret = GST_FLOW_ERROR; - } - - if (ret != GST_FLOW_OK) + if ((ret = gst_mxf_mux_write_header_metadata (mux)) != GST_FLOW_OK) goto error; /* Sort pads, we will always write in that order */ - mux->collect->data = g_slist_sort (mux->collect->data, _sort_mux_pads); + GST_OBJECT_LOCK (mux); + GST_ELEMENT_CAST (mux)->sinkpads = + g_list_sort (GST_ELEMENT_CAST (mux)->sinkpads, _sort_mux_pads); + GST_OBJECT_UNLOCK (mux); /* Write body partition */ ret = gst_mxf_mux_write_body_partition (mux); @@ -1438,43 +1517,64 @@ gst_mxf_mux_collected (GstCollectPads * pads, gpointer user_data) g_return_val_if_fail (g_hash_table_size (mux->metadata) > 0, GST_FLOW_ERROR); do { - for (sl = mux->collect->data; sl; sl = sl->next) { + GST_OBJECT_LOCK (mux); + for (l = GST_ELEMENT_CAST (mux)->sinkpads; l; l = l->next) { gboolean pad_eos; - GstMXFMuxPad *cpad = sl->data; + GstMXFMuxPad *pad = l->data; + GstBuffer *buffer; GstClockTime next_gc_timestamp = gst_util_uint64_scale ((mux->last_gc_position + 1) * GST_SECOND, mux->min_edit_rate.d, mux->min_edit_rate.n); - pad_eos = cpad->collect.state & GST_COLLECT_PADS_STATE_EOS; + pad_eos = gst_aggregator_pad_is_eos (GST_AGGREGATOR_PAD (pad)); if (!pad_eos) eos = FALSE; - if ((!pad_eos || cpad->have_complete_edit_unit || - gst_adapter_available (cpad->adapter) > 0 || cpad->collect.buffer) - && cpad->last_timestamp < next_gc_timestamp) { - best = cpad; + buffer = gst_aggregator_pad_get_buffer (GST_AGGREGATOR_PAD (pad)); + + if ((!pad_eos || pad->have_complete_edit_unit || + gst_adapter_available (pad->adapter) > 0 || buffer) + && pad->last_timestamp < next_gc_timestamp) { + if (best) + gst_object_unref (best); + if (buffer) + gst_buffer_unref (buffer); + best = gst_object_ref (pad); break; - } else if (!eos && !sl->next) { + } else if (!eos && !l->next) { mux->last_gc_position++; mux->last_gc_timestamp = next_gc_timestamp; eos = FALSE; + if (best) + gst_object_unref (best); + if (buffer) + gst_buffer_unref (buffer); best = NULL; break; } + if (buffer) + gst_buffer_unref (buffer); } + GST_OBJECT_UNLOCK (mux); } while (!eos && best == NULL); if (!eos && best) { ret = gst_mxf_mux_handle_buffer (mux, best); + gst_object_unref (best); if (ret != GST_FLOW_OK) goto error; } else if (eos) { GST_DEBUG_OBJECT (mux, "Handling EOS"); + if (best) + gst_object_unref (best); + gst_mxf_mux_handle_eos (mux); - gst_pad_push_event (mux->srcpad, gst_event_new_eos ()); mux->state = GST_MXF_MUX_STATE_EOS; return GST_FLOW_EOS; + } else { + if (best) + gst_object_unref (best); } return GST_FLOW_OK; @@ -1482,45 +1582,6 @@ gst_mxf_mux_collected (GstCollectPads * pads, gpointer user_data) error: { mux->state = GST_MXF_MUX_STATE_ERROR; - gst_pad_push_event (mux->srcpad, gst_event_new_eos ()); return ret; } } - -static GstStateChangeReturn -gst_mxf_mux_change_state (GstElement * element, GstStateChange transition) -{ - GstStateChangeReturn ret; - GstMXFMux *mux = GST_MXF_MUX (element); - - switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - break; - case GST_STATE_CHANGE_READY_TO_PAUSED: - gst_collect_pads_start (mux->collect); - break; - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - break; - case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_collect_pads_stop (mux->collect); - break; - default: - break; - } - - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - switch (transition) { - case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - break; - case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_mxf_mux_reset (mux); - break; - case GST_STATE_CHANGE_READY_TO_NULL: - break; - default: - break; - } - - return ret; -} diff --git a/gst/mxf/mxfmux.h b/gst/mxf/mxfmux.h index 7b83427374..8cb7867511 100644 --- a/gst/mxf/mxfmux.h +++ b/gst/mxf/mxfmux.h @@ -22,7 +22,7 @@ #include #include -#include +#include #include "mxfessence.h" @@ -39,26 +39,6 @@ G_BEGIN_DECLS #define GST_IS_MXF_MUX_CLASS(klass) \ (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_MXF_MUX)) -typedef struct -{ - GstCollectData collect; - - guint64 pos; - GstClockTime last_timestamp; - - MXFMetadataFileDescriptor *descriptor; - - GstAdapter *adapter; - gboolean have_complete_edit_unit; - - gpointer mapping_data; - const MXFEssenceElementWriter *writer; - MXFEssenceElementWriteFunc write_func; - - MXFMetadataSourcePackage *source_package; - MXFMetadataTimelineTrack *source_track; -} GstMXFMuxPad; - typedef enum { GST_MXF_MUX_STATE_HEADER, @@ -68,14 +48,9 @@ typedef enum } GstMXFMuxState; typedef struct _GstMXFMux { - GstElement element; - - GstPad *srcpad; - GstCollectPads *collect; + GstAggregator parent; /* */ - GstPadEventFunction collect_event; - GstMXFMuxState state; guint n_pads; @@ -83,7 +58,7 @@ typedef struct _GstMXFMux { MXFPartitionPack partition; MXFPrimerPack primer; - + GHashTable *metadata; GList *metadata_list; MXFMetadataPreface *preface; @@ -96,7 +71,7 @@ typedef struct _GstMXFMux { } GstMXFMux; typedef struct _GstMXFMuxClass { - GstElementClass parent; + GstAggregatorClass parent; } GstMXFMuxClass; GType gst_mxf_mux_get_type (void);