/* GStreamer * Copyright (C) 2015-2017 YouView TV Ltd * Author: Vincent Penquerch * * gstipcpipelinecomm.c: * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, * Boston, MA 02110-1301, USA. */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include #include #include #include #include #include #include "gstipcpipelinecomm.h" GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_comm_debug); #define GST_CAT_DEFAULT gst_ipc_pipeline_comm_debug #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND) GQuark QUARK_ID; typedef enum { ACK_TYPE_NONE, ACK_TYPE_TIMED, ACK_TYPE_BLOCKING } AckType; typedef enum { COMM_REQUEST_TYPE_BUFFER, COMM_REQUEST_TYPE_EVENT, COMM_REQUEST_TYPE_QUERY, COMM_REQUEST_TYPE_STATE_CHANGE, COMM_REQUEST_TYPE_MESSAGE, } CommRequestType; typedef struct { guint32 id; gboolean replied; gboolean comm_error; guint32 ret; GstQuery *query; CommRequestType type; GCond cond; } CommRequest; static const gchar *comm_request_ret_get_name (CommRequestType type, guint32 ret); static guint32 comm_request_ret_get_failure_value (CommRequestType type); static CommRequest * comm_request_new (guint32 id, CommRequestType type, GstQuery * query) { CommRequest *req; req = g_malloc (sizeof (CommRequest)); req->id = id; g_cond_init (&req->cond); req->replied = FALSE; req->comm_error = FALSE; req->query = query; req->ret = 
comm_request_ret_get_failure_value (type); req->type = type; return req; } static guint32 comm_request_wait (GstIpcPipelineComm * comm, CommRequest * req, AckType ack_type) { guint32 ret = comm_request_ret_get_failure_value (req->type); guint64 end_time; if (ack_type == ACK_TYPE_TIMED) end_time = g_get_monotonic_time () + comm->ack_time; else end_time = G_MAXUINT64; GST_TRACE_OBJECT (comm->element, "Waiting for ACK/NAK for request %u", req->id); while (!req->replied) { if (ack_type == ACK_TYPE_TIMED) { g_cond_wait_until (&req->cond, &comm->mutex, end_time); if (g_get_monotonic_time () >= end_time) break; } else g_cond_wait (&req->cond, &comm->mutex); } if (req->replied) { ret = req->ret; GST_TRACE_OBJECT (comm->element, "Got reply for request %u: %d (%s)", req->id, ret, comm_request_ret_get_name (req->type, ret)); } else { req->comm_error = TRUE; GST_ERROR_OBJECT (comm->element, "Timeout waiting for reply for request %u", req->id); } return ret; } static void comm_request_free (CommRequest * req) { g_cond_clear (&req->cond); g_free (req); } static const gchar * comm_request_ret_get_name (CommRequestType type, guint32 ret) { switch (type) { case COMM_REQUEST_TYPE_BUFFER: return gst_flow_get_name (ret); case COMM_REQUEST_TYPE_EVENT: case COMM_REQUEST_TYPE_QUERY: case COMM_REQUEST_TYPE_MESSAGE: return ret ? 
"TRUE" : "FALSE"; case COMM_REQUEST_TYPE_STATE_CHANGE: return gst_element_state_change_return_get_name (ret); default: g_assert_not_reached (); } } static guint32 comm_request_ret_get_failure_value (CommRequestType type) { switch (type) { case COMM_REQUEST_TYPE_BUFFER: return GST_FLOW_COMM_ERROR; case COMM_REQUEST_TYPE_EVENT: case COMM_REQUEST_TYPE_MESSAGE: case COMM_REQUEST_TYPE_QUERY: return FALSE; case COMM_REQUEST_TYPE_STATE_CHANGE: return GST_STATE_CHANGE_FAILURE; default: g_assert_not_reached (); } } static const gchar * gst_ipc_pipeline_comm_data_type_get_name (GstIpcPipelineCommDataType type) { switch (type) { case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: return "ACK"; case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: return "QUERY_RESULT"; case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: return "BUFFER"; case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: return "EVENT"; case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: return "SINK_MESSAGE_EVENT"; case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: return "QUERY"; case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: return "STATE_CHANGE"; case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: return "STATE_LOST"; case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: return "MESSAGE"; case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: return "GERROR_MESSAGE"; default: return "UNKNOWN"; } } static gboolean gst_ipc_pipeline_comm_sync_fd (GstIpcPipelineComm * comm, guint32 id, GstQuery * query, guint32 * ret, AckType ack_type, CommRequestType type) { CommRequest *req; gboolean comm_error; GHashTable *waiting_ids; if (ack_type == ACK_TYPE_NONE) return TRUE; req = comm_request_new (id, type, query); waiting_ids = g_hash_table_ref (comm->waiting_ids); g_hash_table_insert (waiting_ids, GINT_TO_POINTER (id), req); *ret = comm_request_wait (comm, req, ack_type); comm_error = req->comm_error; g_hash_table_remove (waiting_ids, GINT_TO_POINTER (id)); g_hash_table_unref (waiting_ids); return !comm_error; } static gboolean write_to_fd_raw 
(GstIpcPipelineComm * comm, const void *data, size_t size) { size_t offset; gboolean ret = TRUE; offset = 0; GST_TRACE_OBJECT (comm->element, "Writing %zu bytes to fdout", size); while (size) { ssize_t written = write (comm->fdout, (const unsigned char *) data + offset, size); if (written < 0) { if (errno == EAGAIN || errno == EINTR) continue; GST_ERROR_OBJECT (comm->element, "Failed to write to fd: %s", strerror (errno)); ret = FALSE; goto done; } size -= written; offset += written; } done: return ret; } static gboolean write_byte_writer_to_fd (GstIpcPipelineComm * comm, GstByteWriter * bw) { guint8 *data; gboolean ret; guint size; size = gst_byte_writer_get_size (bw); data = gst_byte_writer_reset_and_get_data (bw); if (!data) return FALSE; ret = write_to_fd_raw (comm, data, size); g_free (data); return ret; } static void gst_ipc_pipeline_comm_write_ack_to_fd (GstIpcPipelineComm * comm, guint32 id, guint32 ret, CommRequestType type) { const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK; guint32 size; GstByteWriter bw; g_mutex_lock (&comm->mutex); GST_TRACE_OBJECT (comm->element, "Writing ACK for %u: %s (%d)", id, comm_request_ret_get_name (type, ret), ret); gst_byte_writer_init (&bw); if (!gst_byte_writer_put_uint8 (&bw, payload_type)) goto write_failed; if (!gst_byte_writer_put_uint32_le (&bw, id)) goto write_failed; size = sizeof (ret); if (!gst_byte_writer_put_uint32_le (&bw, size)) goto write_failed; if (!gst_byte_writer_put_uint32_le (&bw, ret)) goto write_failed; if (!write_byte_writer_to_fd (comm, &bw)) goto write_failed; done: g_mutex_unlock (&comm->mutex); gst_byte_writer_reset (&bw); return; write_failed: GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), ("Failed to write to socket")); goto done; } void gst_ipc_pipeline_comm_write_flow_ack_to_fd (GstIpcPipelineComm * comm, guint32 id, GstFlowReturn ret) { gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, COMM_REQUEST_TYPE_BUFFER); } void 
gst_ipc_pipeline_comm_write_boolean_ack_to_fd (GstIpcPipelineComm * comm, guint32 id, gboolean ret) { gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, COMM_REQUEST_TYPE_EVENT); } void gst_ipc_pipeline_comm_write_state_change_ack_to_fd (GstIpcPipelineComm * comm, guint32 id, GstStateChangeReturn ret) { gst_ipc_pipeline_comm_write_ack_to_fd (comm, id, (guint32) ret, COMM_REQUEST_TYPE_STATE_CHANGE); } void gst_ipc_pipeline_comm_write_query_result_to_fd (GstIpcPipelineComm * comm, guint32 id, gboolean result, GstQuery * query) { const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT; guint8 result8 = result; guint32 size; size_t len; char *str = NULL; guint32 type; const GstStructure *structure; GstByteWriter bw; g_mutex_lock (&comm->mutex); GST_TRACE_OBJECT (comm->element, "Writing query result for %u: %d, %" GST_PTR_FORMAT, id, result, query); gst_byte_writer_init (&bw); if (!gst_byte_writer_put_uint8 (&bw, payload_type)) goto write_failed; if (!gst_byte_writer_put_uint32_le (&bw, id)) goto write_failed; structure = gst_query_get_structure (query); if (structure) { str = gst_structure_to_string (structure); len = strlen (str); } else { str = NULL; len = 0; } size = 1 + sizeof (guint32) + len + 1; if (!gst_byte_writer_put_uint32_le (&bw, size)) goto write_failed; if (!gst_byte_writer_put_uint8 (&bw, result8)) goto write_failed; type = GST_QUERY_TYPE (query); if (!gst_byte_writer_put_uint32_le (&bw, type)) goto write_failed; if (str) { if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, len + 1)) goto write_failed; } else { if (!gst_byte_writer_put_uint8 (&bw, 0)) goto write_failed; } if (!write_byte_writer_to_fd (comm, &bw)) goto write_failed; done: g_mutex_unlock (&comm->mutex); gst_byte_writer_reset (&bw); g_free (str); return; write_failed: GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), ("Failed to write to socket")); goto done; } static gboolean gst_ipc_pipeline_comm_read_query_result (GstIpcPipelineComm * 
comm, guint32 size, GstQuery ** query)
{
  /* Parses a QUERY_RESULT payload of @size bytes from comm->adapter: one
   * result byte, a 32-bit query type, then an optional NUL terminated
   * structure string. On success *query is set to a new custom query built
   * from them; otherwise it stays NULL. Once the payload has been mapped, all
   * @size bytes are flushed from the adapter whatever the outcome.
   * Returns the result byte, forced to FALSE when the string is not NUL
   * terminated or does not yield a structure. */
  gchar *end = NULL;
  GstStructure *structure;
  guint8 result;
  guint32 type;
  const guint8 *payload = NULL;
  guint32 mapped_size = size;

  /* this should not be called if we don't have enough yet */
  *query = NULL;
  g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
  g_return_val_if_fail (size >= 1 + sizeof (guint32), FALSE);

  payload = gst_adapter_map (comm->adapter, mapped_size);
  if (!payload)
    return FALSE;
  result = *payload++;
  memcpy (&type, payload, sizeof (type));
  payload += sizeof (type);

  /* what is left after the fixed part is the structure string */
  size -= 1 + sizeof (guint32);
  if (size == 0)
    goto done;

  /* the last byte must be the string terminator */
  if (payload[size - 1]) {
    result = FALSE;
    goto done;
  }
  if (*payload) {
    structure = gst_structure_from_string ((const char *) payload, &end);
  } else {
    structure = NULL;
  }
  if (!structure) {
    result = FALSE;
    goto done;
  }

  *query = gst_query_new_custom (type, structure);

done:
  gst_adapter_unmap (comm->adapter);
  gst_adapter_flush (comm->adapter, mapped_size);
  return result;
}

/* What build_meta() records about one GstMeta of a buffer */
typedef struct
{
  guint32 bytes;                /* serialized size of this entry */
  guint64 size;                 /* copied from GstMetaInfo::size */
  guint32 flags;                /* copied from GstMeta::flags */
  guint64 api;                  /* copied from GstMetaInfo::api */
  char *str;                    /* serialized contents, or NULL */
} MetaBuildInfo;

/* Accumulator handed to build_meta() through gst_buffer_foreach_meta() */
typedef struct
{
  GstIpcPipelineComm *comm;     /* used for logging */
  guint32 n_meta;               /* number of entries in info */
  guint32 total_bytes;          /* running sum of the entries' bytes */
  MetaBuildInfo *info;          /* array grown by one entry per meta */
} MetaListRepresentation;

/*
 * build_meta:
 * @buffer: the buffer being walked (unused)
 * @meta: the current meta
 * @user_data: a MetaListRepresentation
 *
 * Appends an entry describing @meta to the representation and adds its
 * serialized size to the running total.
 *
 * Returns: TRUE, always.
 */
static gboolean
build_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
{
  MetaListRepresentation *repr = user_data;

  repr->n_meta++;
  repr->info = g_realloc (repr->info, repr->n_meta * sizeof (MetaBuildInfo));
  repr->info[repr->n_meta - 1].bytes =
      /* 4 byte bytes */
      4
      /* 4 byte GstMetaFlags */
      + 4
      /* GstMetaInfo::api */
      + 4 + strlen (g_type_name ((*meta)->info->api)) + 1
      /* GstMetaInfo::size */
      + 8
      /* str length */
      + 4;

  repr->info[repr->n_meta - 1].flags = (*meta)->flags;
  repr->info[repr->n_meta - 1].api = (*meta)->info->api;
  repr->info[repr->n_meta - 1].size = (*meta)->info->size;
  repr->info[repr->n_meta - 1].str = NULL;

  /* GstMeta is a base class, and actual useful classes are all different...
So we list a few of them we know we want and ignore the open ended rest */
  if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
    GstProtectionMeta *m = (GstProtectionMeta *) * meta;
    /* the protection info structure travels as a string; its length plus the
     * terminating NUL is added to the entry's size */
    repr->info[repr->n_meta - 1].str = gst_structure_to_string (m->info);
    repr->info[repr->n_meta - 1].bytes +=
        strlen (repr->info[repr->n_meta - 1].str) + 1;
    GST_TRACE_OBJECT (repr->comm->element, "Found GstMeta type %s: %s",
        g_type_name ((*meta)->info->api), repr->info[repr->n_meta - 1].str);
  } else {
    GST_WARNING_OBJECT (repr->comm->element, "Ignoring GstMeta type %s",
        g_type_name ((*meta)->info->api));
  }
  repr->total_bytes += repr->info[repr->n_meta - 1].bytes;
  return TRUE;
}

/* Buffer fields sent ahead of the buffer data. The struct is written to and
 * read from the stream as raw memory. */
typedef struct
{
  guint64 pts;
  guint64 dts;
  guint64 duration;
  guint64 offset;
  guint64 offset_end;
  guint64 flags;
} CommBufferMetadata;

/*
 * gst_ipc_pipeline_comm_write_buffer_to_fd:
 * @comm: the comm object
 * @buffer: the buffer to send
 *
 * Sends @buffer as a BUFFER packet under comm->mutex: payload type byte, a
 * fresh send id, the payload size, the CommBufferMetadata, the data size, the
 * buffer data and finally the list of metas collected by build_meta(). It
 * then waits (blocking) for the peer's reply.
 *
 * Returns: the flow return sent back by the peer, GST_FLOW_COMM_ERROR if
 * writing or waiting failed, GST_FLOW_ERROR if the buffer could not be mapped.
 */
GstFlowReturn
gst_ipc_pipeline_comm_write_buffer_to_fd (GstIpcPipelineComm * comm,
    GstBuffer * buffer)
{
  const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER;
  GstMapInfo map;
  guint32 ret32 = GST_FLOW_OK;
  guint32 size, n;
  CommBufferMetadata meta;
  GstFlowReturn ret;
  MetaListRepresentation repr = { comm, 0, 4, NULL };   /* starts at 4 for n_meta */
  GstByteWriter bw;

  g_mutex_lock (&comm->mutex);
  ++comm->send_id;

  GST_TRACE_OBJECT (comm->element, "Writing buffer %u: %" GST_PTR_FORMAT,
      comm->send_id, buffer);

  gst_byte_writer_init (&bw);

  meta.pts = GST_BUFFER_PTS (buffer);
  meta.dts = GST_BUFFER_DTS (buffer);
  meta.duration = GST_BUFFER_DURATION (buffer);
  meta.offset = GST_BUFFER_OFFSET (buffer);
  meta.offset_end = GST_BUFFER_OFFSET_END (buffer);
  meta.flags = GST_BUFFER_FLAGS (buffer);

  /* work out meta size */
  gst_buffer_foreach_meta (buffer, build_meta, &repr);

  if (!gst_byte_writer_put_uint8 (&bw, payload_type))
    goto write_failed;
  if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
    goto write_failed;
  /* payload = buffer data + its 32-bit size + metadata struct + meta list */
  size =
      gst_buffer_get_size (buffer) + sizeof (guint32) +
      sizeof (CommBufferMetadata) + repr.total_bytes;
  if (!gst_byte_writer_put_uint32_le (&bw, size))
    goto write_failed;
if (!gst_byte_writer_put_data (&bw, (const guint8 *) &meta, sizeof (meta)))
    goto write_failed;
  size = gst_buffer_get_size (buffer);
  if (!gst_byte_writer_put_uint32_le (&bw, size))
    goto write_failed;
  /* send the header now; the buffer data is written straight from the mapped
   * buffer rather than going through the byte writer */
  if (!write_byte_writer_to_fd (comm, &bw))
    goto write_failed;

  if (!gst_buffer_map (buffer, &map, GST_MAP_READ))
    goto map_failed;
  /* ret only carries the boolean outcome of the raw write at this point */
  ret = write_to_fd_raw (comm, map.data, map.size);
  gst_buffer_unmap (buffer, &map);
  if (!ret)
    goto write_failed;

  /* meta */
  gst_byte_writer_init (&bw);
  if (!gst_byte_writer_put_uint32_le (&bw, repr.n_meta))
    goto write_failed;
  for (n = 0; n < repr.n_meta; ++n) {
    const MetaBuildInfo *info = repr.info + n;
    guint32 len;
    const char *s;

    if (!gst_byte_writer_put_uint32_le (&bw, info->bytes))
      goto write_failed;

    if (!gst_byte_writer_put_uint32_le (&bw, info->flags))
      goto write_failed;

    /* API type name: 32-bit length (NUL included) followed by the string */
    s = g_type_name (info->api);
    len = strlen (s) + 1;
    if (!gst_byte_writer_put_uint32_le (&bw, len))
      goto write_failed;
    if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len))
      goto write_failed;

    if (!gst_byte_writer_put_uint64_le (&bw, info->size))
      goto write_failed;

    /* serialized contents: 32-bit length (NUL included), 0 when there are
     * none */
    s = info->str;
    len = s ?
(strlen (s) + 1) : 0; if (!gst_byte_writer_put_uint32_le (&bw, len)) goto write_failed; if (len) if (!gst_byte_writer_put_data (&bw, (const guint8 *) s, len)) goto write_failed; } if (!write_byte_writer_to_fd (comm, &bw)) goto write_failed; if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, ACK_TYPE_BLOCKING, COMM_REQUEST_TYPE_BUFFER)) goto wait_failed; ret = ret32; done: g_mutex_unlock (&comm->mutex); gst_byte_writer_reset (&bw); for (n = 0; n < repr.n_meta; ++n) g_free (repr.info[n].str); g_free (repr.info); return ret; write_failed: GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), ("Failed to write to socket")); ret = GST_FLOW_COMM_ERROR; goto done; wait_failed: GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), ("Failed to wait for reply on socket")); ret = GST_FLOW_COMM_ERROR; goto done; map_failed: GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), ("Failed to map buffer")); ret = GST_FLOW_ERROR; goto done; } static GstBuffer * gst_ipc_pipeline_comm_read_buffer (GstIpcPipelineComm * comm, guint32 size) { GstBuffer *buffer; CommBufferMetadata meta; guint32 n_meta, n; const guint8 *payload = NULL; guint32 mapped_size, buffer_data_size; /* this should not be called if we don't have enough yet */ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); g_return_val_if_fail (size >= sizeof (CommBufferMetadata), NULL); mapped_size = sizeof (CommBufferMetadata) + sizeof (buffer_data_size); payload = gst_adapter_map (comm->adapter, mapped_size); if (!payload) return NULL; memcpy (&meta, payload, sizeof (CommBufferMetadata)); payload += sizeof (CommBufferMetadata); memcpy (&buffer_data_size, payload, sizeof (buffer_data_size)); size -= mapped_size; gst_adapter_unmap (comm->adapter); gst_adapter_flush (comm->adapter, mapped_size); if (buffer_data_size == 0) { buffer = gst_buffer_new (); } else { buffer = gst_adapter_get_buffer (comm->adapter, buffer_data_size); gst_adapter_flush (comm->adapter, 
buffer_data_size); } size -= buffer_data_size; GST_BUFFER_PTS (buffer) = meta.pts; GST_BUFFER_DTS (buffer) = meta.dts; GST_BUFFER_DURATION (buffer) = meta.duration; GST_BUFFER_OFFSET (buffer) = meta.offset; GST_BUFFER_OFFSET_END (buffer) = meta.offset_end; GST_BUFFER_FLAGS (buffer) = meta.flags; /* If you don't call that, the GType isn't yet known at the g_type_from_name below */ gst_protection_meta_get_info (); mapped_size = size; payload = gst_adapter_map (comm->adapter, mapped_size); if (!payload) { gst_buffer_unref (buffer); return NULL; } memcpy (&n_meta, payload, sizeof (n_meta)); payload += sizeof (n_meta); for (n = 0; n < n_meta; ++n) { guint32 flags, len, bytes; guint64 msize; GType api; GstMeta *meta; GstStructure *structure = NULL; memcpy (&bytes, payload, sizeof (bytes)); payload += sizeof (bytes); #define READ_FIELD(f) do { \ memcpy (&f, payload, sizeof (f)); \ payload += sizeof(f); \ } while(0) READ_FIELD (flags); READ_FIELD (len); api = g_type_from_name ((const char *) payload); payload = (const guint8 *) strchr ((const char *) payload, 0) + 1; READ_FIELD (msize); READ_FIELD (len); if (len) { structure = gst_structure_new_from_string ((const char *) payload); payload += len + 1; } /* Seems we can add a meta from the api nor type ? 
*/
    if (api == GST_PROTECTION_META_API_TYPE) {
      /* add an empty protection meta, then hand it the parsed structure */
      meta =
          gst_buffer_add_meta (buffer, gst_protection_meta_get_info (), NULL);
      ((GstProtectionMeta *) meta)->info = structure;
    } else {
      GST_WARNING_OBJECT (comm->element, "Unsupported meta: %s",
          g_type_name (api));
    }

#undef READ_FIELD

  }

  gst_adapter_unmap (comm->adapter);
  gst_adapter_flush (comm->adapter, mapped_size);

  return buffer;
}

/*
 * gst_ipc_pipeline_comm_write_sink_message_event_to_fd:
 * @comm: the comm object
 * @event: a sink-message event
 *
 * Sends @event as a SINK_MESSAGE_EVENT packet under comm->mutex: payload type
 * byte, a fresh send id, payload size, message type, event seqnum, message
 * seqnum, the event structure name (length prefixed, NUL included) and the
 * message structure as a NUL terminated string (a lone 0 byte when the
 * message has none). It then waits for the reply, blocking for serialized
 * events and timed otherwise.
 *
 * Returns: the boolean sent back by the peer, FALSE on failure.
 */
static gboolean
gst_ipc_pipeline_comm_write_sink_message_event_to_fd (GstIpcPipelineComm * comm,
    GstEvent * event)
{
  const unsigned char payload_type =
      GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT;
  gboolean ret;
  guint32 type, size, eseqnum, mseqnum, ret32 = TRUE, slen, structure_slen;
  char *str = NULL;
  const GstStructure *structure;
  GstMessage *message = NULL;
  const char *name;
  GstByteWriter bw;

  g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE,
      FALSE);

  g_mutex_lock (&comm->mutex);
  ++comm->send_id;

  GST_TRACE_OBJECT (comm->element,
      "Writing sink message event %u: %" GST_PTR_FORMAT, comm->send_id, event);

  gst_byte_writer_init (&bw);
  if (!gst_byte_writer_put_uint8 (&bw, payload_type))
    goto write_failed;
  if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
    goto write_failed;
  name = gst_structure_get_name (gst_event_get_structure (event));
  slen = strlen (name) + 1;
  /* message is unreffed on the way out, see the done label */
  gst_event_parse_sink_message (event, &message);
  structure = gst_message_get_structure (message);
  if (structure) {
    str = gst_structure_to_string (structure);
    structure_slen = strlen (str);
  } else {
    str = NULL;
    structure_slen = 0;
  }
  size = sizeof (type) + sizeof (eseqnum) + sizeof (mseqnum) + sizeof (slen) +
      strlen (name) + 1 + structure_slen + 1;
  if (!gst_byte_writer_put_uint32_le (&bw, size))
    goto write_failed;

  /* from here on size is counted down as fields are written, so that it ends
   * up as the number of bytes left for the structure string */
  type = GST_MESSAGE_TYPE (message);
  if (!gst_byte_writer_put_uint32_le (&bw, type))
    goto write_failed;
  size -= sizeof (type);

  eseqnum = GST_EVENT_SEQNUM (event);
  if (!gst_byte_writer_put_uint32_le (&bw, eseqnum))
    goto write_failed;
  size -= sizeof (eseqnum);

  mseqnum = GST_MESSAGE_SEQNUM (message);
  if
(!gst_byte_writer_put_uint32_le (&bw, mseqnum))
    goto write_failed;
  size -= sizeof (mseqnum);

  if (!gst_byte_writer_put_uint32_le (&bw, slen))
    goto write_failed;
  size -= sizeof (slen);

  if (!gst_byte_writer_put_data (&bw, (const guint8 *) name, slen))
    goto write_failed;
  size -= slen;

  if (str) {
    /* size is now the structure string length plus its NUL */
    if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size))
      goto write_failed;
  } else {
    if (!gst_byte_writer_put_uint8 (&bw, 0))
      goto write_failed;
  }

  if (!write_byte_writer_to_fd (comm, &bw))
    goto write_failed;

  /* a failed wait is reported through the same path as a failed write */
  if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
          GST_EVENT_IS_SERIALIZED (event) ? ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
          COMM_REQUEST_TYPE_EVENT))
    goto write_failed;

  ret = ret32;

done:
  g_mutex_unlock (&comm->mutex);
  gst_byte_writer_reset (&bw);
  g_free (str);
  if (message)
    gst_message_unref (message);
  return ret;

write_failed:
  GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
      ("Failed to write to socket"));
  ret = FALSE;
  goto done;
}

/*
 * gst_ipc_pipeline_comm_read_sink_message_event:
 * @comm: the comm object
 * @size: size in bytes of the payload waiting in comm->adapter
 *
 * Rebuilds a sink-message event from a SINK_MESSAGE_EVENT payload. Once the
 * payload has been mapped, all @size bytes are flushed from the adapter
 * whatever the outcome.
 *
 * NOTE(review): the precondition only requires two 32-bit fields although
 * four are read below, and the "size == 0" checks between the reads do not
 * catch a payload that ends in the middle of a field.
 *
 * Returns: the new event, or NULL if the payload could not be parsed.
 */
static GstEvent *
gst_ipc_pipeline_comm_read_sink_message_event (GstIpcPipelineComm * comm,
    guint32 size)
{
  GstMessage *message;
  GstEvent *event = NULL;
  gchar *end = NULL;
  GstStructure *structure;
  guint32 type, eseqnum, mseqnum, slen;
  const char *name;
  guint32 mapped_size = size;
  const guint8 *payload;

  /* this should not be called if we don't have enough yet */
  g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
  g_return_val_if_fail (size >= sizeof (type) + sizeof (slen), NULL);

  payload = gst_adapter_map (comm->adapter, mapped_size);
  if (!payload)
    return NULL;
  memcpy (&type, payload, sizeof (type));
  payload += sizeof (type);
  size -= sizeof (type);
  if (size == 0)
    goto done;

  memcpy (&eseqnum, payload, sizeof (eseqnum));
  payload += sizeof (eseqnum);
  size -= sizeof (eseqnum);
  if (size == 0)
    goto done;

  memcpy (&mseqnum, payload, sizeof (mseqnum));
  payload += sizeof (mseqnum);
  size -= sizeof (mseqnum);
  if (size == 0)
    goto done;

  memcpy (&slen, payload, sizeof (slen));
  payload += sizeof (slen);
size -= sizeof (slen); if (size == 0) goto done; if (payload[slen - 1]) goto done; name = (const char *) payload; payload += slen; size -= slen; if ((payload)[size - 1]) { goto done; } if (*payload) { structure = gst_structure_from_string ((const char *) payload, &end); } else { structure = NULL; } message = gst_message_new_custom (type, GST_OBJECT (comm->element), structure); gst_message_set_seqnum (message, mseqnum); event = gst_event_new_sink_message (name, message); gst_event_set_seqnum (event, eseqnum); gst_message_unref (message); done: gst_adapter_unmap (comm->adapter); gst_adapter_flush (comm->adapter, mapped_size); return event; } gboolean gst_ipc_pipeline_comm_write_event_to_fd (GstIpcPipelineComm * comm, gboolean upstream, GstEvent * event) { const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT; gboolean ret; guint32 type, size, ret32 = TRUE, seqnum, slen; char *str = NULL; const GstStructure *structure; GstByteWriter bw; /* we special case sink-message event as gst can't serialize/de-serialize it */ if (GST_EVENT_TYPE (event) == GST_EVENT_SINK_MESSAGE) return gst_ipc_pipeline_comm_write_sink_message_event_to_fd (comm, event); g_mutex_lock (&comm->mutex); ++comm->send_id; GST_TRACE_OBJECT (comm->element, "Writing event %u: %" GST_PTR_FORMAT, comm->send_id, event); gst_byte_writer_init (&bw); if (!gst_byte_writer_put_uint8 (&bw, payload_type)) goto write_failed; if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) goto write_failed; structure = gst_event_get_structure (event); if (structure) { if (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) { GstStructure *s = gst_structure_copy (structure); gst_structure_remove_field (s, "stream"); str = gst_structure_to_string (s); gst_structure_free (s); } else { str = gst_structure_to_string (structure); } slen = strlen (str); } else { str = NULL; slen = 0; } size = sizeof (type) + sizeof (seqnum) + 1 + slen + 1; if (!gst_byte_writer_put_uint32_le (&bw, size)) goto write_failed; type = 
GST_EVENT_TYPE (event);
  if (!gst_byte_writer_put_uint32_le (&bw, type))
    goto write_failed;

  seqnum = GST_EVENT_SEQNUM (event);
  if (!gst_byte_writer_put_uint32_le (&bw, seqnum))
    goto write_failed;

  if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
    goto write_failed;

  if (str) {
    /* the string goes out with its terminating NUL */
    if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
      goto write_failed;
  } else {
    if (!gst_byte_writer_put_uint8 (&bw, 0))
      goto write_failed;
  }

  if (!write_byte_writer_to_fd (comm, &bw))
    goto write_failed;

  /* Upstream events get serialized, this is required to send seeks only
   * one at a time. */
  if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32,
          (GST_EVENT_IS_SERIALIZED (event) || GST_EVENT_IS_UPSTREAM (event)) ?
          ACK_TYPE_BLOCKING : ACK_TYPE_NONE, COMM_REQUEST_TYPE_EVENT))
    goto write_failed;
  ret = ret32;

done:
  g_mutex_unlock (&comm->mutex);
  g_free (str);
  gst_byte_writer_reset (&bw);
  return ret;

write_failed:
  GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
      ("Failed to write to socket"));
  ret = FALSE;
  goto done;
}

/*
 * gst_ipc_pipeline_comm_read_event:
 * @comm: the comm object
 * @size: size in bytes of the payload waiting in comm->adapter
 * @upstream: (out): set from the direction byte once it has been read
 *
 * Rebuilds an event from an EVENT payload: event type, seqnum, one direction
 * byte, then the structure as a NUL terminated string. Once the payload has
 * been mapped, all @size bytes are flushed from the adapter whatever the
 * outcome.
 *
 * NOTE(review): the precondition only requires the 32-bit type, and the
 * "size == 0" checks between the reads do not catch a payload that ends in
 * the middle of a field.
 *
 * Returns: the new event, or NULL if the payload could not be parsed.
 */
static GstEvent *
gst_ipc_pipeline_comm_read_event (GstIpcPipelineComm * comm, guint32 size,
    gboolean * upstream)
{
  GstEvent *event = NULL;
  gchar *end = NULL;
  GstStructure *structure;
  guint32 type, seqnum;
  guint32 mapped_size = size;
  const guint8 *payload;

  /* this should not be called if we don't have enough yet */
  g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
  g_return_val_if_fail (size >= sizeof (type), NULL);

  payload = gst_adapter_map (comm->adapter, mapped_size);
  if (!payload)
    return NULL;

  memcpy (&type, payload, sizeof (type));
  payload += sizeof (type);
  size -= sizeof (type);
  if (size == 0)
    goto done;

  memcpy (&seqnum, payload, sizeof (seqnum));
  payload += sizeof (seqnum);
  size -= sizeof (seqnum);
  if (size == 0)
    goto done;

  *upstream = (*payload) ?
TRUE : FALSE;
  payload += 1;
  size -= 1;
  if (size == 0)
    goto done;

  /* the rest is the structure string; its last byte must be the NUL */
  if (payload[size - 1])
    goto done;
  if (*payload) {
    structure = gst_structure_from_string ((const char *) payload, &end);
  } else {
    structure = NULL;
  }

  event = gst_event_new_custom (type, structure);
  gst_event_set_seqnum (event, seqnum);

done:
  gst_adapter_unmap (comm->adapter);
  gst_adapter_flush (comm->adapter, mapped_size);
  return event;
}

/*
 * gst_ipc_pipeline_comm_write_query_to_fd:
 * @comm: the comm object
 * @upstream: direction flag, sent as one byte
 * @query: the query to send
 *
 * Sends @query as a QUERY packet under comm->mutex: payload type byte, a
 * fresh send id, payload size, query type, the @upstream byte and the query
 * structure as a NUL terminated string (a lone 0 byte when there is none).
 * It then waits for the reply, blocking for serialized queries and timed
 * otherwise; @query itself is passed along with the pending request.
 *
 * Returns: the boolean sent back by the peer, FALSE on failure.
 */
gboolean
gst_ipc_pipeline_comm_write_query_to_fd (GstIpcPipelineComm * comm,
    gboolean upstream, GstQuery * query)
{
  const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY;
  gboolean ret;
  guint32 type, size, ret32 = TRUE, slen;
  char *str = NULL;
  const GstStructure *structure;
  GstByteWriter bw;

  g_mutex_lock (&comm->mutex);
  ++comm->send_id;

  GST_TRACE_OBJECT (comm->element, "Writing query %u: %" GST_PTR_FORMAT,
      comm->send_id, query);

  gst_byte_writer_init (&bw);
  if (!gst_byte_writer_put_uint8 (&bw, payload_type))
    goto write_failed;
  if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
    goto write_failed;
  structure = gst_query_get_structure (query);
  if (structure) {
    str = gst_structure_to_string (structure);
    slen = strlen (str);
  } else {
    str = NULL;
    slen = 0;
  }
  /* query type + direction byte + string + its terminating NUL */
  size = sizeof (type) + 1 + slen + 1;
  if (!gst_byte_writer_put_uint32_le (&bw, size))
    goto write_failed;

  type = GST_QUERY_TYPE (query);
  if (!gst_byte_writer_put_uint32_le (&bw, type))
    goto write_failed;

  if (!gst_byte_writer_put_uint8 (&bw, upstream ? 1 : 0))
    goto write_failed;

  if (str) {
    if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, slen + 1))
      goto write_failed;
  } else {
    if (!gst_byte_writer_put_uint8 (&bw, 0))
      goto write_failed;
  }

  if (!write_byte_writer_to_fd (comm, &bw))
    goto write_failed;

  if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, query, &ret32,
          GST_QUERY_IS_SERIALIZED (query) ?
ACK_TYPE_BLOCKING : ACK_TYPE_TIMED,
          COMM_REQUEST_TYPE_QUERY))
    goto write_failed;

  ret = ret32;

done:
  g_mutex_unlock (&comm->mutex);
  g_free (str);
  gst_byte_writer_reset (&bw);
  return ret;

write_failed:
  GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
      ("Failed to write to socket"));
  ret = FALSE;
  goto done;
}

/*
 * gst_ipc_pipeline_comm_read_query:
 * @comm: the comm object
 * @size: size in bytes of the payload waiting in comm->adapter
 * @upstream: (out): set from the direction byte once it has been read
 *
 * Rebuilds a query from a QUERY payload: query type, one direction byte, then
 * the structure as a NUL terminated string. Once the payload has been mapped,
 * all @size bytes are flushed from the adapter whatever the outcome.
 *
 * Returns: the new query, or NULL if the payload could not be parsed.
 */
static GstQuery *
gst_ipc_pipeline_comm_read_query (GstIpcPipelineComm * comm, guint32 size,
    gboolean * upstream)
{
  GstQuery *query = NULL;
  gchar *end = NULL;
  GstStructure *structure;
  guint32 type;
  guint32 mapped_size = size;
  const guint8 *payload;

  /* this should not be called if we don't have enough yet */
  g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL);
  g_return_val_if_fail (size >= sizeof (type), NULL);

  payload = gst_adapter_map (comm->adapter, mapped_size);
  if (!payload)
    return NULL;

  memcpy (&type, payload, sizeof (type));
  payload += sizeof (type);
  size -= sizeof (type);
  if (size == 0)
    goto done;

  *upstream = (*payload) ? TRUE : FALSE;
  payload += 1;
  size -= 1;
  if (size == 0)
    goto done;

  /* the rest is the structure string; its last byte must be the NUL */
  if (payload[size - 1])
    goto done;
  if (*payload) {
    structure = gst_structure_from_string ((const char *) payload, &end);
  } else {
    structure = NULL;
  }

  query = gst_query_new_custom (type, structure);

  /* CAPS queries contain a filter field, of GstCaps type, which can be NULL.
     This does not play well with the serialization/deserialization system,
     which will give us a non-NULL GstCaps which has a value of NULL. This
     in turn wreaks havoc with any code that tests whether filter is NULL
     (which basically means, am I being given an optional GstCaps ?).
     So we look for non-NULL GstCaps which have NULL contents, and replace
     them with NULL instead.
*/ if (GST_QUERY_TYPE (query) == GST_QUERY_CAPS) { GstCaps *filter; gst_query_parse_caps (query, &filter); if (filter && !strcmp (gst_structure_get_name (gst_caps_get_structure (filter, 0)), "NULL")) { gst_query_unref (query); query = gst_query_new_caps (NULL); } } done: gst_adapter_unmap (comm->adapter); gst_adapter_flush (comm->adapter, mapped_size); return query; } GstStateChangeReturn gst_ipc_pipeline_comm_write_state_change_to_fd (GstIpcPipelineComm * comm, GstStateChange transition) { const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE; GstStateChangeReturn ret; guint32 size, ret32 = GST_STATE_CHANGE_SUCCESS; GstByteWriter bw; g_mutex_lock (&comm->mutex); ++comm->send_id; GST_TRACE_OBJECT (comm->element, "Writing state change %u: %s -> %s", comm->send_id, gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); gst_byte_writer_init (&bw); if (!gst_byte_writer_put_uint8 (&bw, payload_type)) goto write_failed; if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) goto write_failed; size = sizeof (transition); if (!gst_byte_writer_put_uint32_le (&bw, size)) goto write_failed; if (!gst_byte_writer_put_uint32_le (&bw, transition)) goto write_failed; if (!write_byte_writer_to_fd (comm, &bw)) goto write_failed; if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, ACK_TYPE_TIMED, COMM_REQUEST_TYPE_STATE_CHANGE)) goto write_failed; ret = ret32; done: g_mutex_unlock (&comm->mutex); gst_byte_writer_reset (&bw); return ret; write_failed: GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), ("Failed to write to socket")); ret = GST_STATE_CHANGE_FAILURE; goto done; } static gboolean is_valid_state_change (GstStateChange transition) { if (transition == GST_STATE_CHANGE_NULL_TO_READY) return TRUE; if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) return TRUE; if (transition == GST_STATE_CHANGE_PAUSED_TO_PLAYING) return TRUE; if 
(transition == GST_STATE_CHANGE_PLAYING_TO_PAUSED)
    return TRUE;
  if (transition == GST_STATE_CHANGE_PAUSED_TO_READY)
    return TRUE;
  if (transition == GST_STATE_CHANGE_READY_TO_NULL)
    return TRUE;
  /* a "transition" from a state to itself is accepted as well */
  if (GST_STATE_TRANSITION_CURRENT (transition) ==
      GST_STATE_TRANSITION_NEXT (transition))
    return TRUE;
  return FALSE;
}

/*
 * gst_ipc_pipeline_comm_read_state_change:
 * @comm: the comm object
 * @size: size in bytes of the payload waiting in comm->adapter
 * @transition: (out): receives the 32-bit transition value
 *
 * Reads the transition from a STATE_CHANGE payload and flushes all @size
 * bytes from the adapter.
 *
 * Returns: TRUE if the value read is accepted by is_valid_state_change().
 */
static gboolean
gst_ipc_pipeline_comm_read_state_change (GstIpcPipelineComm * comm,
    guint32 size, guint32 * transition)
{
  guint32 mapped_size = size;
  const guint8 *payload;

  /* this should not be called if we don't have enough yet */
  g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, FALSE);
  g_return_val_if_fail (size >= sizeof (*transition), FALSE);

  payload = gst_adapter_map (comm->adapter, size);
  if (!payload)
    return FALSE;
  memcpy (transition, payload, sizeof (*transition));
  gst_adapter_unmap (comm->adapter);
  gst_adapter_flush (comm->adapter, mapped_size);
  return is_valid_state_change (*transition);
}

/*
 * gst_ipc_pipeline_comm_write_state_lost_to_fd:
 * @comm: the comm object
 *
 * Sends a STATE_LOST packet under comm->mutex: payload type byte, a fresh
 * send id and a payload size of 0. No reply is waited for. An element error
 * is posted if anything fails.
 */
void
gst_ipc_pipeline_comm_write_state_lost_to_fd (GstIpcPipelineComm * comm)
{
  const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST;
  guint32 size;
  GstByteWriter bw;

  g_mutex_lock (&comm->mutex);
  ++comm->send_id;

  GST_TRACE_OBJECT (comm->element, "Writing state-lost %u", comm->send_id);
  gst_byte_writer_init (&bw);
  if (!gst_byte_writer_put_uint8 (&bw, payload_type))
    goto write_failed;
  if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id))
    goto write_failed;
  size = 0;
  if (!gst_byte_writer_put_uint32_le (&bw, size))
    goto write_failed;

  if (!write_byte_writer_to_fd (comm, &bw))
    goto write_failed;

done:
  g_mutex_unlock (&comm->mutex);
  gst_byte_writer_reset (&bw);
  return;

write_failed:
  GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL),
      ("Failed to write to socket"));
  goto done;
}

/* A STATE_LOST packet carries nothing to parse: always returns TRUE and
 * leaves the adapter untouched */
static gboolean
gst_ipc_pipeline_comm_read_state_lost (GstIpcPipelineComm * comm, guint32 size)
{
  /* no payload */
  return TRUE;
}

static gboolean
gst_ipc_pipeline_comm_write_gerror_message_to_fd (GstIpcPipelineComm * comm,
    GstMessage * message)
{ const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE; gboolean ret; guint32 code, size, ret32 = TRUE; char *str = NULL; GError *error; char *extra_message; const char *domain_string; unsigned char msgtype; GstByteWriter bw; g_mutex_lock (&comm->mutex); ++comm->send_id; if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) { gst_message_parse_error (message, &error, &extra_message); msgtype = 2; } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) { gst_message_parse_warning (message, &error, &extra_message); msgtype = 1; } else { gst_message_parse_info (message, &error, &extra_message); msgtype = 0; } code = error->code; domain_string = g_quark_to_string (error->domain); GST_TRACE_OBJECT (comm->element, "Writing error %u: domain %s, code %u, message %s, extra message %s", comm->send_id, domain_string, error->code, error->message, extra_message); gst_byte_writer_init (&bw); if (!gst_byte_writer_put_uint8 (&bw, payload_type)) goto write_failed; if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) goto write_failed; size = sizeof (size); size += 1; size += strlen (domain_string) + 1; size += sizeof (code); size += sizeof (size); size += error->message ? strlen (error->message) + 1 : 0; size += sizeof (size); size += extra_message ? strlen (extra_message) + 1 : 0; if (!gst_byte_writer_put_uint32_le (&bw, size)) goto write_failed; if (!gst_byte_writer_put_uint8 (&bw, msgtype)) goto write_failed; size = strlen (domain_string) + 1; if (!gst_byte_writer_put_uint32_le (&bw, size)) goto write_failed; if (!gst_byte_writer_put_data (&bw, (const guint8 *) domain_string, size)) goto write_failed; if (!gst_byte_writer_put_uint32_le (&bw, code)) goto write_failed; size = error->message ? strlen (error->message) + 1 : 0; if (!gst_byte_writer_put_uint32_le (&bw, size)) goto write_failed; if (error->message) { if (!gst_byte_writer_put_data (&bw, (const guint8 *) error->message, size)) goto write_failed; } size = extra_message ? 
strlen (extra_message) + 1 : 0; if (!gst_byte_writer_put_uint32_le (&bw, size)) goto write_failed; if (extra_message) { if (!gst_byte_writer_put_data (&bw, (const guint8 *) extra_message, size)) goto write_failed; } if (!write_byte_writer_to_fd (comm, &bw)) goto write_failed; if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, &ret32, ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE)) goto write_failed; ret = ret32; done: g_mutex_unlock (&comm->mutex); g_free (str); g_error_free (error); g_free (extra_message); gst_byte_writer_reset (&bw); return ret; write_failed: GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), ("Failed to write to socket")); ret = FALSE; goto done; } static GstMessage * gst_ipc_pipeline_comm_read_gerror_message (GstIpcPipelineComm * comm, guint32 size) { GstMessage *message = NULL; guint32 code; GQuark domain; const char *msg, *extra_message; GError *error; unsigned char msgtype; guint32 mapped_size = size; const guint8 *payload; /* this should not be called if we don't have enough yet */ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); g_return_val_if_fail (size >= sizeof (code) + sizeof (size) * 3 + 1 + 1, NULL); payload = gst_adapter_map (comm->adapter, mapped_size); if (!payload) return NULL; msgtype = *payload++; memcpy (&size, payload, sizeof (size)); payload += sizeof (size); if (payload[size - 1]) goto done; domain = g_quark_from_string ((const char *) payload); payload += size; memcpy (&code, payload, sizeof (code)); payload += sizeof (code); memcpy (&size, payload, sizeof (size)); payload += sizeof (size); if (size) { if (payload[size - 1]) goto done; msg = (const char *) payload; } else { msg = NULL; } payload += size; memcpy (&size, payload, sizeof (size)); payload += sizeof (size); if (size) { if (payload[size - 1]) goto done; extra_message = (const char *) payload; } else { extra_message = NULL; } payload += size; error = g_error_new (domain, code, "%s", msg); if (msgtype == 2) message = 
gst_message_new_error (GST_OBJECT (comm->element), error, extra_message); else if (msgtype == 1) message = gst_message_new_warning (GST_OBJECT (comm->element), error, extra_message); else message = gst_message_new_info (GST_OBJECT (comm->element), error, extra_message); g_error_free (error); done: gst_adapter_unmap (comm->adapter); gst_adapter_flush (comm->adapter, mapped_size); return message; } gboolean gst_ipc_pipeline_comm_write_message_to_fd (GstIpcPipelineComm * comm, GstMessage * message) { const unsigned char payload_type = GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE; gboolean ret; guint32 type, size, ret32 = TRUE, slen; char *str = NULL; const GstStructure *structure; GstByteWriter bw; /* we special case error as gst can't serialize/de-serialize it */ if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO) return gst_ipc_pipeline_comm_write_gerror_message_to_fd (comm, message); g_mutex_lock (&comm->mutex); ++comm->send_id; GST_TRACE_OBJECT (comm->element, "Writing message %u: %" GST_PTR_FORMAT, comm->send_id, message); gst_byte_writer_init (&bw); if (!gst_byte_writer_put_uint8 (&bw, payload_type)) goto write_failed; if (!gst_byte_writer_put_uint32_le (&bw, comm->send_id)) goto write_failed; structure = gst_message_get_structure (message); if (structure) { str = gst_structure_to_string (structure); slen = strlen (str); } else { str = NULL; slen = 0; } size = sizeof (type) + slen + 1; if (!gst_byte_writer_put_uint32_le (&bw, size)) goto write_failed; type = GST_MESSAGE_TYPE (message); if (!gst_byte_writer_put_uint32_le (&bw, type)) goto write_failed; size -= sizeof (type); if (str) { if (!gst_byte_writer_put_data (&bw, (const guint8 *) str, size)) goto write_failed; } else { if (!gst_byte_writer_put_uint8 (&bw, 0)) goto write_failed; } if (!write_byte_writer_to_fd (comm, &bw)) goto write_failed; if (!gst_ipc_pipeline_comm_sync_fd (comm, comm->send_id, NULL, 
&ret32, ACK_TYPE_NONE, COMM_REQUEST_TYPE_MESSAGE)) goto write_failed; ret = ret32; done: g_mutex_unlock (&comm->mutex); g_free (str); gst_byte_writer_reset (&bw); return ret; write_failed: GST_ELEMENT_ERROR (comm->element, RESOURCE, WRITE, (NULL), ("Failed to write to socket")); ret = FALSE; goto done; } static GstMessage * gst_ipc_pipeline_comm_read_message (GstIpcPipelineComm * comm, guint32 size) { GstMessage *message = NULL; gchar *end = NULL; GstStructure *structure; guint32 type; guint32 mapped_size = size; const guint8 *payload; /* this should not be called if we don't have enough yet */ g_return_val_if_fail (gst_adapter_available (comm->adapter) >= size, NULL); g_return_val_if_fail (size >= sizeof (type), NULL); payload = gst_adapter_map (comm->adapter, mapped_size); if (!payload) return NULL; memcpy (&type, payload, sizeof (type)); payload += sizeof (type); size -= sizeof (type); if (size == 0) goto done; if (payload[size - 1]) goto done; if (*payload) { structure = gst_structure_from_string ((const char *) payload, &end); } else { structure = NULL; } message = gst_message_new_custom (type, GST_OBJECT (comm->element), structure); done: gst_adapter_unmap (comm->adapter); gst_adapter_flush (comm->adapter, mapped_size); return message; } void gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element) { g_mutex_init (&comm->mutex); comm->element = element; comm->fdin = comm->fdout = -1; comm->ack_time = DEFAULT_ACK_TIME; comm->waiting_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) comm_request_free); comm->adapter = gst_adapter_new (); g_atomic_int_set (&comm->thread_running, 0); } void gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm) { g_assert (!g_atomic_int_get (&comm->thread_running)); g_hash_table_destroy (comm->waiting_ids); gst_object_unref (comm->adapter); g_mutex_clear (&comm->mutex); } static void cancel_request (gpointer key, gpointer value, gpointer user_data, GstFlowReturn fret) { 
GstIpcPipelineComm *comm = (GstIpcPipelineComm *) user_data; guint32 id = GPOINTER_TO_INT (key); CommRequest *req = (CommRequest *) value; GST_TRACE_OBJECT (comm->element, "Cancelling request %u, type %d", id, req->type); req->ret = fret; req->replied = TRUE; g_cond_signal (&req->cond); } static void cancel_request_error (gpointer key, gpointer value, gpointer user_data) { CommRequest *req = (CommRequest *) value; GstFlowReturn fret = comm_request_ret_get_failure_value (req->type); cancel_request (key, value, user_data, fret); } void gst_ipc_pipeline_comm_cancel (GstIpcPipelineComm * comm, gboolean cleanup) { g_mutex_lock (&comm->mutex); g_hash_table_foreach (comm->waiting_ids, cancel_request_error, comm); if (cleanup) { g_hash_table_unref (comm->waiting_ids); comm->waiting_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) comm_request_free); } g_mutex_unlock (&comm->mutex); } static gboolean set_field (GQuark field_id, const GValue * value, gpointer user_data) { GstStructure *structure = user_data; gst_structure_id_set_value (structure, field_id, value); return TRUE; } static gboolean gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id, GstFlowReturn ret, GstQuery * query) { CommRequest *req; req = g_hash_table_lookup (comm->waiting_ids, GINT_TO_POINTER (id)); if (!req) { GST_WARNING_OBJECT (comm->element, "Got reply for unknown request %u", id); return FALSE; } GST_TRACE_OBJECT (comm->element, "Got reply %d (%s) for request %u", ret, comm_request_ret_get_name (req->type, ret), req->id); req->replied = TRUE; req->ret = ret; if (query) { if (req->query) { /* We need to update the original query in place, as the caller will expect the object to be the same */ GstStructure *structure = gst_query_writable_structure (req->query); gst_structure_remove_all_fields (structure); gst_structure_foreach (gst_query_get_structure (query), set_field, structure); } else { GST_WARNING_OBJECT (comm->element, "Got query reply, 
but no query was in the request"); } } g_cond_signal (&req->cond); return TRUE; } static gboolean update_adapter (GstIpcPipelineComm * comm) { GstMemory *mem = NULL; for (;;) { fd_set set; struct timeval tv; int sret; ssize_t sz; GstBuffer *buf; GstMapInfo map; int fdin = comm->fdin; int fdclose = comm->reader_thread_stopping_pipe[0]; int fdmax; FD_ZERO (&set); FD_SET (fdclose, &set); fdmax = fdclose; if (fdin >= 0 && GST_ELEMENT_PARENT (comm->element)) { FD_SET (fdin, &set); if (fdin > fdmax) fdmax = fdin; } tv.tv_sec = 0; tv.tv_usec = 100000; sret = select (fdmax + 1, &set, NULL, NULL, &tv); if (sret < 0) { if (errno == EAGAIN) continue; if (errno == EINTR) break; if (mem) gst_memory_unref (mem); return FALSE; } if (FD_ISSET (fdclose, &set)) { GST_INFO_OBJECT (comm->element, "data received on close notify pipe"); comm->reader_thread_stopping = TRUE; break; } if (fdin < 0) break; if (!FD_ISSET (fdin, &set)) break; if (mem == NULL) mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL); gst_memory_map (mem, &map, GST_MAP_WRITE); sz = read (fdin, map.data, map.size); gst_memory_unmap (mem, &map); if (sz < 0) { if (errno == EAGAIN) continue; mem = NULL; if (errno == EINTR) break; gst_memory_unref (mem); return FALSE; } if (sz == 0) { GST_INFO_OBJECT (comm->element, "fd closed"); comm->reader_thread_stopping = TRUE; break; } gst_memory_resize (mem, 0, sz); buf = gst_buffer_new (); gst_buffer_append_memory (buf, mem); mem = NULL; GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz); gst_adapter_push (comm->adapter, buf); /* If we have more data, we loop, otherwise we break */ FD_ZERO (&set); if (fdin >= 0) FD_SET (comm->fdin, &set); tv.tv_sec = 0; tv.tv_usec = 0; sret = select (fdin + 1, &set, NULL, NULL, &tv); if (sret < 0 || !FD_ISSET (fdin, &set)) break; } if (mem) gst_memory_unref (mem); return TRUE; } static gboolean read_many (GstIpcPipelineComm * comm) { gboolean ret = TRUE; gsize available; const guint8 *payload; while (1) switch 
(comm->state) { case GST_IPC_PIPELINE_COMM_STATE_TYPE: { guint8 type; guint32 mapped_size; available = gst_adapter_available (comm->adapter); mapped_size = 1 + sizeof (gint32) * 2; if (available < mapped_size) goto done; payload = gst_adapter_map (comm->adapter, mapped_size); type = *payload++; g_mutex_lock (&comm->mutex); memcpy (&comm->id, payload, sizeof (guint32)); memcpy (&comm->payload_length, payload + 4, sizeof (guint32)); g_mutex_unlock (&comm->mutex); gst_adapter_unmap (comm->adapter); gst_adapter_flush (comm->adapter, mapped_size); GST_TRACE_OBJECT (comm->element, "Got id %u, type %d, payload %u", comm->id, type, comm->payload_length); switch (type) { case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: GST_TRACE_OBJECT (comm->element, "switching to state %s", gst_ipc_pipeline_comm_data_type_get_name (type)); comm->state = type; break; default: goto out_of_sync; } break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_ACK: { const guint8 *rets; guint32 ret32; available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) goto done; if (available < sizeof (guint32)) goto ack_failed; rets = gst_adapter_map (comm->adapter, sizeof (guint32)); memcpy (&ret32, rets, sizeof (ret32)); gst_adapter_unmap (comm->adapter); gst_adapter_flush (comm->adapter, sizeof (guint32)); GST_TRACE_OBJECT (comm->element, "Got ACK %s for id %u", gst_flow_get_name (ret32), comm->id); g_mutex_lock (&comm->mutex); gst_ipc_pipeline_comm_reply_request (comm, comm->id, ret32, NULL); g_mutex_unlock (&comm->mutex); GST_TRACE_OBJECT (comm->element, 
"switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY_RESULT: { GstQuery *query = NULL; gboolean qret; available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) goto done; qret = gst_ipc_pipeline_comm_read_query_result (comm, comm->payload_length, &query); GST_TRACE_OBJECT (comm->element, "deserialized query result %p: %d, %" GST_PTR_FORMAT, query, qret, query); g_mutex_lock (&comm->mutex); gst_ipc_pipeline_comm_reply_request (comm, comm->id, qret, query); g_mutex_unlock (&comm->mutex); gst_query_unref (query); GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_BUFFER: { GstBuffer *buf; available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) goto done; buf = gst_ipc_pipeline_comm_read_buffer (comm, comm->payload_length); if (!buf) goto buffer_failed; /* set caps and push */ GST_TRACE_OBJECT (comm->element, "deserialized buffer %p, pushing, timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT ", size %" G_GSIZE_FORMAT ", flags 0x%x", buf, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf), gst_buffer_get_size (buf), GST_BUFFER_FLAGS (buf)); gst_mini_object_set_qdata (GST_MINI_OBJECT (buf), QUARK_ID, GINT_TO_POINTER (comm->id), NULL); if (comm->on_buffer) (*comm->on_buffer) (comm->id, buf, comm->user_data); GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_EVENT: { GstEvent *event; gboolean upstream; available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) goto done; event = gst_ipc_pipeline_comm_read_event (comm, comm->payload_length, 
&upstream); if (!event) goto event_failed; GST_TRACE_OBJECT (comm->element, "deserialized event %p of type %s", event, gst_event_type_get_name (event->type)); gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID, GINT_TO_POINTER (comm->id), NULL); if (comm->on_event) (*comm->on_event) (comm->id, event, upstream, comm->user_data); GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_SINK_MESSAGE_EVENT: { GstEvent *event; available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) goto done; event = gst_ipc_pipeline_comm_read_sink_message_event (comm, comm->payload_length); if (!event) goto event_failed; GST_TRACE_OBJECT (comm->element, "deserialized sink message event %p", event); gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_ID, GINT_TO_POINTER (comm->id), NULL); if (comm->on_event) (*comm->on_event) (comm->id, event, FALSE, comm->user_data); GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_QUERY: { GstQuery *query; gboolean upstream; available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) goto done; query = gst_ipc_pipeline_comm_read_query (comm, comm->payload_length, &upstream); if (!query) goto query_failed; GST_TRACE_OBJECT (comm->element, "deserialized query %p of type %s", query, gst_query_type_get_name (query->type)); gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_ID, GINT_TO_POINTER (comm->id), NULL); if (comm->on_query) (*comm->on_query) (comm->id, query, upstream, comm->user_data); GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_CHANGE: { guint32 transition; available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) 
goto done; if (!gst_ipc_pipeline_comm_read_state_change (comm, comm->payload_length, &transition)) goto state_change_failed; GST_TRACE_OBJECT (comm->element, "deserialized state change request: %s -> %s", gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)), gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition))); if (comm->on_state_change) (*comm->on_state_change) (comm->id, transition, comm->user_data); GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_STATE_LOST: { available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) goto done; if (!gst_ipc_pipeline_comm_read_state_lost (comm, comm->payload_length)) goto event_failed; GST_TRACE_OBJECT (comm->element, "deserialized state-lost"); if (comm->on_state_lost) (*comm->on_state_lost) (comm->user_data); GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_MESSAGE: { GstMessage *message; available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) goto done; message = gst_ipc_pipeline_comm_read_message (comm, comm->payload_length); if (!message) goto message_failed; GST_TRACE_OBJECT (comm->element, "deserialized message %p of type %s", message, gst_message_type_get_name (message->type)); if (comm->on_message) (*comm->on_message) (comm->id, message, comm->user_data); GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } case GST_IPC_PIPELINE_COMM_DATA_TYPE_GERROR_MESSAGE: { GstMessage *message; available = gst_adapter_available (comm->adapter); if (available < comm->payload_length) goto done; message = gst_ipc_pipeline_comm_read_gerror_message (comm, comm->payload_length); if (!message) goto message_failed; GST_TRACE_OBJECT (comm->element, "deserialized message %p 
of type %s", message, gst_message_type_get_name (message->type)); if (comm->on_message) (*comm->on_message) (comm->id, message, comm->user_data); GST_TRACE_OBJECT (comm->element, "switching to state TYPE"); comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; break; } } done: return ret; /* ERRORS */ out_of_sync: { GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), ("Socket out of sync")); ret = FALSE; goto done; } state_change_failed: { GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), ("could not read state change from fd")); ret = FALSE; goto done; } ack_failed: { GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), ("could not read ack from fd")); ret = FALSE; goto done; } buffer_failed: { GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), ("could not read buffer from fd")); ret = FALSE; goto done; } event_failed: { GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), ("could not read event from fd")); ret = FALSE; goto done; } message_failed: { GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), ("could not read message from fd")); ret = FALSE; goto done; } query_failed: { GST_ELEMENT_ERROR (comm->element, STREAM, DECODE, (NULL), ("could not read query from fd")); ret = FALSE; goto done; } } static gpointer reader_thread (gpointer data) { GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data; g_atomic_int_set (&comm->thread_running, 1); while (!comm->reader_thread_stopping) { if (!update_adapter (comm)) { if (comm->reader_thread_stopping) { GST_INFO_OBJECT (comm->element, "We're stopping, all good"); break; } GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL), ("Failed to read from socket")); break; } read_many (comm); } GST_INFO_OBJECT (comm->element, "Reader thread ending"); g_atomic_int_set (&comm->thread_running, 0); return NULL; } gboolean gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm, void (*on_buffer) (guint32, GstBuffer *, gpointer), void (*on_event) (guint32, GstEvent *, gboolean, 
gpointer), void (*on_query) (guint32, GstQuery *, gboolean, gpointer), void (*on_state_change) (guint32, GstStateChange, gpointer), void (*on_state_lost) (gpointer), void (*on_message) (guint32, GstMessage *, gpointer), gpointer user_data) { if (comm->reader_thread) return FALSE; comm->reader_thread_stopping = FALSE; comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE; comm->on_buffer = on_buffer; comm->on_event = on_event; comm->on_query = on_query; comm->on_state_change = on_state_change; comm->on_state_lost = on_state_lost; comm->on_message = on_message; comm->user_data = user_data; if (pipe (comm->reader_thread_stopping_pipe) < 0) { GST_WARNING_OBJECT (comm->element, "Failed to create pipes"); return FALSE; } comm->reader_thread = g_thread_new ("reader", (GThreadFunc) reader_thread, comm); return TRUE; } void gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm) { char dummy = 0; if (!comm->reader_thread) return; while (write (comm->reader_thread_stopping_pipe[1], &dummy, 1) < 0 && errno == EINTR); g_thread_join (comm->reader_thread); close (comm->reader_thread_stopping_pipe[0]); close (comm->reader_thread_stopping_pipe[1]); comm->reader_thread = NULL; } static gchar * gst_value_serialize_event (const GValue * value) { const GstStructure *structure; GstEvent *ev; gchar *type, *ts, *seqnum, *rt_offset, *str, *str64, *s; GValue val = G_VALUE_INIT; ev = g_value_get_boxed (value); g_value_init (&val, gst_event_type_get_type ()); g_value_set_enum (&val, ev->type); type = gst_value_serialize (&val); g_value_unset (&val); g_value_init (&val, G_TYPE_UINT64); g_value_set_uint64 (&val, ev->timestamp); ts = gst_value_serialize (&val); g_value_unset (&val); g_value_init (&val, G_TYPE_UINT); g_value_set_uint (&val, ev->seqnum); seqnum = gst_value_serialize (&val); g_value_unset (&val); g_value_init (&val, G_TYPE_INT64); g_value_set_int64 (&val, gst_event_get_running_time_offset (ev)); rt_offset = gst_value_serialize (&val); g_value_unset (&val); structure = 
gst_event_get_structure (ev); str = gst_structure_to_string (structure); str64 = g_base64_encode ((guchar *) str, strlen (str) + 1); g_strdelimit (str64, "=", '_'); g_free (str); s = g_strconcat (type, ":", ts, ":", seqnum, ":", rt_offset, ":", str64, NULL); g_free (type); g_free (ts); g_free (seqnum); g_free (rt_offset); g_free (str64); return s; } static gboolean gst_value_deserialize_event (GValue * dest, const gchar * s) { GstEvent *ev = NULL; GValue val = G_VALUE_INIT; gboolean ret = FALSE; gchar **fields; gsize len; fields = g_strsplit (s, ":", -1); if (g_strv_length (fields) != 5) goto wrong_length; g_strdelimit (fields[4], "_", '='); g_base64_decode_inplace (fields[4], &len); g_value_init (&val, gst_event_type_get_type ()); if (!gst_value_deserialize (&val, fields[0])) goto fail; ev = gst_event_new_custom (g_value_get_enum (&val), gst_structure_new_from_string (fields[4])); g_value_unset (&val); g_value_init (&val, G_TYPE_UINT64); if (!gst_value_deserialize (&val, fields[1])) goto fail; ev->timestamp = g_value_get_uint64 (&val); g_value_unset (&val); g_value_init (&val, G_TYPE_UINT); if (!gst_value_deserialize (&val, fields[2])) goto fail; ev->seqnum = g_value_get_uint (&val); g_value_unset (&val); g_value_init (&val, G_TYPE_INT64); if (!gst_value_deserialize (&val, fields[3])) goto fail; gst_event_set_running_time_offset (ev, g_value_get_int64 (&val)); g_value_take_boxed (dest, ev); ev = NULL; ret = TRUE; fail: g_clear_pointer (&ev, gst_event_unref); g_value_unset (&val); wrong_length: g_strfreev (fields); return ret; } #define REGISTER_SERIALIZATION_NO_COMPARE(_gtype, _type) \ G_STMT_START { \ static GstValueTable gst_value = \ { 0, NULL, \ gst_value_serialize_ ## _type, gst_value_deserialize_ ## _type }; \ gst_value.type = _gtype; \ gst_value_register (&gst_value); \ } G_STMT_END void gst_ipc_pipeline_comm_plugin_init (void) { static volatile gsize once = 0; if (g_once_init_enter (&once)) { GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_comm_debug, 
"ipcpipelinecomm", 0, "ipc pipeline comm"); QUARK_ID = g_quark_from_static_string ("ipcpipeline-id"); REGISTER_SERIALIZATION_NO_COMPARE (gst_event_get_type (), event); g_once_init_leave (&once, (gsize) 1); } }