splitmuxsink: fix sink pad release while PLAYING

- Release the split mux lock while removing the probes

- Flush the sinkpad to unblock other pads

- Turn check_completed_gop into a do while statement, when
  waking up we want to recheck whether the current GOP is
  ready for sending

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/719>
This commit is contained in:
Mathieu Duponchelle 2020-09-08 20:57:33 +02:00
parent 8f684913cf
commit 19860200ed

View file

@ -2453,64 +2453,66 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
return;
}
if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
gboolean ready = TRUE;
do {
if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
gboolean ready = TRUE;
/* Iterate each pad, and check that the input running time is at least
* up to the reference running time, and if so handle the collected GOP */
GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
GST_STIME_FORMAT " ctx %p",
GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
for (cur = g_list_first (splitmux->contexts); cur != NULL;
cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
/* Iterate each pad, and check that the input running time is at least
* up to the reference running time, and if so handle the collected GOP */
GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
GST_STIME_FORMAT " ctx %p",
GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
for (cur = g_list_first (splitmux->contexts); cur != NULL;
cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
GST_LOG_OBJECT (splitmux,
"Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
" EOS %d", tmpctx, tmpctx->sinkpad,
GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
tmpctx->in_running_time < splitmux->max_in_running_time &&
!tmpctx->in_eos) {
GST_LOG_OBJECT (splitmux,
"Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
tmpctx, tmpctx->sinkpad);
ready = FALSE;
break;
"Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
" EOS %d", tmpctx, tmpctx->sinkpad,
GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
tmpctx->in_running_time < splitmux->max_in_running_time &&
!tmpctx->in_eos) {
GST_LOG_OBJECT (splitmux,
"Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
tmpctx, tmpctx->sinkpad);
ready = FALSE;
break;
}
}
if (ready) {
GST_DEBUG_OBJECT (splitmux,
"Collected GOP is complete. Processing (ctx %p)", ctx);
/* All pads have a complete GOP, release it into the multiqueue */
handle_gathered_gop (splitmux);
/* The user has requested a split, we can split now that the previous GOP
* has been collected to the correct location */
if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
TRUE, FALSE)) {
g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
}
}
}
if (ready) {
GST_DEBUG_OBJECT (splitmux,
"Collected GOP is complete. Processing (ctx %p)", ctx);
/* All pads have a complete GOP, release it into the multiqueue */
handle_gathered_gop (splitmux);
/* The user has requested a split, we can split now that the previous GOP
* has been collected to the correct location */
if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested), TRUE,
FALSE)) {
g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
}
/* If upstream reached EOS we are not expecting more data, no need to wait
* here. */
if (ctx->in_eos)
return;
/* Some pad is not yet ready, or GOP is being pushed
* either way, sleep and wait to get woken */
if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
!ctx->flushing &&
(ctx->in_running_time >= splitmux->max_in_running_time) &&
(splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
GST_SPLITMUX_WAIT_INPUT (splitmux);
GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
}
}
/* If upstream reached EOS we are not expecting more data, no need to wait
* here. */
if (ctx->in_eos)
return;
/* Some pad is not yet ready, or GOP is being pushed
* either way, sleep and wait to get woken */
while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
!ctx->flushing &&
(ctx->in_running_time >= splitmux->max_in_running_time) &&
(splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
GST_SPLITMUX_WAIT_INPUT (splitmux);
GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
}
} while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
}
static GstPadProbeReturn
@ -3160,12 +3162,18 @@ gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
/* Remove the context from our consideration */
splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
if (ctx->sink_pad_block_id)
GST_SPLITMUX_UNLOCK (splitmux);
if (ctx->sink_pad_block_id) {
gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
}
if (ctx->src_pad_block_id)
gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
GST_SPLITMUX_LOCK (splitmux);
/* Can release the context now */
mq_stream_ctx_free (ctx);
if (ctx == splitmux->reference_ctx)