diff --git a/gst/mpegtsdemux/mpegtsbase.c b/gst/mpegtsdemux/mpegtsbase.c index 0ca72538c5..82243370ca 100644 --- a/gst/mpegtsdemux/mpegtsbase.c +++ b/gst/mpegtsdemux/mpegtsbase.c @@ -1077,7 +1077,7 @@ mpegts_base_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) break; case GST_EVENT_EOS: res = GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base, event); - res = gst_mpegts_base_handle_eos (base); + res &= gst_mpegts_base_handle_eos (base); break; case GST_EVENT_CAPS: /* FIXME, do something */ @@ -1165,6 +1165,9 @@ mpegts_base_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) goto next; } + if (klass->inspect_packet) + klass->inspect_packet (base, &packet); + /* If it's a known PES, push it */ if (MPEGTS_BIT_IS_SET (base->is_pes, packet.pid)) { /* push the packet downstream */ diff --git a/gst/mpegtsdemux/mpegtsbase.h b/gst/mpegtsdemux/mpegtsbase.h index 6b2d7d3047..c427bd711f 100644 --- a/gst/mpegtsdemux/mpegtsbase.h +++ b/gst/mpegtsdemux/mpegtsbase.h @@ -166,6 +166,7 @@ struct _MpegTSBaseClass { /* Virtual methods */ void (*reset) (MpegTSBase *base); GstFlowReturn (*push) (MpegTSBase *base, MpegTSPacketizerPacket *packet, GstMpegtsSection * section); + void (*inspect_packet) (MpegTSBase *base, MpegTSPacketizerPacket *packet); /* takes ownership of @event */ gboolean (*push_event) (MpegTSBase *base, GstEvent * event); diff --git a/gst/mpegtsdemux/mpegtsparse.c b/gst/mpegtsdemux/mpegtsparse.c index 87281b27c3..132d3b4086 100644 --- a/gst/mpegtsdemux/mpegtsparse.c +++ b/gst/mpegtsdemux/mpegtsparse.c @@ -34,8 +34,8 @@ #include "mpegtsparse.h" #include "gstmpegdesc.h" -/* latency in mseconds */ -#define TS_LATENCY 700 +/* latency in mseconds is maximum 100 ms between PCR */ +#define TS_LATENCY 100 #define TABLE_ID_UNSET 0xFF #define RUNNING_STATUS_RUNNING 4 @@ -80,10 +80,18 @@ GST_STATIC_PAD_TEMPLATE ("program_%u", GST_PAD_SRC, enum { - ARG_0, + PROP_0, + PROP_SET_TIMESTAMPS, + PROP_SMOOTHING_LATENCY, + PROP_PCR_PID, /* FILL ME */ }; +static void mpegts_parse_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void mpegts_parse_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + static void mpegts_parse_program_started (MpegTSBase * base, MpegTSBaseProgram * program); static void @@ -92,6 +100,8 @@ mpegts_parse_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program); static GstFlowReturn mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet, GstMpegtsSection * section); +static void mpegts_parse_inspect_packet (MpegTSBase * base, + MpegTSPacketizerPacket * packet); static MpegTSParsePad *mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * name); @@ -111,13 +121,34 @@ G_DEFINE_TYPE (MpegTSParse2, mpegts_parse, GST_TYPE_MPEGTS_BASE); static void mpegts_parse_reset (MpegTSBase * base); static GstFlowReturn mpegts_parse_input_done (MpegTSBase * base, GstBuffer * buffer); +static GstFlowReturn +drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all); static void mpegts_parse_class_init (MpegTSParse2Class * klass) { + GObjectClass *gobject_class = (GObjectClass *) (klass); GstElementClass *element_class; MpegTSBaseClass *ts_class; + gobject_class->set_property = mpegts_parse_set_property; + gobject_class->get_property = mpegts_parse_get_property; + + g_object_class_install_property (gobject_class, PROP_SET_TIMESTAMPS, + g_param_spec_boolean ("set-timestamps", + "Timestamp (or re-timestamp) the output stream", + "If set, timestamps will be set on the output buffers using " + "PCRs and smoothed over the smoothing-latency period", FALSE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_SMOOTHING_LATENCY, + g_param_spec_uint ("smoothing-latency", "Smoothing Latency", + "Additional latency in microseconds for smoothing jitter in input timestamps on live capture", + 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_PCR_PID, + g_param_spec_int ("pcr-pid", "PID containing PCR", + "Set the PID to use for PCR values (-1 for auto)", + -1, G_MAXINT, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + element_class = GST_ELEMENT_CLASS (klass); element_class->pad_removed = mpegts_parse_pad_removed; element_class->request_new_pad = mpegts_parse_request_new_pad; @@ -141,6 +172,7 @@ mpegts_parse_class_init (MpegTSParse2Class * klass) ts_class->program_stopped = GST_DEBUG_FUNCPTR (mpegts_parse_program_stopped); ts_class->reset = GST_DEBUG_FUNCPTR (mpegts_parse_reset); ts_class->input_done = GST_DEBUG_FUNCPTR (mpegts_parse_input_done); + ts_class->inspect_packet = GST_DEBUG_FUNCPTR (mpegts_parse_inspect_packet); } static void @@ -153,8 +185,12 @@ mpegts_parse_init (MpegTSParse2 * parse) base->push_data = FALSE; base->push_section = FALSE; + parse->user_pcr_pid = parse->pcr_pid = -1; + parse->srcpad = gst_pad_new_from_static_template (&src_template, "src"); parse->first = TRUE; + gst_pad_set_query_function (parse->srcpad, + GST_DEBUG_FUNCPTR (mpegts_parse_src_pad_query)); gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad); parse->have_group_id = FALSE; @@ -164,6 +200,8 @@ mpegts_parse_init (MpegTSParse2 * parse) static void mpegts_parse_reset (MpegTSBase * base) { + MpegTSParse2 *parse = (MpegTSParse2 *) base; + /* Set the various know PIDs we are interested in */ /* CAT */ @@ -187,58 +225,120 @@ mpegts_parse_reset (MpegTSBase * base) /* SIT */ MPEGTS_BIT_SET (base->known_psi, 0x1f); - GST_MPEGTS_PARSE (base)->first = TRUE; - GST_MPEGTS_PARSE (base)->have_group_id = FALSE; - GST_MPEGTS_PARSE (base)->group_id = G_MAXUINT; + parse->first = TRUE; + parse->have_group_id = FALSE; + parse->group_id = G_MAXUINT; - g_list_free_full (GST_MPEGTS_PARSE (base)->pending_buffers, - (GDestroyNotify) gst_buffer_unref); - GST_MPEGTS_PARSE (base)->pending_buffers = NULL;; + g_list_free_full (parse->pending_buffers, (GDestroyNotify) gst_buffer_unref); + parse->pending_buffers = NULL; + + parse->current_pcr = GST_CLOCK_TIME_NONE; + parse->previous_pcr = GST_CLOCK_TIME_NONE; + parse->bytes_since_pcr = 0; + parse->pcr_pid = parse->user_pcr_pid; } static void +mpegts_parse_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + MpegTSParse2 *parse = (MpegTSParse2 *) object; + + switch (prop_id) { + case PROP_SET_TIMESTAMPS: + parse->set_timestamps = g_value_get_boolean (value); + break; + case PROP_SMOOTHING_LATENCY: + parse->smoothing_latency = GST_USECOND * g_value_get_uint (value); + break; + case PROP_PCR_PID: + parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + } +} + +static void +mpegts_parse_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + MpegTSParse2 *parse = (MpegTSParse2 *) object; + + switch (prop_id) { + case PROP_SET_TIMESTAMPS: + g_value_set_boolean (value, parse->set_timestamps); + break; + case PROP_SMOOTHING_LATENCY: + g_value_set_uint (value, parse->smoothing_latency / GST_USECOND); + break; + case PROP_PCR_PID: + g_value_set_int (value, parse->pcr_pid); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + } +} + +static gboolean prepare_src_pad (MpegTSBase * base, MpegTSParse2 * parse) { - if (base->packetizer->packet_size) { - GstEvent *event; - gchar *stream_id; - GstCaps *caps; + GstEvent *event; + gchar *stream_id; + GstCaps *caps; - stream_id = - gst_pad_create_stream_id (parse->srcpad, GST_ELEMENT_CAST (base), - "multi-program"); + if (!parse->first) + return TRUE; - event = - gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START, - 0); - if (event) { - if (gst_event_parse_group_id (event, &parse->group_id)) - parse->have_group_id = TRUE; - else - parse->have_group_id = FALSE; - gst_event_unref (event); - } else if (!parse->have_group_id) { + /* If there's no packet_size yet, we can't set caps yet */ + if (G_UNLIKELY (base->packetizer->packet_size == 0)) + return FALSE; + + stream_id = + gst_pad_create_stream_id (parse->srcpad, GST_ELEMENT_CAST (base), + "multi-program"); + + event = + gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START, + 0); + if (event) { + if (gst_event_parse_group_id (event, &parse->group_id)) parse->have_group_id = TRUE; - parse->group_id = gst_util_group_id_next (); - } - event = gst_event_new_stream_start (stream_id); - if (parse->have_group_id) - gst_event_set_group_id (event, parse->group_id); - - gst_pad_push_event (parse->srcpad, event); - g_free (stream_id); - - caps = gst_caps_new_simple ("video/mpegts", - "systemstream", G_TYPE_BOOLEAN, TRUE, - "packetsize", G_TYPE_INT, base->packetizer->packet_size, NULL); - - gst_pad_set_caps (parse->srcpad, caps); - gst_caps_unref (caps); - - gst_pad_push_event (parse->srcpad, gst_event_new_segment (&base->segment)); - - parse->first = FALSE; + else + parse->have_group_id = FALSE; + gst_event_unref (event); + } else if (!parse->have_group_id) { + parse->have_group_id = TRUE; + parse->group_id = gst_util_group_id_next (); } + event = gst_event_new_stream_start (stream_id); + if (parse->have_group_id) + gst_event_set_group_id (event, parse->group_id); + + gst_pad_push_event (parse->srcpad, event); + g_free (stream_id); + + caps = gst_caps_new_simple ("video/mpegts", + "systemstream", G_TYPE_BOOLEAN, TRUE, + "packetsize", G_TYPE_INT, base->packetizer->packet_size, NULL); + + gst_pad_set_caps (parse->srcpad, caps); + gst_caps_unref (caps); + + /* If setting output timestamps, ensure that the output segment is TIME */ + if (parse->set_timestamps == FALSE || base->segment.format == GST_FORMAT_TIME) + gst_pad_push_event (parse->srcpad, gst_event_new_segment (&base->segment)); + else { + GstSegment seg; + gst_segment_init (&seg, GST_FORMAT_TIME); + GST_DEBUG_OBJECT (parse, + "Generating time output segment %" GST_SEGMENT_FORMAT, &seg); + gst_pad_push_event (parse->srcpad, gst_event_new_segment (&seg)); + } + + parse->first = FALSE; + + return TRUE; } static gboolean @@ -255,6 +355,8 @@ push_event (MpegTSBase * base, GstEvent * event) } prepare_src_pad (base, parse); } + if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS)) + drain_pending_buffers (parse, TRUE); for (tmp = parse->srcpads; tmp; tmp = tmp->next) { GstPad *pad = (GstPad *) tmp->data; @@ -559,39 +661,210 @@ mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet, return ret; } +static void +mpegts_parse_inspect_packet (MpegTSBase * base, MpegTSPacketizerPacket * packet) +{ + MpegTSParse2 *parse = GST_MPEGTS_PARSE (base); + GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p PCR %" + G_GUINT64_FORMAT, packet->pid, packet->payload_unit_start_indicator, + packet->scram_afc_cc & 0x30, + FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc), packet->payload, + packet->pcr); + + /* Store the PCR if desired */ + if (parse->current_pcr == GST_CLOCK_TIME_NONE && + packet->afc_flags & MPEGTS_AFC_PCR_FLAG) { + /* Take this as the pcr_pid if set to auto-select */ + if (parse->pcr_pid == -1) + parse->pcr_pid = packet->pid; + /* Check the PCR-PID matches the program we want for multiple programs */ + if (parse->pcr_pid == packet->pid) + parse->current_pcr = PCRTIME_TO_GSTTIME (packet->pcr); + } +} + +static GstClockTime +get_pending_timestamp_diff (MpegTSParse2 * parse) +{ + GList *l; + GstClockTime first_ts, last_ts; + + if (parse->pending_buffers == NULL) + return GST_CLOCK_TIME_NONE; + + l = g_list_last (parse->pending_buffers); + first_ts = GST_BUFFER_PTS (l->data); + if (first_ts == GST_CLOCK_TIME_NONE) + return GST_CLOCK_TIME_NONE; + + l = g_list_first (parse->pending_buffers); + last_ts = GST_BUFFER_PTS (l->data); + if (last_ts == GST_CLOCK_TIME_NONE) + return GST_CLOCK_TIME_NONE; + + return last_ts - first_ts; +} + +static GstFlowReturn +drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all) +{ + MpegTSBase *base = (MpegTSBase *) (parse); + GstFlowReturn ret = GST_FLOW_OK; + GstClockTime start_ts; + GstClockTime pcr = GST_CLOCK_TIME_NONE; + GstClockTime pcr_diff = 0; + gsize pcr_bytes, bytes_since_pcr, pos; + GstBuffer *buffer; + GList *l, *end = NULL; + + if (parse->pending_buffers == NULL) + return GST_FLOW_OK; /* Nothing to push */ + + /* + * There are 4 cases: + * 1 We get a buffer with no PCR -> it's the head of the list + * -> Do nothing, unless it's EOS + * 2 We get a buffer with a PCR, it's the first PCR we've seen, and belongs + * to the buffer at the head of the list + * -> Push any buffers in the list except the head, + * using a smoothing of their timestamps to land at the PCR + * -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer); + * 3 It's EOS (drain_all == TRUE, current_pcr == NONE) + * -> Push any buffers in the list using a smoothing of their timestamps + * starting at the previous PCR or first TS + * 4 We get a buffer with a PCR, and have a previous PCR + * -> If distance > smoothing_latency, + * output buffers except the last in the pending queue using + * piecewise-linear timestamps + * -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer); + */ + + /* Case 1 */ + if (!GST_CLOCK_TIME_IS_VALID (parse->current_pcr) && !drain_all) + return GST_FLOW_OK; + + if (GST_CLOCK_TIME_IS_VALID (parse->current_pcr)) { + pcr = mpegts_packetizer_pts_to_ts (base->packetizer, + parse->current_pcr, parse->pcr_pid); + parse->current_pcr = GST_CLOCK_TIME_NONE; + } + + /* The bytes of the last buffer are after the PCR */ + buffer = GST_BUFFER (g_list_nth_data (parse->pending_buffers, 0)); + bytes_since_pcr = gst_buffer_get_size (buffer); + + pcr_bytes = parse->bytes_since_pcr - bytes_since_pcr; + + if (!drain_all) + end = g_list_first (parse->pending_buffers); + + /* Case 2 */ + if (!GST_CLOCK_TIME_IS_VALID (parse->previous_pcr)) { + pcr_diff = get_pending_timestamp_diff (parse); + + /* Calculate the start_ts that ends at the end timestamp */ + start_ts = GST_CLOCK_TIME_NONE; + if (end) { + start_ts = GST_BUFFER_PTS (GST_BUFFER (end->data)); + if (start_ts > pcr_diff) + start_ts -= pcr_diff; + } + } else if (drain_all) { /* Case 3 */ + start_ts = parse->previous_pcr; + pcr_diff = get_pending_timestamp_diff (parse); + } else { /* Case 4 */ + start_ts = parse->previous_pcr; + if (pcr > start_ts) + pcr_diff = GST_CLOCK_DIFF (start_ts, pcr); + + /* Make sure PCR observations are sufficiently far apart */ + if (drain_all == FALSE && pcr_diff < parse->smoothing_latency) + return GST_FLOW_OK; + } + + GST_LOG_OBJECT (parse, "Pushing buffers - startTS %" GST_TIME_FORMAT + " duration %" GST_TIME_FORMAT " %" G_GSIZE_FORMAT " bytes\n", + GST_TIME_ARGS (start_ts), GST_TIME_ARGS (pcr_diff), pcr_bytes); + + /* Now, push buffers out pacing timestamps over pcr_diff time and pcr_bytes */ + pos = 0; + l = g_list_last (parse->pending_buffers); + while (l != end) { + GList *p; + GstClockTime out_ts = start_ts; + + buffer = gst_buffer_make_writable (GST_BUFFER (l->data)); + + if (out_ts != GST_CLOCK_TIME_NONE && pcr_diff != GST_CLOCK_TIME_NONE && + pcr_bytes && pos) + out_ts += gst_util_uint64_scale (pcr_diff, pos, pcr_bytes); + + pos += gst_buffer_get_size (buffer); + + GST_DEBUG_OBJECT (parse, + "InputTS %" GST_TIME_FORMAT " out %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (out_ts)); + + GST_BUFFER_PTS (buffer) = out_ts; + GST_BUFFER_DTS (buffer) = out_ts; + if (ret == GST_FLOW_OK) + ret = gst_pad_push (parse->srcpad, buffer); + else + gst_buffer_unref (buffer); + + /* Free this list node and move to the next */ + p = g_list_previous (l); + parse->pending_buffers = g_list_remove_link (parse->pending_buffers, l); + g_list_free_1 (l); + l = p; + } + + parse->pending_buffers = end; + parse->bytes_since_pcr = bytes_since_pcr; + parse->previous_pcr = pcr; + return ret; +} + static GstFlowReturn mpegts_parse_input_done (MpegTSBase * base, GstBuffer * buffer) { MpegTSParse2 *parse = GST_MPEGTS_PARSE (base); GstFlowReturn ret = GST_FLOW_OK; - if (G_UNLIKELY (parse->first)) - prepare_src_pad (base, parse); + GST_LOG_OBJECT (parse, "Received buffer %" GST_PTR_FORMAT, buffer); - if (G_UNLIKELY (parse->first)) { - parse->pending_buffers = g_list_append (parse->pending_buffers, buffer); - return GST_FLOW_OK; + if (parse->current_pcr != GST_CLOCK_TIME_NONE) { + GST_DEBUG_OBJECT (parse, + "InputTS %" GST_TIME_FORMAT " PCR %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), + GST_TIME_ARGS (mpegts_packetizer_pts_to_ts (base->packetizer, + parse->current_pcr, parse->pcr_pid))); } - if (G_UNLIKELY (parse->pending_buffers)) { - GList *l; + if (parse->set_timestamps || parse->first) { + parse->pending_buffers = g_list_prepend (parse->pending_buffers, buffer); + parse->bytes_since_pcr += gst_buffer_get_size (buffer); + buffer = NULL; + } - for (l = parse->pending_buffers; l; l = l->next) { - if (ret == GST_FLOW_OK) - ret = gst_pad_push (parse->srcpad, l->data); - else - gst_buffer_unref (l->data); - } - g_list_free (parse->pending_buffers); - parse->pending_buffers = NULL; + if (!prepare_src_pad (base, parse)) + return GST_FLOW_OK; + if (parse->pending_buffers != NULL) { + /* Don't keep pending_buffers if not setting output timestamps */ + gboolean drain_all = (parse->set_timestamps == FALSE); + ret = drain_pending_buffers (parse, drain_all); if (ret != GST_FLOW_OK) { - gst_buffer_unref (buffer); + if (buffer) + gst_buffer_unref (buffer); return ret; } } - return gst_pad_push (parse->srcpad, buffer); + if (buffer != NULL) + ret = gst_pad_push (parse->srcpad, buffer); + + return ret; } static MpegTSParsePad * @@ -656,14 +929,17 @@ mpegts_parse_src_pad_query (GstPad * pad, GstObject * parent, GstQuery * query) gst_query_parse_latency (query, &is_live, &min_latency, &max_latency); if (is_live) { - min_latency += TS_LATENCY * GST_MSECOND; + GstClockTime extra_latency = TS_LATENCY * GST_MSECOND; + if (parse->set_timestamps) { + extra_latency = MAX (extra_latency, parse->smoothing_latency); + } + min_latency += extra_latency; if (max_latency != GST_CLOCK_TIME_NONE) - max_latency += TS_LATENCY * GST_MSECOND; + max_latency += extra_latency; } gst_query_set_latency (query, is_live, min_latency, max_latency); } - break; } default: diff --git a/gst/mpegtsdemux/mpegtsparse.h b/gst/mpegtsdemux/mpegtsparse.h index 6848836d5a..b1253bfc72 100644 --- a/gst/mpegtsdemux/mpegtsparse.h +++ b/gst/mpegtsdemux/mpegtsparse.h @@ -51,6 +51,11 @@ struct _MpegTSParse2 { gboolean have_group_id; guint group_id; + GstClockTime smoothing_latency; + GstClockTime current_pcr; + gint user_pcr_pid; + gint pcr_pid; + /* Always present source pad */ GstPad *srcpad; @@ -58,7 +63,12 @@ struct _MpegTSParse2 { /* state */ gboolean first; + gboolean set_timestamps; + + /* Pending buffer state */ GList *pending_buffers; + GstClockTime previous_pcr; + guint bytes_since_pcr; }; struct _MpegTSParse2Class {