diff --git a/ChangeLog b/ChangeLog index 04b761bdf5..2aaeb6580d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,24 @@ +2006-06-02 Thomas Vander Stichele + + * gst/tcp/README: + * gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_init), + (gst_multi_fd_sink_remove_client_link), + (gst_multi_fd_sink_client_queue_caps), + (gst_multi_fd_sink_client_queue_buffer), + (gst_multi_fd_sink_handle_client_write), + (gst_multi_fd_sink_render): + * gst/tcp/gstmultifdsink.h: + make multifdsink properly deal with streamheader: + - streamheader is taken from caps + - buffers marked with IN_CAPS are not sent + - streamheaders are sent, on connection, from the caps of the + buffer where the client gets positioned to + - further streamheader changes are done every time the client + will receive a buffer with different caps + * tests/check/elements/multifdsink.c: (GST_START_TEST), + (gst_multifdsink_create_streamheader): + add tests for this + 2006-06-02 Michael Smith * ext/vorbis/vorbisdec.c: (vorbis_handle_identification_packet): diff --git a/gst/tcp/README b/gst/tcp/README index 589d534091..0e3af6a614 100644 --- a/gst/tcp/README +++ b/gst/tcp/README @@ -29,3 +29,22 @@ TODO ---- - implement DNS resolution +multifdsink +----------- +- operation: + - client fd gets added when "add" signal gets emitted on multifdsink + - signal handler creates a GstTCPClient structure, adds it to ->clients, + and adds the fd to ->fd_hash, then emits client-added + - client + + - when a buffer comes in: + - the _render vmethod puts the buffer on the global queue + - and increases bytes_to_serve + - (currently it sets streamheaders, but since this is treated globally + this is wrong - clients can be at different positions in the stream) + + - when a client issues a write (ie requests data): + - when using GDP, if no caps sent yet, send caps first, then set caps_sent + - if streamheader buffers, and we haven't sent yet to this client, + send current streamheader buffers, then set streamheader_sent + - send out buffers diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 5ded7c4f02..7dfce3e766 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -537,6 +537,8 @@ gst_multi_fd_sink_init (GstMultiFdSink * this, GstMultiFdSinkClass * klass) this->timeout = DEFAULT_TIMEOUT; this->sync_method = DEFAULT_SYNC_METHOD; + + this->header_flags = 0; } static void @@ -792,6 +794,10 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) g_slist_free (client->sending); client->sending = NULL; + if (client->caps) + gst_caps_unref (client->caps); + client->caps = NULL; + /* unlock the mutex before signaling because the signal handler * might query some properties */ CLIENTS_UNLOCK (sink); @@ -936,7 +942,8 @@ gst_multi_fd_sink_client_queue_caps (GstMultiFdSink * sink, client->fd.fd, string); g_free (string); - if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) { + if (!gst_dp_packet_from_caps (caps, sink->header_flags, &length, &header, + &payload)) { GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps"); return FALSE; } @@ -965,11 +972,118 @@ static gboolean gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, GstTCPClient * client, GstBuffer * buffer) { + GstCaps *caps; + + /* TRUE: send them if the new caps have them */ + gboolean send_streamheader = FALSE; + GstStructure *s; + + + /* before we queue the buffer, we check if we need to queue streamheader + * buffers (because it's a new client, or because they changed) */ + caps = gst_buffer_get_caps (buffer); /* cleaned up after streamheader */ + if (!client->caps) { + GST_LOG_OBJECT (sink, + "[fd %5d] no previous caps for this client, send streamheader", + client->fd.fd); + send_streamheader = TRUE; + client->caps = gst_caps_ref (caps); + } else { + /* there were previous caps recorded, so compare */ + if (!gst_caps_is_equal (caps, client->caps)) { + const GValue *sh1, *sh2; + + /* caps are not equal, but could still have the same streamheader */ + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_has_field (s, "streamheader")) { + /* no new streamheader, so nothing new to send */ + GST_LOG_OBJECT (sink, + "[fd %5d] new caps do not have streamheader, not sending", + client->fd.fd); + } else { + /* there is a new streamheader */ + s = gst_caps_get_structure (client->caps, 0); + if (!gst_structure_has_field (s, "streamheader")) { + /* no previous streamheader, so send the new one */ + GST_LOG_OBJECT (sink, + "[fd %5d] previous caps did not have streamheader, sending", + client->fd.fd); + send_streamheader = TRUE; + } else { + /* both old and new caps have streamheader set */ + sh1 = gst_structure_get_value (s, "streamheader"); + s = gst_caps_get_structure (caps, 0); + sh2 = gst_structure_get_value (s, "streamheader"); + if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) { + GST_LOG_OBJECT (sink, + "[fd %5d] new streamheader different from old, sending", + client->fd.fd); + send_streamheader = TRUE; + } + } + } + } + } + + if (G_UNLIKELY (send_streamheader)) { + const GValue *sh; + GArray *buffers; + int i; + + GST_LOG_OBJECT (sink, + "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT, + client->fd.fd, caps); + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_has_field (s, "streamheader")) { + GST_LOG_OBJECT (sink, + "[fd %5d] no new streamheader, so nothing to send", client->fd.fd); + } else { + GST_LOG_OBJECT (sink, + "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT, + client->fd.fd, caps); + sh = gst_structure_get_value (s, "streamheader"); + g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY); + buffers = g_value_peek_pointer (sh); + for (i = 0; i < buffers->len; ++i) { + GValue *bufval; + GstBuffer *buffer; + + bufval = &g_array_index (buffers, GValue, i); + g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER); + buffer = g_value_peek_pointer (bufval); + GST_LOG_OBJECT (sink, + "[fd %5d] queueing streamheader buffer of length %d", + client->fd.fd, GST_BUFFER_SIZE (buffer)); + gst_buffer_ref (buffer); + + if (sink->protocol == GST_TCP_PROTOCOL_GDP) { + guint8 *header; + guint len; + + if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len, + &header)) { + GST_DEBUG_OBJECT (sink, + "[fd %5d] could not create header, removing client", + client->fd.fd); + return FALSE; + } + gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, + len); + } + + client->sending = g_slist_append (client->sending, buffer); + } + } + } + + gst_caps_unref (caps); + caps = NULL; + /* now we can send the buffer, possibly sending a GDP header first */ if (sink->protocol == GST_TCP_PROTOCOL_GDP) { guint8 *header; guint len; - if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) { + if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len, &header)) { GST_DEBUG_OBJECT (sink, "[fd %5d] could not create header, removing client", client->fd.fd); return FALSE; @@ -1143,28 +1257,6 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, client->caps_sent = TRUE; } } - /* if we have streamheader buffers, and haven't sent them to this client - * yet, send them out one by one */ - if (!client->streamheader_sent) { - GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd, - g_slist_length (sink->streamheader)); - if (sink->streamheader) { - GSList *l; - - for (l = sink->streamheader; l; l = l->next) { - /* queue stream headers for sending */ - res = - gst_multi_fd_sink_client_queue_buffer (sink, client, - GST_BUFFER (l->data)); - if (!res) { - GST_DEBUG_OBJECT (sink, - "Failed queueing streamheader, removing client"); - return FALSE; - } - } - } - client->streamheader_sent = TRUE; - } more = TRUE; do { @@ -1645,9 +1737,32 @@ static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) { GstMultiFdSink *sink; + GstCaps *bufcaps, *padcaps; sink = GST_MULTI_FD_SINK (bsink); + /* since we check every buffer for streamheader caps, we need to make + * sure every buffer has caps set */ + bufcaps = gst_buffer_get_caps (buf); + padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink)); + + /* make sure we have caps on the pad */ + if (!padcaps) { + if (!bufcaps) { + GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL), + ("Received first buffer without caps set")); + return GST_FLOW_NOT_NEGOTIATED; + } + } + + /* stamp the buffer with previous caps if no caps set */ + if (!bufcaps) { + buf = gst_buffer_make_writable (buf); + gst_buffer_set_caps (buf, padcaps); + } else { + gst_caps_unref (bufcaps); + } + /* since we keep this buffer out of the scope of this method */ gst_buffer_ref (buf); @@ -1670,9 +1785,11 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) /* if the incoming buffer is marked as IN CAPS, then we assume for now * it's a streamheader that needs to be sent to each new client, so we * put it on our internal list of streamheader buffers. - * After that we return, since we only send these out when we get - * non IN_CAPS buffers so we properly keep track of clients that got - * streamheaders. */ + * FIXME: we could check if the buffer's contents are in fact part of the + * current streamheader. + * + * We don't send the buffer to the client, since streamheaders are sent + * separately when necessary. */ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) { sink->previous_buffer_in_caps = TRUE; GST_DEBUG_OBJECT (sink, diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 7848903801..7e615736f6 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -142,9 +142,10 @@ typedef struct { GstTCPProtocol protocol; gboolean caps_sent; - gboolean streamheader_sent; gboolean new_connection; + GstCaps *caps; /* caps of last queued buffer */ + /* stats */ guint64 bytes_sent; guint64 connect_time; @@ -152,7 +153,6 @@ typedef struct { guint64 last_activity_time; guint64 dropped_buffers; guint64 avg_queue_size; - } GstTCPClient; #define CLIENTS_LOCK_INIT(fdsink) (g_static_rec_mutex_init(&fdsink->clientslock)) @@ -203,6 +203,8 @@ struct _GstMultiFdSink { gint buffers_queued; /* number of queued buffers */ gint bytes_queued; /* number of queued bytes */ gint time_queued; /* number of queued time */ + + guint8 header_flags; }; struct _GstMultiFdSinkClass { diff --git a/tests/check/elements/multifdsink.c b/tests/check/elements/multifdsink.c index 67c8c4758e..7b822cae38 100644 --- a/tests/check/elements/multifdsink.c +++ b/tests/check/elements/multifdsink.c @@ -31,7 +31,7 @@ GstPad *mysrcpad; static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, - GST_STATIC_CAPS ("application/x-gdp") + GST_STATIC_CAPS ("application/x-gst-check") ); GstElement * @@ -80,12 +80,16 @@ GST_START_TEST (test_no_clients) { GstElement *sink; GstBuffer *buffer; + GstCaps *caps; sink = setup_multifdsink (); ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC); + caps = gst_caps_from_string ("application/x-gst-check"); buffer = gst_buffer_new_and_alloc (4); + gst_buffer_set_caps (buffer, caps); + gst_caps_unref (caps); fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); GST_DEBUG ("cleaning up multifdsink"); @@ -99,6 +103,7 @@ GST_START_TEST (test_add_client) { GstElement *sink; GstBuffer *buffer; + GstCaps *caps; int pfd[2]; gchar data[4]; guint64 bytes_served; @@ -112,7 +117,11 @@ GST_START_TEST (test_add_client) /* add the client */ g_signal_emit_by_name (sink, "add", pfd[1]); + caps = gst_caps_from_string ("application/x-gst-check"); + GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps); buffer = gst_buffer_new_and_alloc (4); + gst_buffer_set_caps (buffer, caps); + ASSERT_CAPS_REFCOUNT (caps, "caps", 2); memcpy (GST_BUFFER_DATA (buffer), "dead", 4); fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); @@ -123,6 +132,9 @@ GST_START_TEST (test_add_client) GST_DEBUG ("cleaning up multifdsink"); ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); cleanup_multifdsink (sink); + + ASSERT_CAPS_REFCOUNT (caps, "caps", 1); + gst_caps_unref (caps); } GST_END_TEST; @@ -143,12 +155,13 @@ G_STMT_START { \ /* from the given two data buffers, create two streamheader buffers and * some caps that match it, and store them in the given pointers - * returns buffers and caps with a refcount of 1 */ + * returns one ref to each of the buffers and the caps */ static void gst_multifdsink_create_streamheader (const gchar * data1, const gchar * data2, GstBuffer ** hbuf1, GstBuffer ** hbuf2, GstCaps ** caps) { + GstBuffer *buf; GValue array = { 0 }; GValue value = { 0 }; GstStructure *structure; @@ -174,12 +187,19 @@ gst_multifdsink_create_streamheader (const gchar * data1, g_value_init (&array, GST_TYPE_ARRAY); g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, *hbuf1); + /* we take a copy, set it on the array (which refs it), then unref our copy */ + buf = gst_buffer_copy (*hbuf1); + gst_value_set_buffer (&value, buf); + ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2); + gst_buffer_unref (buf); gst_value_array_append_value (&array, &value); g_value_unset (&value); g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, *hbuf2); + buf = gst_buffer_copy (*hbuf2); + gst_value_set_buffer (&value, buf); + ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2); + gst_buffer_unref (buf); gst_value_array_append_value (&array, &value); g_value_unset (&value); @@ -188,6 +208,14 @@ gst_multifdsink_create_streamheader (const gchar * data1, gst_structure_set_value (structure, "streamheader", &array); g_value_unset (&array); + ASSERT_CAPS_REFCOUNT (*caps, "streamheader caps", 1); + + /* set our streamheadery caps on the buffers */ + gst_buffer_set_caps (*hbuf1, *caps); + gst_buffer_set_caps (*hbuf2, *caps); + ASSERT_CAPS_REFCOUNT (*caps, "streamheader caps", 3); + + GST_DEBUG ("created streamheader caps %p %" GST_PTR_FORMAT, *caps, *caps); } @@ -196,7 +224,8 @@ gst_multifdsink_create_streamheader (const gchar * data1, * - sets streamheader caps on the pad * - pushes the IN_CAPS buffers * - pushes a buffer - * - verifies that the client received all the data correctly + * - verifies that the client received all the data correctly, and did not + * get multiple copies of the streamheader * - adds a second client * - verifies that this second client receives the streamheader caps too, plus * - the new buffer @@ -227,7 +256,8 @@ GST_START_TEST (test_streamheader) gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2, &caps); fail_unless (gst_pad_set_caps (mysrcpad, caps)); - gst_caps_unref (caps); + /* one is ours, two on the buffers, and one now on the pad */ + ASSERT_CAPS_REFCOUNT (caps, "caps", 4); fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK); fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK); @@ -265,11 +295,21 @@ GST_START_TEST (test_streamheader) fail_unless_read ("second client", pfd2[0], 4, "deaf"); wait_bytes_served (sink, 36); - gst_buffer_unref (hbuf1); - gst_buffer_unref (hbuf2); GST_DEBUG ("cleaning up multifdsink"); + + g_signal_emit_by_name (sink, "remove", pfd1[1]); + g_signal_emit_by_name (sink, "remove", pfd2[1]); + ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); cleanup_multifdsink (sink); + + ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1); + ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1); + gst_buffer_unref (hbuf1); + gst_buffer_unref (hbuf2); + + ASSERT_CAPS_REFCOUNT (caps, "caps", 1); + gst_caps_unref (caps); } GST_END_TEST; @@ -306,10 +346,15 @@ GST_START_TEST (test_change_streamheader) /* create caps with streamheader, set the caps, and push the IN_CAPS * buffers */ - gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2, + gst_multifdsink_create_streamheader ("first", "header", &hbuf1, &hbuf2, &caps); fail_unless (gst_pad_set_caps (mysrcpad, caps)); - gst_caps_unref (caps); + /* one is ours, two on the buffers, and one now on the pad */ + ASSERT_CAPS_REFCOUNT (caps, "caps", 4); + + /* one to hold for the test and one to give away */ + ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2); + ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2); fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK); fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK); @@ -327,22 +372,32 @@ GST_START_TEST (test_change_streamheader) memcpy (GST_BUFFER_DATA (buf), "f00d", 4); gst_pad_push (mysrcpad, buf); - fail_unless_read ("change: first client", pfd1[0], 4, "babe"); - fail_unless_read ("change: first client", pfd1[0], 8, "deadbeef"); + fail_unless_read ("change: first client", pfd1[0], 5, "first"); + fail_unless_read ("change: first client", pfd1[0], 6, "header"); fail_unless_read ("change: first client", pfd1[0], 4, "f00d"); - wait_bytes_served (sink, 16); + //wait_bytes_served (sink, 16); /* now add the second client */ g_signal_emit_by_name (sink, "add", pfd2[1]); fail_if_can_read ("second client, no buffer", pfd2[0]); /* change the streamheader */ + + /* before we change, multifdsink still has a list of the old streamheaders */ + ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2); + ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2); gst_buffer_unref (hbuf1); gst_buffer_unref (hbuf2); - gst_multifdsink_create_streamheader ("beef", "deadbabe", &hbuf1, &hbuf2, + + /* drop our ref to the previous caps */ + gst_caps_unref (caps); + + gst_multifdsink_create_streamheader ("second", "header", &hbuf1, &hbuf2, &caps); fail_unless (gst_pad_set_caps (mysrcpad, caps)); - gst_caps_unref (caps); + /* one to hold for the test and one to give away */ + ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2); + ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2); fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK); fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK); @@ -353,25 +408,35 @@ GST_START_TEST (test_change_streamheader) /* now push another buffer, which will trigger streamheader for second * client, but should also send new streamheaders to first client */ - buf = gst_buffer_new_and_alloc (4); - memcpy (GST_BUFFER_DATA (buf), "deaf", 4); + buf = gst_buffer_new_and_alloc (8); + memcpy (GST_BUFFER_DATA (buf), "deadbabe", 8); gst_pad_push (mysrcpad, buf); - /* FIXME: here's a bug - the first client does not get new streamheaders */ - fail_unless_read ("first client", pfd1[0], 4, "deaf"); + fail_unless_read ("first client", pfd1[0], 6, "second"); + fail_unless_read ("first client", pfd1[0], 6, "header"); + fail_unless_read ("first client", pfd1[0], 8, "deadbabe"); /* new streamheader data */ - fail_unless_read ("second client", pfd2[0], 4, "beef"); - fail_unless_read ("second client", pfd2[0], 8, "deadbabe"); + fail_unless_read ("second client", pfd2[0], 6, "second"); + fail_unless_read ("second client", pfd2[0], 6, "header"); /* we missed the f00d buffer */ - fail_unless_read ("second client", pfd2[0], 4, "deaf"); - wait_bytes_served (sink, 36); + fail_unless_read ("second client", pfd2[0], 8, "deadbabe"); + //wait_bytes_served (sink, 36); + GST_DEBUG ("cleaning up multifdsink"); + g_signal_emit_by_name (sink, "remove", pfd1[1]); + g_signal_emit_by_name (sink, "remove", pfd2[1]); + ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + + /* setting to NULL should have cleared the streamheader */ + ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1); + ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1); gst_buffer_unref (hbuf1); gst_buffer_unref (hbuf2); - GST_DEBUG ("cleaning up multifdsink"); - ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); cleanup_multifdsink (sink); + + ASSERT_CAPS_REFCOUNT (caps, "caps", 1); + gst_caps_unref (caps); } GST_END_TEST;