ext/raw1394/gstdv1394src.c: Make interruptible, so it won't block forever in a read().

Original commit message from CVS:
2005-10-07  Andy Wingo  <wingo@pobox.com>

* ext/raw1394/gstdv1394src.c: Make interruptible, so it won't
block forever in a read().
This commit is contained in:
Andy Wingo 2005-10-07 15:24:24 +00:00
parent 19712f28f6
commit aeb4ab082e
4 changed files with 123 additions and 20 deletions

View file

@ -1,5 +1,8 @@
2005-10-07 Andy Wingo <wingo@pobox.com>
* ext/raw1394/gstdv1394src.c: Make interruptible, so it won't
block forever in a read().
* ext/raw1394/gstdv1394src.c: Clean up for style before doing some
hacking. The only change should be that the state change stuff was
put into basesrc's start() and stop() routines, which coalesces

View file

@ -144,7 +144,7 @@ static gboolean gst_signal_processor_src_activate_pull (GstPad * pad,
static gboolean gst_signal_processor_sink_activate_push (GstPad * pad,
gboolean active);
static GstStateChangeReturn gst_signal_processor_change_state (GstElement *
element);
element, GstStateChange transition);
static gboolean gst_signal_processor_event (GstPad * pad, GstEvent * event);
static GstFlowReturn gst_signal_processor_getrange (GstPad * pad,

View file

@ -23,14 +23,40 @@
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <libavc1394/avc1394.h>
#include <libavc1394/avc1394_vcr.h>
#include <libavc1394/rom1394.h>
#include <libraw1394/raw1394.h>
#include <gst/gst.h>
#include "gstdv1394src.h"
#define CONTROL_STOP 'S' /* stop the select call */
#define CONTROL_SOCKETS(src) src->control_sock
#define WRITE_SOCKET(src) src->control_sock[1]
#define READ_SOCKET(src) src->control_sock[0]
#define SEND_COMMAND(src, command) \
G_STMT_START { \
unsigned char c; c = command; \
write (WRITE_SOCKET(src), &c, 1); \
} G_STMT_END
#define READ_COMMAND(src, command, res) \
G_STMT_START { \
res = read(READ_SOCKET(src), &command, 1); \
} G_STMT_END
GST_DEBUG_CATEGORY_STATIC (dv1394src_debug);
#define GST_CAT_DEFAULT (dv1394src_debug)
@ -91,6 +117,7 @@ static void gst_dv1394src_get_property (GObject * object, guint prop_id,
static gboolean gst_dv1394src_start (GstBaseSrc * bsrc);
static gboolean gst_dv1394src_stop (GstBaseSrc * bsrc);
static gboolean gst_dv1394src_unlock (GstBaseSrc * bsrc);
static GstFlowReturn gst_dv1394src_create (GstPushSrc * psrc, GstBuffer ** buf);
@ -181,6 +208,8 @@ gst_dv1394src_class_init (GstDV1394SrcClass * klass)
gstbasesrc_class->negotiate = NULL;
gstbasesrc_class->start = gst_dv1394src_start;
gstbasesrc_class->stop = gst_dv1394src_stop;
gstbasesrc_class->unlock = gst_dv1394src_unlock;
gstpushsrc_class->create = gst_dv1394src_create;
}
@ -195,7 +224,6 @@ gst_dv1394src_init (GstDV1394Src * dv1394src, GstDV1394SrcClass * klass)
gst_pad_set_query_function (srcpad, gst_dv1394src_query);
gst_pad_set_query_type_function (srcpad, gst_dv1394src_get_query_types);
dv1394src->dv_lock = g_mutex_new ();
dv1394src->port = DEFAULT_PORT;
dv1394src->channel = DEFAULT_CHANNEL;
@ -205,6 +233,9 @@ gst_dv1394src_init (GstDV1394Src * dv1394src, GstDV1394SrcClass * klass)
dv1394src->use_avc = DEFAULT_USE_AVC;
dv1394src->guid = DEFAULT_GUID;
READ_SOCKET (dv1394src) = -1;
WRITE_SOCKET (dv1394src) = -1;
/* initialized when first header received */
dv1394src->frame_size = 0;
@ -302,7 +333,7 @@ gst_dv1394src_iso_receive (raw1394handle_t handle, int channel, size_t len,
*/
if (section_type == 0 && dif_sequence == 0) { // dif header
if (!dv1394src->negotiated) {
if (!GST_PAD_CAPS (GST_BASE_SRC_PAD (dv1394src))) {
GstCaps *caps;
// figure format (NTSC/PAL)
@ -326,7 +357,6 @@ gst_dv1394src_iso_receive (raw1394handle_t handle, int channel, size_t len,
}
gst_pad_set_caps (GST_BASE_SRC_PAD (dv1394src), caps);
gst_caps_unref (caps);
dv1394src->negotiated = TRUE;
}
// drop last frame when not complete
if (!dv1394src->drop_incomplete
@ -417,20 +447,67 @@ gst_dv1394src_create (GstPushSrc * psrc, GstBuffer ** buf)
{
GstDV1394Src *dv1394src = GST_DV1394SRC (psrc);
GstCaps *caps;
struct pollfd pollfds[2];
GST_DV_LOCK (dv1394src);
dv1394src->buf = NULL;
while (dv1394src->buf == NULL)
raw1394_loop_iterate (dv1394src->handle);
pollfds[0].fd = raw1394_get_fd (dv1394src->handle);
pollfds[0].events = POLLIN | POLLERR | POLLHUP | POLLPRI;
pollfds[1].fd = READ_SOCKET (dv1394src);
pollfds[1].events = POLLIN | POLLERR | POLLHUP | POLLPRI;
if (dv1394src->buf) {
/* maybe we had an error before, and there's a stale buffer? */
gst_buffer_unref (dv1394src->buf);
dv1394src->buf = NULL;
}
while (TRUE) {
int res = poll (pollfds, 2, -1);
if (res < 0) {
if (errno == EAGAIN || errno == EINTR)
continue;
else
goto error_while_polling;
}
if (pollfds[1].revents) {
char command;
g_print ("told to stop!\n");
if (pollfds[1].revents & POLLIN)
READ_COMMAND (dv1394src, command, res);
goto told_to_stop;
} else if (pollfds[0].revents & POLLIN) {
/* shouldn't block in theory */
raw1394_loop_iterate (dv1394src->handle);
if (dv1394src->buf)
break;
}
}
g_assert (dv1394src->buf);
caps = gst_pad_get_caps (GST_BASE_SRC_PAD (psrc));
gst_buffer_set_caps (dv1394src->buf, caps);
gst_caps_unref (caps);
*buf = dv1394src->buf;
GST_DV_UNLOCK (dv1394src);
dv1394src->buf = NULL;
return GST_FLOW_OK;
error_while_polling:
{
GST_ELEMENT_ERROR (dv1394src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
return GST_FLOW_UNEXPECTED;
}
told_to_stop:
{
GST_DEBUG_OBJECT (dv1394src, "told to stop, shutting down");
return GST_FLOW_WRONG_STATE;
}
}
static int
@ -504,6 +581,16 @@ static gboolean
gst_dv1394src_start (GstBaseSrc * bsrc)
{
GstDV1394Src *src = GST_DV1394SRC (bsrc);
int control_sock[2];
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
goto socket_pair;
READ_SOCKET (src) = control_sock[0];
WRITE_SOCKET (src) = control_sock[1];
fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
src->handle = raw1394_new_handle ();
@ -542,6 +629,12 @@ gst_dv1394src_start (GstBaseSrc * bsrc)
return TRUE;
socket_pair:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
GST_ERROR_SYSTEM);
return FALSE;
}
no_handle:
{
GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
@ -573,7 +666,11 @@ gst_dv1394src_stop (GstBaseSrc * bsrc)
{
GstDV1394Src *src = GST_DV1394SRC (bsrc);
GST_DV_LOCK (src);
close (READ_SOCKET (src));
close (WRITE_SOCKET (src));
READ_SOCKET (src) = -1;
WRITE_SOCKET (src) = -1;
raw1394_stop_iso_rcv (src->handle, src->channel);
if (src->use_avc) {
/* pause the VCR */
@ -582,8 +679,6 @@ gst_dv1394src_stop (GstBaseSrc * bsrc)
!= AVC1394_VCR_OPERAND_PLAY_FORWARD_PAUSE))
avc1394_vcr_pause (src->handle, src->avc_node);
}
GST_DV_UNLOCK (src);
src->negotiated = FALSE;
if (src->use_avc)
/* stop the VCR */
@ -594,6 +689,17 @@ gst_dv1394src_stop (GstBaseSrc * bsrc)
return TRUE;
}
static gboolean
gst_dv1394src_unlock (GstBaseSrc * bsrc)
{
GstDV1394Src *src = GST_DV1394SRC (bsrc);
g_print ("sending command!\n");
SEND_COMMAND (src, CONTROL_STOP);
return TRUE;
}
static gboolean
gst_dv1394src_convert (GstPad * pad,
GstFormat src_format, gint64 src_value,

View file

@ -43,10 +43,6 @@ G_BEGIN_DECLS
typedef struct _GstDV1394Src GstDV1394Src;
typedef struct _GstDV1394SrcClass GstDV1394SrcClass;
#define GST_DV_GET_LOCK(dv) (GST_DV1394SRC (dv)->dv_lock)
#define GST_DV_LOCK(dv) g_mutex_lock(GST_DV_GET_LOCK (dv))
#define GST_DV_UNLOCK(dv) g_mutex_unlock(GST_DV_GET_LOCK (dv))
struct _GstDV1394Src {
GstPushSrc element;
@ -55,8 +51,6 @@ struct _GstDV1394Src {
gint skip;
gboolean drop_incomplete;
GMutex *dv_lock;
gint num_ports;
gint port;
gint channel;
@ -75,7 +69,7 @@ struct _GstDV1394Src {
guint bytes_in_frame;
guint frame_sequence;
gboolean negotiated;
int control_sock[2];
gchar *uri;
};