RTSP: Add support for server tunneling

Save the tunnelid in the connection. Add a method to retrieve the tunnelid so
that a server can store and match the id against other tunnel requests.

Fix the URI in the tunnel requests so that they contain the absolute uri and the
query string if any instead of just the hostname.

Transparently base64 decode the input stream when tunneling.

Add method to set the connection ip address so that it can be included in the
tunnel response.

Add method to connect the two tunnel requests.

Add two callbacks for the async mode to notify a tunnel start and tunnel
complete event.

Add method to reset the watch after the connection has been tunneled.

Various little refactoring to make more stuff reusable.

API: RTSP::gst_rtsp_connection_set_ip()
API: RTSP::gst_rtsp_connection_get_tunnelid()
API: RTSP::gst_rtsp_connection_do_tunnel()
API: RTSP::gst_rtsp_watch_reset()
This commit is contained in:
Wim Taymans 2009-03-04 12:21:29 +01:00
parent 3b6e9fc870
commit b6d7a1dc03
4 changed files with 433 additions and 82 deletions

View file

@ -1229,17 +1229,23 @@ gst_rtsp_connection_clear_auth_params
gst_rtsp_connection_set_qos_dscp gst_rtsp_connection_set_qos_dscp
gst_rtsp_connection_set_ip
gst_rtsp_connection_get_ip gst_rtsp_connection_get_ip
gst_rtsp_connection_get_url gst_rtsp_connection_get_url
gst_rtsp_connection_set_tunneled gst_rtsp_connection_set_tunneled
gst_rtsp_connection_is_tunneled gst_rtsp_connection_is_tunneled
gst_rtsp_connection_get_tunnelid
gst_rtsp_connection_do_tunnel
GstRTSPWatch GstRTSPWatch
GstRTSPWatchFuncs GstRTSPWatchFuncs
gst_rtsp_watch_new gst_rtsp_watch_new
gst_rtsp_watch_unref gst_rtsp_watch_unref
gst_rtsp_watch_attach gst_rtsp_watch_attach
gst_rtsp_watch_reset
gst_rtsp_watch_queue_message gst_rtsp_watch_queue_message
</SECTION> </SECTION>

View file

@ -95,8 +95,18 @@
#include "gstrtspbase64.h" #include "gstrtspbase64.h"
#include "md5.h" #include "md5.h"
typedef struct
{
gint state;
guint save;
gchar in[4];
guint cin;
gchar out[3];
guint cout;
} DecodeCtx;
static GstRTSPResult read_line (gint fd, guint8 * buffer, guint * idx, static GstRTSPResult read_line (gint fd, guint8 * buffer, guint * idx,
guint size); guint size, DecodeCtx * ctxp);
static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key, static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key,
guint keysize, gchar ** value); guint keysize, gchar ** value);
static void parse_string (gchar * dest, gint size, gchar ** src); static void parse_string (gchar * dest, gint size, gchar ** src);
@ -137,6 +147,16 @@ G_STMT_START { \
} \ } \
} G_STMT_END } G_STMT_END
typedef enum
{
TUNNEL_STATE_NONE,
TUNNEL_STATE_GET,
TUNNEL_STATE_POST,
TUNNEL_STATE_COMPLETE
} GstRTSPTunnelState;
#define TUNNELID_LEN 24
struct _GstRTSPConnection struct _GstRTSPConnection
{ {
/*< private > */ /*< private > */
@ -150,7 +170,9 @@ struct _GstRTSPConnection
GstPollFD *readfd; GstPollFD *readfd;
GstPollFD *writefd; GstPollFD *writefd;
gchar tunnelid[TUNNELID_LEN];
gboolean tunneled; gboolean tunneled;
GstRTSPTunnelState tstate;
GstPoll *fdset; GstPoll *fdset;
gchar *ip; gchar *ip;
@ -166,6 +188,9 @@ struct _GstRTSPConnection
gchar *username; gchar *username;
gchar *passwd; gchar *passwd;
GHashTable *auth_params; GHashTable *auth_params;
DecodeCtx ctx;
DecodeCtx *ctxp;
}; };
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
@ -342,6 +367,7 @@ gst_rtsp_connection_accept (gint sock, GstRTSPConnection ** conn)
gst_rtsp_connection_create (url, &newconn); gst_rtsp_connection_create (url, &newconn);
ADD_POLLFD (newconn->fdset, &newconn->fd0, fd); ADD_POLLFD (newconn->fdset, &newconn->fd0, fd);
/* both read and write initially */
newconn->readfd = &newconn->fd0; newconn->readfd = &newconn->fd0;
newconn->writefd = &newconn->fd0; newconn->writefd = &newconn->fd0;
@ -499,7 +525,6 @@ timeout:
static GstRTSPResult static GstRTSPResult
setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout) setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
{ {
gchar sessionid[24];
gint i; gint i;
GstRTSPResult res; GstRTSPResult res;
gchar *str; gchar *str;
@ -508,20 +533,25 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
GstClockTime to; GstClockTime to;
const gchar *ip; const gchar *ip;
guint16 port; guint16 port;
gchar codestr[4]; gchar codestr[4], *resultstr;
gint code; gint code;
GstRTSPUrl *url;
/* create a random sessionid */ /* create a random sessionid */
for (i = 0; i < 24; i++) for (i = 0; i < TUNNELID_LEN; i++)
sessionid[i] = g_random_int_range ('a', 'z'); conn->tunnelid[i] = g_random_int_range ('a', 'z');
sessionid[23] = '\0'; conn->tunnelid[TUNNELID_LEN - 1] = '\0';
url = conn->url;
/* */ /* */
str = g_strdup_printf ("GET %s HTTP/1.0\r\n" str = g_strdup_printf ("GET %s%s%s HTTP/1.0\r\n"
"x-sessioncookie: %s\r\n" "x-sessioncookie: %s\r\n"
"Accept: application/x-rtsp-tunnelled\r\n" "Accept: application/x-rtsp-tunnelled\r\n"
"Pragma: no-cache\r\n" "Pragma: no-cache\r\n"
"Cache-Control: no-cache\r\n" "\r\n", conn->url->host, sessionid); "Cache-Control: no-cache\r\n" "\r\n",
url->abspath, url->query ? "?" : "", url->query ? url->query : "",
conn->tunnelid);
/* we start by writing to this fd */ /* we start by writing to this fd */
conn->writefd = &conn->fd0; conn->writefd = &conn->fd0;
@ -542,7 +572,7 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
idx = 0; idx = 0;
while (TRUE) { while (TRUE) {
res = read_line (conn->fd0.fd, buffer, &idx, sizeof (buffer)); res = read_line (conn->fd0.fd, buffer, &idx, sizeof (buffer), NULL);
if (res == GST_RTSP_EEOF) if (res == GST_RTSP_EEOF)
goto eof; goto eof;
if (res == GST_RTSP_OK) if (res == GST_RTSP_OK)
@ -583,6 +613,11 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
parse_string (codestr, sizeof (codestr), &bptr); parse_string (codestr, sizeof (codestr), &bptr);
code = atoi (codestr); code = atoi (codestr);
while (g_ascii_isspace (*bptr))
bptr++;
resultstr = bptr;
if (code != 200) if (code != 200)
goto wrong_result; goto wrong_result;
} else { } else {
@ -617,14 +652,16 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
conn->writefd = &conn->fd1; conn->writefd = &conn->fd1;
/* */ /* */
str = g_strdup_printf ("POST %s HTTP/1.0\r\n" str = g_strdup_printf ("POST %s%s%s HTTP/1.0\r\n"
"x-sessioncookie: %s\r\n" "x-sessioncookie: %s\r\n"
"Content-Type: application/x-rtsp-tunnelled\r\n" "Content-Type: application/x-rtsp-tunnelled\r\n"
"Pragma: no-cache\r\n" "Pragma: no-cache\r\n"
"Cache-Control: no-cache\r\n" "Cache-Control: no-cache\r\n"
"Content-Length: 32767\r\n" "Content-Length: 32767\r\n"
"Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n" "Expires: Sun, 9 Jan 1972 00:00:00 GMT\r\n"
"\r\n", conn->url->host, sessionid); "\r\n",
url->abspath, url->query ? "?" : "", url->query ? url->query : "",
conn->tunnelid);
/* we start by writing to this fd */ /* we start by writing to this fd */
conn->writefd = &conn->fd1; conn->writefd = &conn->fd1;
@ -664,7 +701,7 @@ stopped:
} }
wrong_result: wrong_result:
{ {
GST_ERROR ("got failure response %d %s", code, codestr); GST_ERROR ("got failure response %d %s", code, resultstr);
return GST_RTSP_ERROR; return GST_RTSP_ERROR;
} }
not_resolved: not_resolved:
@ -881,12 +918,10 @@ add_auth_header (GstRTSPConnection * conn, GstRTSPMessage * message)
} }
static void static void
add_date_header (GstRTSPMessage * message) gen_date_string (gchar * date_string, guint len)
{ {
GTimeVal tv; GTimeVal tv;
gchar date_string[100];
time_t t; time_t t;
#ifdef HAVE_GMTIME_R #ifdef HAVE_GMTIME_R
struct tm tm_; struct tm tm_;
#endif #endif
@ -895,14 +930,10 @@ add_date_header (GstRTSPMessage * message)
t = (time_t) tv.tv_sec; t = (time_t) tv.tv_sec;
#ifdef HAVE_GMTIME_R #ifdef HAVE_GMTIME_R
strftime (date_string, sizeof (date_string), "%a, %d %b %Y %H:%M:%S GMT", strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime_r (&t, &tm_));
gmtime_r (&t, &tm_));
#else #else
strftime (date_string, sizeof (date_string), "%a, %d %b %Y %H:%M:%S GMT", strftime (date_string, len, "%a, %d %b %Y %H:%M:%S GMT", gmtime (&t));
gmtime (&t));
#endif #endif
gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
} }
static GstRTSPResult static GstRTSPResult
@ -934,8 +965,51 @@ write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
return GST_RTSP_OK; return GST_RTSP_OK;
} }
static gint
fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx)
{
gint out = 0;
if (ctx) {
gint r;
while (size > 0) {
while (size > 0 && ctx->cout < 3) {
/* we have some leftover bytes */
*buffer++ = ctx->out[ctx->cout];
ctx->cout++;
size--;
out++;
}
/* nothing in the buffer */
if (size == 0)
break;
/* try to read more bytes */
r = READ_SOCKET (fd, &ctx->in[ctx->cin], 4 - ctx->cin);
if (r <= 0) {
if (out == 0)
out = r;
break;
}
ctx->cin += r;
if (ctx->cin == 4) {
r = g_base64_decode_step ((const gchar *) ctx->in, 4,
(guchar *) ctx->out, &ctx->state, &ctx->save);
ctx->cout = 0;
ctx->cin = 0;
}
}
} else {
out = READ_SOCKET (fd, buffer, size);
}
return out;
}
static GstRTSPResult static GstRTSPResult
read_bytes (gint fd, guint8 * buffer, guint * idx, guint size) read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
{ {
guint left; guint left;
@ -947,7 +1021,7 @@ read_bytes (gint fd, guint8 * buffer, guint * idx, guint size)
while (left) { while (left) {
gint r; gint r;
r = READ_SOCKET (fd, &buffer[*idx], left); r = fill_bytes (fd, &buffer[*idx], left, ctx);
if (r == 0) { if (r == 0) {
return GST_RTSP_EEOF; return GST_RTSP_EEOF;
} else if (r < 0) { } else if (r < 0) {
@ -964,13 +1038,13 @@ read_bytes (gint fd, guint8 * buffer, guint * idx, guint size)
} }
static GstRTSPResult static GstRTSPResult
read_line (gint fd, guint8 * buffer, guint * idx, guint size) read_line (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
{ {
while (TRUE) { while (TRUE) {
guint8 c; guint8 c;
gint r; gint r;
r = READ_SOCKET (fd, &c, 1); r = fill_bytes (fd, &c, 1, ctx);
if (r == 0) { if (r == 0) {
return GST_RTSP_EEOF; return GST_RTSP_EEOF;
} else if (r < 0) { } else if (r < 0) {
@ -1128,8 +1202,12 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
/* append headers and body */ /* append headers and body */
if (message->type != GST_RTSP_MESSAGE_DATA) { if (message->type != GST_RTSP_MESSAGE_DATA) {
gchar date_string[100];
gen_date_string (date_string, sizeof (date_string));
/* add date header */ /* add date header */
add_date_header (message); gst_rtsp_message_add_header (message, GST_RTSP_HDR_DATE, date_string);
/* append headers */ /* append headers */
gst_rtsp_message_append_headers (message, str); gst_rtsp_message_append_headers (message, str);
@ -1280,7 +1358,8 @@ parse_error:
} }
static GstRTSPResult static GstRTSPResult
parse_request_line (guint8 * buffer, GstRTSPMessage * msg) parse_request_line (GstRTSPConnection * conn, guint8 * buffer,
GstRTSPMessage * msg)
{ {
GstRTSPResult res = GST_RTSP_OK; GstRTSPResult res = GST_RTSP_OK;
gchar versionstr[20]; gchar versionstr[20];
@ -1288,35 +1367,68 @@ parse_request_line (guint8 * buffer, GstRTSPMessage * msg)
gchar urlstr[4096]; gchar urlstr[4096];
gchar *bptr; gchar *bptr;
GstRTSPMethod method; GstRTSPMethod method;
GstRTSPTunnelState tstate = TUNNEL_STATE_NONE;
bptr = (gchar *) buffer; bptr = (gchar *) buffer;
parse_string (methodstr, sizeof (methodstr), &bptr); parse_string (methodstr, sizeof (methodstr), &bptr);
method = gst_rtsp_find_method (methodstr); method = gst_rtsp_find_method (methodstr);
if (method == GST_RTSP_INVALID) {
/* a tunnel request is allowed when we don't have one yet */
if (conn->tstate != TUNNEL_STATE_NONE)
goto invalid_method;
/* we need GET or POST for a valid tunnel request */
if (!strcmp (methodstr, "GET"))
tstate = TUNNEL_STATE_GET;
else if (!strcmp (methodstr, "POST"))
tstate = TUNNEL_STATE_POST;
else
goto invalid_method;
}
parse_string (urlstr, sizeof (urlstr), &bptr); parse_string (urlstr, sizeof (urlstr), &bptr);
if (*urlstr == '\0') if (*urlstr == '\0')
res = GST_RTSP_EPARSE; goto invalid_url;
parse_string (versionstr, sizeof (versionstr), &bptr); parse_string (versionstr, sizeof (versionstr), &bptr);
if (*bptr != '\0') if (*bptr != '\0')
res = GST_RTSP_EPARSE; goto invalid_version;
if (strcmp (versionstr, "RTSP/1.0") == 0) { if (strcmp (versionstr, "RTSP/1.0") == 0) {
if (gst_rtsp_message_init_request (msg, method, urlstr) != GST_RTSP_OK) res = gst_rtsp_message_init_request (msg, method, urlstr);
res = GST_RTSP_EPARSE;
} else if (strncmp (versionstr, "RTSP/", 5) == 0) { } else if (strncmp (versionstr, "RTSP/", 5) == 0) {
if (gst_rtsp_message_init_request (msg, method, urlstr) != GST_RTSP_OK) res = gst_rtsp_message_init_request (msg, method, urlstr);
msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
} else if (strcmp (versionstr, "HTTP/1.0") == 0) {
/* tunnel request, we need a tunnel method */
if (tstate == TUNNEL_STATE_NONE) {
res = GST_RTSP_EPARSE; res = GST_RTSP_EPARSE;
msg->type_data.request.version = GST_RTSP_VERSION_INVALID;
} else { } else {
gst_rtsp_message_init_request (msg, method, urlstr); conn->tstate = tstate;
msg->type_data.request.version = GST_RTSP_VERSION_INVALID; }
} else {
res = GST_RTSP_EPARSE; res = GST_RTSP_EPARSE;
} }
return res; return res;
/* ERRORS */
invalid_method:
{
GST_ERROR ("invalid method %s", methodstr);
return GST_RTSP_EPARSE;
}
invalid_url:
{
GST_ERROR ("invalid url %s", urlstr);
return GST_RTSP_EPARSE;
}
invalid_version:
{
GST_ERROR ("invalid version");
return GST_RTSP_EPARSE;
}
} }
static GstRTSPResult static GstRTSPResult
@ -1348,7 +1460,7 @@ no_column:
/* parsing lines means reading a Key: Value pair */ /* parsing lines means reading a Key: Value pair */
static GstRTSPResult static GstRTSPResult
parse_line (guint8 * buffer, GstRTSPMessage * msg) parse_line (GstRTSPConnection * conn, guint8 * buffer, GstRTSPMessage * msg)
{ {
GstRTSPResult res; GstRTSPResult res;
gchar key[32]; gchar key[32];
@ -1359,9 +1471,18 @@ parse_line (guint8 * buffer, GstRTSPMessage * msg)
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto parse_error; goto parse_error;
if (conn->tstate == TUNNEL_STATE_GET || conn->tstate == TUNNEL_STATE_POST) {
/* save the tunnel session in the connection */
if (!strcmp (key, "x-sessioncookie")) {
strncpy (conn->tunnelid, value, TUNNELID_LEN);
conn->tunnelid[TUNNELID_LEN - 1] = '\0';
conn->tunneled = TRUE;
}
} else {
field = gst_rtsp_find_header_field (key); field = gst_rtsp_find_header_field (key);
if (field != GST_RTSP_HDR_INVALID) if (field != GST_RTSP_HDR_INVALID)
gst_rtsp_message_add_header (msg, field, value); gst_rtsp_message_add_header (msg, field, value);
}
return GST_RTSP_OK; return GST_RTSP_OK;
@ -1390,7 +1511,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
builder->offset = 0; builder->offset = 0;
res = res =
read_bytes (conn->readfd->fd, (guint8 *) builder->buffer, read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
&builder->offset, 1); &builder->offset, 1, conn->ctxp);
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto done; goto done;
@ -1408,7 +1529,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
{ {
res = res =
read_bytes (conn->readfd->fd, (guint8 *) builder->buffer, read_bytes (conn->readfd->fd, (guint8 *) builder->buffer,
&builder->offset, 4); &builder->offset, 4, conn->ctxp);
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto done; goto done;
@ -1425,7 +1546,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
{ {
res = res =
read_bytes (conn->readfd->fd, builder->body_data, &builder->offset, read_bytes (conn->readfd->fd, builder->body_data, &builder->offset,
builder->body_len); builder->body_len, conn->ctxp);
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto done; goto done;
@ -1442,7 +1563,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
case STATE_READ_LINES: case STATE_READ_LINES:
{ {
res = read_line (conn->readfd->fd, builder->buffer, &builder->offset, res = read_line (conn->readfd->fd, builder->buffer, &builder->offset,
sizeof (builder->buffer)); sizeof (builder->buffer), conn->ctxp);
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto done; goto done;
@ -1476,11 +1597,14 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
if (memcmp (builder->buffer, "RTSP", 4) == 0) { if (memcmp (builder->buffer, "RTSP", 4) == 0) {
res = parse_response_status (builder->buffer, message); res = parse_response_status (builder->buffer, message);
} else { } else {
res = parse_request_line (builder->buffer, message); res = parse_request_line (conn, builder->buffer, message);
} }
/* the first line must parse without errors */
if (res != GST_RTSP_OK)
goto done;
} else { } else {
/* else just parse the line */ /* else just parse the line, ignore errors */
parse_line (builder->buffer, message); parse_line (conn, builder->buffer, message);
} }
builder->line++; builder->line++;
builder->offset = 0; builder->offset = 0;
@ -1490,6 +1614,14 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
{ {
gchar *session_id; gchar *session_id;
if (conn->tstate == TUNNEL_STATE_GET) {
res = GST_RTSP_ETGET;
goto done;
} else if (conn->tstate == TUNNEL_STATE_POST) {
res = GST_RTSP_ETPOST;
goto done;
}
if (message->type == GST_RTSP_MESSAGE_DATA) { if (message->type == GST_RTSP_MESSAGE_DATA) {
/* data messages don't have headers */ /* data messages don't have headers */
res = GST_RTSP_OK; res = GST_RTSP_OK;
@ -1579,7 +1711,7 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE); gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
while (TRUE) { while (TRUE) {
res = read_bytes (conn->readfd->fd, data, &offset, size); res = read_bytes (conn->readfd->fd, data, &offset, size, conn->ctxp);
if (res == GST_RTSP_EEOF) if (res == GST_RTSP_EEOF)
goto eof; goto eof;
if (res == GST_RTSP_OK) if (res == GST_RTSP_OK)
@ -1628,6 +1760,38 @@ read_error:
} }
} }
static GString *
gen_tunnel_reply (GstRTSPConnection * conn, GstRTSPStatusCode code)
{
GString *str;
gchar date_string[100];
const gchar *status;
gen_date_string (date_string, sizeof (date_string));
status = gst_rtsp_status_as_text (code);
if (status == NULL) {
code = 500;
status = "Internal Server Error";
}
str = g_string_new ("");
/* */
g_string_append_printf (str, "HTTP/1.0 %d %s\r\n", code, status);
g_string_append_printf (str,
"Server: GStreamer RTSP Server\r\n"
"Date: %s\r\n"
"Connection: close\r\n"
"Cache-Control: no-store\r\n" "Pragma: no-cache\r\n", date_string);
if (code == 200) {
if (conn->ip)
g_string_append_printf (str, "x-server-ip-address: %s\r\n", conn->ip);
g_string_append_printf (str,
"Content-Type: application/x-rtsp-tunnelled\r\n" "\r\n");
}
return str;
}
/** /**
* gst_rtsp_connection_receive: * gst_rtsp_connection_receive:
@ -1669,7 +1833,20 @@ gst_rtsp_connection_receive (GstRTSPConnection * conn, GstRTSPMessage * message,
goto eof; goto eof;
if (res == GST_RTSP_OK) if (res == GST_RTSP_OK)
break; break;
if (res != GST_RTSP_EINTR) if (res == GST_RTSP_ETGET) {
GString *str;
/* tunnel GET request, we can reply now */
str = gen_tunnel_reply (conn, GST_RTSP_STS_OK);
res =
gst_rtsp_connection_write (conn, (guint8 *) str->str, str->len,
timeout);
g_string_free (str, TRUE);
} else if (res == GST_RTSP_ETPOST) {
/* tunnel POST request, return the value, the caller now has to link the
* two connections. */
break;
} else if (res != GST_RTSP_EINTR)
goto read_error; goto read_error;
do { do {
@ -2186,6 +2363,24 @@ gst_rtsp_connection_get_ip (const GstRTSPConnection * conn)
return conn->ip; return conn->ip;
} }
/**
* gst_rtsp_connection_set_ip:
* @conn: a #GstRTSPConnection
* @ip: an ip address
*
* Set the IP address of the server.
*
* Since: 0.10.23
*/
void
gst_rtsp_connection_set_ip (GstRTSPConnection * conn, const gchar * ip)
{
g_return_if_fail (conn != NULL);
g_free (conn->ip);
conn->ip = g_strdup (ip);
}
/** /**
* gst_rtsp_connection_set_tunneled: * gst_rtsp_connection_set_tunneled:
* @conn: a #GstRTSPConnection * @conn: a #GstRTSPConnection
@ -2217,13 +2412,85 @@ gst_rtsp_connection_set_tunneled (GstRTSPConnection * conn, gboolean tunneled)
* Since: 0.10.23 * Since: 0.10.23
*/ */
gboolean gboolean
gst_rtsp_connection_is_tunneled (GstRTSPConnection * conn) gst_rtsp_connection_is_tunneled (const GstRTSPConnection * conn)
{ {
g_return_val_if_fail (conn != NULL, FALSE); g_return_val_if_fail (conn != NULL, FALSE);
return conn->tunneled; return conn->tunneled;
} }
/**
* gst_rtsp_connection_get_tunnelid:
* @conn: a #GstRTSPConnection
*
* Get the tunnel session id the connection.
*
* Returns: returns a non-empty string if @conn is being tunneled over HTTP.
*
* Since: 0.10.23
*/
const gchar *
gst_rtsp_connection_get_tunnelid (const GstRTSPConnection * conn)
{
g_return_val_if_fail (conn != NULL, NULL);
if (!conn->tunneled)
return NULL;
return conn->tunnelid;
}
/**
* gst_rtsp_connection_do_tunnel:
* @conn: a #GstRTSPConnection
* @conn2: a #GstRTSPConnection
*
* If @conn received the first tunnel connection and @conn2 received
* the second tunnel connection, link the two connections together so that
* @conn manages the tunneled connection.
*
* After this call, @conn2 cannot be used anymore and must be freed with
* gst_rtsp_connection_free().
*
* Returns: return GST_RTSP_OK on success.
*
* Since: 0.10.23
*/
GstRTSPResult
gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn,
GstRTSPConnection * conn2)
{
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn2 != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (conn->tstate == TUNNEL_STATE_GET, GST_RTSP_EINVAL);
g_return_val_if_fail (conn2->tstate == TUNNEL_STATE_POST, GST_RTSP_EINVAL);
g_return_val_if_fail (!memcmp (conn2->tunnelid, conn->tunnelid, TUNNELID_LEN),
GST_RTSP_EINVAL);
/* both connections have fd0 as the read/write socket. start by taking the
* socket from conn2 and set it as the socket in conn */
conn->fd1 = conn2->fd0;
/* clean up some of the state of conn2 */
gst_poll_remove_fd (conn2->fdset, &conn2->fd0);
conn2->fd0.fd = -1;
conn2->readfd = conn2->writefd = NULL;
/* We make fd0 the write socket and fd1 the read socket. */
conn->writefd = &conn->fd0;
conn->readfd = &conn->fd1;
conn->tstate = TUNNEL_STATE_COMPLETE;
/* we need base64 decoding for the readfd */
conn->ctx.state = 0;
conn->ctx.cin = 0;
conn->ctx.cout = 3;
conn->ctx.save = 0;
conn->ctxp = &conn->ctx;
return GST_RTSP_OK;
}
#define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR) #define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR)
#define WRITE_COND (G_IO_OUT | G_IO_ERR) #define WRITE_COND (G_IO_OUT | G_IO_ERR)
@ -2234,6 +2501,9 @@ typedef struct
guint cseq; guint cseq;
} GstRTSPRec; } GstRTSPRec;
static GstRTSPRec *queue_response (GstRTSPWatch * watch, GString * str,
guint cseq);
/* async functions */ /* async functions */
struct _GstRTSPWatch struct _GstRTSPWatch
{ {
@ -2300,14 +2570,33 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback,
break; break;
if (res == GST_RTSP_EEOF) if (res == GST_RTSP_EEOF)
goto eof; goto eof;
if (res != GST_RTSP_OK) if (res == GST_RTSP_ETGET) {
GString *str;
GstRTSPStatusCode code;
if (watch->funcs.tunnel_start)
code = watch->funcs.tunnel_start (watch, watch->user_data);
else
code = GST_RTSP_STS_OK;
/* queue the response string */
str = gen_tunnel_reply (watch->conn, code);
queue_response (watch, str, -1);
} else if (res == GST_RTSP_ETPOST) {
/* in the callback the connection should be tunneled with the
* GET connection */
if (watch->funcs.tunnel_complete)
watch->funcs.tunnel_complete (watch, watch->user_data);
} else if (res != GST_RTSP_OK)
goto error; goto error;
if (res == GST_RTSP_OK) {
if (watch->funcs.message_received) if (watch->funcs.message_received)
watch->funcs.message_received (watch, &watch->message, watch->funcs.message_received (watch, &watch->message,
watch->user_data); watch->user_data);
gst_rtsp_message_unset (&watch->message); gst_rtsp_message_unset (&watch->message);
}
build_reset (&watch->builder); build_reset (&watch->builder);
} while (FALSE); } while (FALSE);
} }
@ -2339,7 +2628,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback,
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto error; goto error;
if (watch->funcs.message_sent) if (watch->funcs.message_sent && watch->write_cseq != -1)
watch->funcs.message_sent (watch, watch->write_cseq, watch->user_data); watch->funcs.message_sent (watch, watch->write_cseq, watch->user_data);
done: done:
@ -2435,14 +2724,10 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
result->conn = conn; result->conn = conn;
result->builder.state = STATE_START; result->builder.state = STATE_START;
result->readfd.fd = conn->readfd->fd; result->readfd.fd = -1;
result->readfd.events = READ_COND; result->writefd.fd = -1;
result->readfd.revents = 0;
result->writefd.fd = conn->writefd->fd; gst_rtsp_watch_reset (result);
result->writefd.events = WRITE_COND;
result->writefd.revents = 0;
result->write_added = FALSE;
result->funcs = *funcs; result->funcs = *funcs;
result->user_data = user_data; result->user_data = user_data;
@ -2455,6 +2740,35 @@ gst_rtsp_watch_new (GstRTSPConnection * conn,
return result; return result;
} }
/**
* gst_rtsp_watch_reset:
* @watch: a #GstRTSPWatch
*
* Reset @watch, this is usually called after gst_rtsp_connection_do_tunnel()
* when the file descriptors of the connection might have changed.
*
* Since: 0.10.23
*/
void
gst_rtsp_watch_reset (GstRTSPWatch * watch)
{
if (watch->readfd.fd != -1)
g_source_remove_poll ((GSource *) watch, &watch->readfd);
if (watch->writefd.fd != -1)
g_source_remove_poll ((GSource *) watch, &watch->writefd);
watch->readfd.fd = watch->conn->readfd->fd;
watch->readfd.events = READ_COND;
watch->readfd.revents = 0;
watch->writefd.fd = watch->conn->writefd->fd;
watch->writefd.events = WRITE_COND;
watch->writefd.revents = 0;
watch->write_added = FALSE;
g_source_add_poll ((GSource *) watch, &watch->readfd);
}
/** /**
* gst_rtsp_watch_attach: * gst_rtsp_watch_attach:
* @watch: a #GstRTSPWatch * @watch: a #GstRTSPWatch
@ -2491,6 +2805,29 @@ gst_rtsp_watch_unref (GstRTSPWatch * watch)
g_source_unref ((GSource *) watch); g_source_unref ((GSource *) watch);
} }
static GstRTSPRec *
queue_response (GstRTSPWatch * watch, GString * str, guint cseq)
{
GstRTSPRec *data;
/* make a record with the message as a string ans cseq */
data = g_slice_new (GstRTSPRec);
data->str = str;
data->cseq = cseq;
/* add the record to a queue */
watch->messages = g_list_append (watch->messages, data);
/* make sure the main context will now also check for writability on the
* socket */
if (!watch->write_added) {
g_source_add_poll ((GSource *) watch, &watch->writefd);
watch->write_added = TRUE;
}
return data;
}
/** /**
* gst_rtsp_watch_queue_message: * gst_rtsp_watch_queue_message:
* @watch: a #GstRTSPWatch * @watch: a #GstRTSPWatch
@ -2528,18 +2865,7 @@ gst_rtsp_watch_queue_message (GstRTSPWatch * watch, GstRTSPMessage * message)
} }
/* make a record with the message as a string ans cseq */ /* make a record with the message as a string ans cseq */
data = g_slice_new (GstRTSPRec); data = queue_response (watch, message_to_string (watch->conn, message), cseq);
data->str = message_to_string (watch->conn, message);
data->cseq = cseq;
/* add the record to a queue */
watch->messages = g_list_append (watch->messages, data);
/* make sure the main context will now also check for writability on the
* socket */
if (!watch->write_added) {
g_source_add_poll ((GSource *) watch, &watch->writefd);
watch->write_added = TRUE;
}
return cseq; return cseq;
} }

View file

@ -105,9 +105,14 @@ GstRTSPResult gst_rtsp_connection_set_qos_dscp (GstRTSPConnection *conn,
/* accessors */ /* accessors */
GstRTSPUrl * gst_rtsp_connection_get_url (const GstRTSPConnection *conn); GstRTSPUrl * gst_rtsp_connection_get_url (const GstRTSPConnection *conn);
const gchar * gst_rtsp_connection_get_ip (const GstRTSPConnection *conn); const gchar * gst_rtsp_connection_get_ip (const GstRTSPConnection *conn);
void gst_rtsp_connection_set_ip (GstRTSPConnection *conn, const gchar *ip);
/* tunneling */
void gst_rtsp_connection_set_tunneled (GstRTSPConnection *conn, gboolean tunneled); void gst_rtsp_connection_set_tunneled (GstRTSPConnection *conn, gboolean tunneled);
gboolean gst_rtsp_connection_is_tunneled (GstRTSPConnection *conn); gboolean gst_rtsp_connection_is_tunneled (const GstRTSPConnection *conn);
const gchar * gst_rtsp_connection_get_tunnelid (const GstRTSPConnection *conn);
GstRTSPResult gst_rtsp_connection_do_tunnel (GstRTSPConnection *conn, GstRTSPConnection *conn2);
/* async IO */ /* async IO */
@ -125,8 +130,15 @@ typedef struct _GstRTSPWatch GstRTSPWatch;
* @message_sent: callback when a message was sent * @message_sent: callback when a message was sent
* @closed: callback when the connection is closed * @closed: callback when the connection is closed
* @error: callback when an error occured * @error: callback when an error occured
* @tunnel_start: a client started a tunneled connection. The tunnelid of the
* connection must be saved.
* @tunnel_complete: a client finished a tunneled connection. In this callback
* you usually pair the tunnelid of this connection with the saved one using
* gst_rtsp_connection_do_tunnel().
* *
* Callback functions from a #GstRTSPWatch. * Callback functions from a #GstRTSPWatch.
*
* Since: 0.10.23
*/ */
typedef struct { typedef struct {
GstRTSPResult (*message_received) (GstRTSPWatch *watch, GstRTSPMessage *message, GstRTSPResult (*message_received) (GstRTSPWatch *watch, GstRTSPMessage *message,
@ -136,6 +148,8 @@ typedef struct {
GstRTSPResult (*closed) (GstRTSPWatch *watch, gpointer user_data); GstRTSPResult (*closed) (GstRTSPWatch *watch, gpointer user_data);
GstRTSPResult (*error) (GstRTSPWatch *watch, GstRTSPResult result, GstRTSPResult (*error) (GstRTSPWatch *watch, GstRTSPResult result,
gpointer user_data); gpointer user_data);
GstRTSPStatusCode (*tunnel_start) (GstRTSPWatch *watch, gpointer user_data);
GstRTSPResult (*tunnel_complete) (GstRTSPWatch *watch, gpointer user_data);
/*< private >*/ /*< private >*/
gpointer _gst_reserved[GST_PADDING]; gpointer _gst_reserved[GST_PADDING];
@ -145,6 +159,7 @@ GstRTSPWatch * gst_rtsp_watch_new (GstRTSPConnection *conn,
GstRTSPWatchFuncs *funcs, GstRTSPWatchFuncs *funcs,
gpointer user_data, gpointer user_data,
GDestroyNotify notify); GDestroyNotify notify);
void gst_rtsp_watch_reset (GstRTSPWatch *watch);
void gst_rtsp_watch_unref (GstRTSPWatch *watch); void gst_rtsp_watch_unref (GstRTSPWatch *watch);
guint gst_rtsp_watch_attach (GstRTSPWatch *watch, guint gst_rtsp_watch_attach (GstRTSPWatch *watch,

View file

@ -7,9 +7,11 @@ EXPORTS
gst_rtsp_connection_close gst_rtsp_connection_close
gst_rtsp_connection_connect gst_rtsp_connection_connect
gst_rtsp_connection_create gst_rtsp_connection_create
gst_rtsp_connection_do_tunnel
gst_rtsp_connection_flush gst_rtsp_connection_flush
gst_rtsp_connection_free gst_rtsp_connection_free
gst_rtsp_connection_get_ip gst_rtsp_connection_get_ip
gst_rtsp_connection_get_tunnelid
gst_rtsp_connection_get_url gst_rtsp_connection_get_url
gst_rtsp_connection_is_tunneled gst_rtsp_connection_is_tunneled
gst_rtsp_connection_next_timeout gst_rtsp_connection_next_timeout
@ -20,6 +22,7 @@ EXPORTS
gst_rtsp_connection_send gst_rtsp_connection_send
gst_rtsp_connection_set_auth gst_rtsp_connection_set_auth
gst_rtsp_connection_set_auth_param gst_rtsp_connection_set_auth_param
gst_rtsp_connection_set_ip
gst_rtsp_connection_set_qos_dscp gst_rtsp_connection_set_qos_dscp
gst_rtsp_connection_set_tunneled gst_rtsp_connection_set_tunneled
gst_rtsp_connection_write gst_rtsp_connection_write
@ -95,4 +98,5 @@ EXPORTS
gst_rtsp_watch_attach gst_rtsp_watch_attach
gst_rtsp_watch_new gst_rtsp_watch_new
gst_rtsp_watch_queue_message gst_rtsp_watch_queue_message
gst_rtsp_watch_reset
gst_rtsp_watch_unref gst_rtsp_watch_unref