sctpenc: Propagate downstream flow errors upstream

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/1180
This commit is contained in:
Sebastian Dröge 2020-01-30 15:58:30 +02:00
parent 1f9c1aa489
commit 90e9f12880
2 changed files with 33 additions and 1 deletions

View file

@ -269,6 +269,7 @@ gst_sctp_enc_init (GstSctpEnc * self)
gst_element_add_pad (GST_ELEMENT (self), self->src_pad); gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
g_queue_init (&self->pending_pads); g_queue_init (&self->pending_pads);
self->src_ret = GST_FLOW_FLUSHING;
} }
static void static void
@ -338,6 +339,7 @@ gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
break; break;
case GST_STATE_CHANGE_READY_TO_PAUSED: case GST_STATE_CHANGE_READY_TO_PAUSED:
self->need_segment = self->need_stream_start_caps = TRUE; self->need_segment = self->need_stream_start_caps = TRUE;
self->src_ret = GST_FLOW_OK;
gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE); gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
res = configure_association (self); res = configure_association (self);
break; break;
@ -345,6 +347,7 @@ gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
break; break;
case GST_STATE_CHANGE_PAUSED_TO_READY: case GST_STATE_CHANGE_PAUSED_TO_READY:
sctpenc_cleanup (self); sctpenc_cleanup (self);
self->src_ret = GST_FLOW_FLUSHING;
break; break;
case GST_STATE_CHANGE_READY_TO_NULL: case GST_STATE_CHANGE_READY_TO_NULL:
break; break;
@ -500,9 +503,15 @@ gst_sctp_enc_srcpad_loop (GstPad * pad)
} }
if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) { if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
flow_ret = gst_pad_push (self->src_pad, GST_BUFFER (item->object)); GstBuffer *buffer = GST_BUFFER (item->object);
flow_ret = gst_pad_push (self->src_pad, buffer);
item->object = NULL; item->object = NULL;
GST_OBJECT_LOCK (self);
self->src_ret = flow_ret;
GST_OBJECT_UNLOCK (self);
if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
|| flow_ret == GST_FLOW_NOT_LINKED)) { || flow_ret == GST_FLOW_NOT_LINKED)) {
GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s", GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
@ -521,6 +530,10 @@ gst_sctp_enc_srcpad_loop (GstPad * pad)
item->destroy (item); item->destroy (item);
} else { } else {
GST_OBJECT_LOCK (self);
self->src_ret = GST_FLOW_FLUSHING;
GST_OBJECT_UNLOCK (self);
GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing"); GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
gst_pad_pause_task (pad); gst_pad_pause_task (pad);
} }
@ -541,6 +554,17 @@ gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO; const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
GstFlowReturn flow_ret = GST_FLOW_ERROR; GstFlowReturn flow_ret = GST_FLOW_ERROR;
GST_OBJECT_LOCK (self);
if (self->src_ret != GST_FLOW_OK) {
GST_ERROR_OBJECT (pad, "Pushing on source pad failed before: %s",
gst_flow_get_name (self->src_ret));
flow_ret = self->src_ret;
GST_OBJECT_UNLOCK (self);
gst_buffer_unref (buffer);
return flow_ret;
}
GST_OBJECT_UNLOCK (self);
ppid = sctpenc_pad->ppid; ppid = sctpenc_pad->ppid;
ordered = sctpenc_pad->ordered; ordered = sctpenc_pad->ordered;
pr = sctpenc_pad->reliability; pr = sctpenc_pad->reliability;
@ -617,6 +641,7 @@ error:
static gboolean static gboolean
gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{ {
GstSctpEnc *self = GST_SCTP_ENC (parent);
GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad); GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
gboolean ret, is_new_ppid; gboolean ret, is_new_ppid;
guint32 new_ppid; guint32 new_ppid;
@ -656,6 +681,9 @@ gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
break; break;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
sctpenc_pad->flushing = FALSE; sctpenc_pad->flushing = FALSE;
GST_OBJECT_LOCK (self);
self->src_ret = GST_FLOW_OK;
GST_OBJECT_UNLOCK (self);
ret = gst_pad_event_default (pad, parent, event); ret = gst_pad_event_default (pad, parent, event);
break; break;
default: default:
@ -715,6 +743,9 @@ gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE); gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
self->need_segment = TRUE; self->need_segment = TRUE;
GST_OBJECT_LOCK (self);
self->src_ret = GST_FLOW_OK;
GST_OBJECT_UNLOCK (self);
gst_pad_start_task (self->src_pad, gst_pad_start_task (self->src_pad,
(GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL); (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);

View file

@ -46,6 +46,7 @@ struct _GstSctpEnc
GstElement element; GstElement element;
GstPad *src_pad; GstPad *src_pad;
GstFlowReturn src_ret;
gboolean need_stream_start_caps, need_segment; gboolean need_stream_start_caps, need_segment;
guint32 sctp_association_id; guint32 sctp_association_id;
guint16 remote_sctp_port; guint16 remote_sctp_port;