summaryrefslogtreecommitdiff
path: root/ext/curl/gstcurlhttpsrc.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/curl/gstcurlhttpsrc.c')
-rw-r--r--ext/curl/gstcurlhttpsrc.c479
1 files changed, 289 insertions, 190 deletions
diff --git a/ext/curl/gstcurlhttpsrc.c b/ext/curl/gstcurlhttpsrc.c
index c1a0bcf5c..9a2dbb0d5 100644
--- a/ext/curl/gstcurlhttpsrc.c
+++ b/ext/curl/gstcurlhttpsrc.c
@@ -73,6 +73,46 @@
* </refsect2>
*/
+/*
+ * Thread safety notes.
+ *
+ * GstCurlHttpSrc uses a single thread running the
+ * gst_curl_http_src_curl_multi_loop() function to handle receiving
+ * data and messages from libcurl. Each instance of GstCurlHttpSrc adds
+ * an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits
+ * for the multi_loop to perform the HTTP request.
+ *
+ * When an instance of GstCurlHttpSrc wants to make a request (i.e.
+ * it has moved to the PLAYING state) it adds itself to the
+ * multi_task_context.queue list and signals the multi_loop task.
+ *
+ * Each instance of GstCurlHttpSrc uses buffer_mutex and buffer_cond
+ * to wait for gst_curl_http_src_curl_multi_loop() to perform the
+ * request and signal completion.
+ *
+ * Each instance of GstCurlHttpSrc is protected by the mutexes:
+ * 1. uri_mutex
+ * 2. buffer_mutex
+ *
+ * uri_mutex is used to protect access to the uri field.
+ *
+ * buffer_mutex is used to protect access to buffer_cond, state and
+ * connection_status.
+ *
+ * The gst_curl_http_src_curl_multi_loop() function uses the mutexes:
+ * 1. multi_task_context.task_rec_mutex
+ * 2. multi_task_context.mutex
+ *
+ * multi_task_context.task_rec_mutex is only used by GstTask.
+ *
+ * multi_task_context.mutex is used to protect access to queue and state
+ *
+ * To avoid deadlock, it is vital that if both multi_task_context.mutex
+ * and buffer_mutex are required, that they are locked in the order:
+ * 1. multi_task_context.mutex
+ * 2. buffer_mutex
+ */
+
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
@@ -87,6 +127,35 @@ GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug);
#define GST_CAT_DEFAULT gst_curl_http_src_debug
GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug);
+enum
+{
+ PROP_0,
+ PROP_URI,
+ PROP_USERNAME,
+ PROP_PASSWORD,
+ PROP_PROXYURI,
+ PROP_PROXYUSERNAME,
+ PROP_PROXYPASSWORD,
+ PROP_COOKIES,
+ PROP_USERAGENT,
+ PROP_HEADERS,
+ PROP_COMPRESS,
+ PROP_REDIRECT,
+ PROP_MAXREDIRECT,
+ PROP_KEEPALIVE,
+ PROP_TIMEOUT,
+ PROP_STRICT_SSL,
+ PROP_SSL_CA_FILE,
+ PROP_RETRIES,
+ PROP_CONNECTIONMAXTIME,
+ PROP_MAXCONCURRENT_SERVER,
+ PROP_MAXCONCURRENT_PROXY,
+ PROP_MAXCONCURRENT_GLOBAL,
+ PROP_HTTPVERSION,
+ PROP_IRADIO_MODE,
+ PROP_MAX
+};
+
/*
* Make a source pad template to be able to kick out recv'd data
*/
@@ -138,14 +207,15 @@ static size_t gst_curl_http_src_get_header (void *header, size_t size,
static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size,
size_t nmemb, void *src);
static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src);
+static void gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src);
static char *gst_curl_http_src_strcasestr (const char *haystack,
const char *needle);
-curl_version_info_data *gst_curl_http_src_curl_capabilities;
-GstCurlHttpVersion pref_http_ver;
-gchar *gst_curl_http_src_default_useragent;
+static curl_version_info_data *gst_curl_http_src_curl_capabilities = NULL;
+static GstCurlHttpVersion pref_http_ver;
#define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ())
+
static GType
gst_curl_http_version_get_type (void)
{
@@ -237,10 +307,6 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
pref_http_ver = default_http_version;
}
- gst_curl_http_src_default_useragent =
- g_strdup_printf ("GStreamer curlhttpsrc libcurl/%s",
- gst_curl_http_src_curl_capabilities->version);
-
gobject_class->set_property = gst_curl_http_src_set_property;
gobject_class->get_property = gst_curl_http_src_get_property;
gobject_class->finalize = gst_curl_http_src_finalize;
@@ -285,7 +351,8 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
g_object_class_install_property (gobject_class, PROP_USERAGENT,
g_param_spec_string ("user-agent", "User-Agent",
- "URI of resource requested", GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT,
+ "URI of resource requested",
+ GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/<curl-version>",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_COMPRESS,
@@ -390,9 +457,13 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
__LINE__, NULL, "Testing the curl_multi_loop debugging prints");
#endif
+ klass->multi_task_context.task = NULL;
+ klass->multi_task_context.refcount = 0;
+ klass->multi_task_context.queue = NULL;
+ klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
+ klass->multi_task_context.multi_handle = NULL;
g_mutex_init (&klass->multi_task_context.mutex);
g_cond_init (&klass->multi_task_context.signal);
- g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
gst_element_class_set_static_metadata (gstelement_class,
"HTTP Client Source using libcURL",
@@ -410,8 +481,10 @@ gst_curl_http_src_set_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_URI:
+ g_mutex_lock (&source->uri_mutex);
g_free (source->uri);
source->uri = g_value_dup_string (value);
+ g_mutex_unlock (&source->uri_mutex);
break;
case PROP_USERNAME:
g_free (source->username);
@@ -447,7 +520,9 @@ gst_curl_http_src_set_property (GObject * object, guint prop_id,
const GstStructure *s = gst_value_get_structure (value);
if (source->request_headers)
gst_structure_free (source->request_headers);
- source->request_headers = s ? gst_structure_copy (s) : NULL;
+ source->request_headers =
+ s ? gst_structure_copy (s) :
+ gst_structure_new_empty (REQUEST_HEADERS_NAME);
}
break;
case PROP_COMPRESS:
@@ -505,7 +580,9 @@ gst_curl_http_src_get_property (GObject * object, guint prop_id,
switch (prop_id) {
case PROP_URI:
+ g_mutex_lock (&source->uri_mutex);
g_value_set_string (value, source->uri);
+ g_mutex_unlock (&source->uri_mutex);
break;
case PROP_USERNAME:
g_value_set_string (value, source->username);
@@ -591,9 +668,12 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
source->proxy_user = NULL;
source->proxy_pass = NULL;
source->cookies = NULL;
- source->user_agent = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT;
+ g_assert (gst_curl_http_src_curl_capabilities != NULL);
+ source->user_agent =
+ g_strdup_printf (GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/%s",
+ gst_curl_http_src_curl_capabilities->version);
source->number_cookies = 0;
- source->request_headers = NULL;
+ source->request_headers = gst_structure_new_empty (REQUEST_HEADERS_NAME);
source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION;
source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS;
source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE;
@@ -608,8 +688,8 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES;
source->retries_remaining = source->total_retries;
source->slist = NULL;
+ source->accept_compressed_encodings = FALSE;
- gst_caps_replace (&source->caps, NULL);
gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
source->proxy_uri = g_strdup (g_getenv ("http_proxy"));
@@ -617,18 +697,23 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
g_mutex_init (&source->uri_mutex);
g_mutex_init (&source->buffer_mutex);
- g_cond_init (&source->signal);
+ g_cond_init (&source->buffer_cond);
source->buffer = NULL;
source->buffer_len = 0;
source->state = GSTCURL_NONE;
source->pending_state = GSTCURL_NONE;
- source->status_code = 0;
+ source->transfer_begun = FALSE;
+ source->data_received = FALSE;
+ source->connection_status = GSTCURL_NOT_CONNECTED;
source->http_headers = NULL;
+ source->content_type = NULL;
+ source->status_code = 0;
source->hdrs_updated = FALSE;
source->curl_result = CURLE_OK;
+ gst_caps_replace (&source->caps, NULL);
GSTCURL_FUNCTION_EXIT (source);
}
@@ -666,6 +751,8 @@ gst_curl_http_src_ref_multi (GstCurlHttpSrc * src)
#endif
/* Start the thread */
+ g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
+ klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
klass->multi_task_context.task = gst_task_new (
(GstTaskFunction) gst_curl_http_src_curl_multi_loop,
(gpointer) & klass->multi_task_context, NULL);
@@ -709,13 +796,20 @@ gst_curl_http_src_unref_multi (GstCurlHttpSrc * src)
GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u",
klass->multi_task_context.refcount);
- if (klass->multi_task_context.refcount <= 0) {
+ if (klass->multi_task_context.refcount == 0) {
/* Everything's done! Clean up. */
- gst_task_pause (klass->multi_task_context.task);
+ gst_task_stop (klass->multi_task_context.task);
klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
g_cond_signal (&klass->multi_task_context.signal);
g_mutex_unlock (&klass->multi_task_context.mutex);
+ GST_DEBUG_OBJECT (src, "Joining curl_multi_loop task...");
gst_task_join (klass->multi_task_context.task);
+ gst_object_unref (klass->multi_task_context.task);
+ klass->multi_task_context.task = NULL;
+ curl_multi_cleanup (klass->multi_task_context.multi_handle);
+ klass->multi_task_context.multi_handle = NULL;
+ g_rec_mutex_clear (&klass->multi_task_context.task_rec_mutex);
+ GST_DEBUG_OBJECT (src, "multi_task_context cleanup complete");
} else {
g_mutex_unlock (&klass->multi_task_context.mutex);
}
@@ -734,6 +828,9 @@ gst_curl_http_src_finalize (GObject * obj)
gst_curl_http_src_cleanup_instance (src);
GSTCURL_FUNCTION_EXIT (src);
+
+ /* Chain up to parent class */
+ G_OBJECT_CLASS (gst_curl_http_src_parent_class)->finalize (obj);
}
/*
@@ -748,20 +845,25 @@ gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc);
GstCurlHttpSrcClass *klass;
GstStructure *empty_headers;
+ GstBaseSrc *basesrc;
+
+ GSTCURL_FUNCTION_ENTRY (src);
klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
GstCurlHttpSrcClass);
+ basesrc = GST_BASE_SRC_CAST (src);
- GSTCURL_FUNCTION_ENTRY (src);
+retry:
ret = GST_FLOW_OK;
-
+ /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
+ needed, multi_task_context.mutex must be acquired first */
+ g_mutex_lock (&klass->multi_task_context.mutex);
g_mutex_lock (&src->buffer_mutex);
if (src->state == GSTCURL_UNLOCK) {
ret = GST_FLOW_FLUSHING;
goto escape;
}
-retry:
if (!src->transfer_begun) {
GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri);
/* Create the Easy Handle and set up the session. */
@@ -771,19 +873,14 @@ retry:
goto escape;
}
- g_mutex_lock (&klass->multi_task_context.mutex);
-
if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src)
== FALSE) {
GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting...");
ret = GST_FLOW_ERROR;
goto escape;
}
-
/* Signal the worker thread */
- klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT;
g_cond_signal (&klass->multi_task_context.signal);
- g_mutex_unlock (&klass->multi_task_context.mutex);
src->state = GSTCURL_OK;
src->transfer_begun = TRUE;
@@ -791,6 +888,9 @@ retry:
GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri);
+ if (src->http_headers != NULL) {
+ gst_structure_free (src->http_headers);
+ }
empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
URI_NAME, G_TYPE_STRING, src->uri,
@@ -800,9 +900,12 @@ retry:
GST_INFO_OBJECT (src, "Created a new headers object");
}
+ g_mutex_unlock (&klass->multi_task_context.mutex);
+
/* Wait for data to become available, then punt it downstream */
- while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)) {
- g_cond_wait (&src->signal, &src->buffer_mutex);
+ while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)
+ && (src->connection_status == GSTCURL_CONNECTED)) {
+ g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
}
if (src->state == GSTCURL_UNLOCK) {
@@ -811,14 +914,16 @@ retry:
src->buffer = NULL;
src->buffer_len = 0;
}
- ret = GST_FLOW_FLUSHING;
- goto escape;
+ g_mutex_unlock (&src->buffer_mutex);
+ return GST_FLOW_FLUSHING;
}
ret = gst_curl_http_src_handle_response (src);
switch (ret) {
case GST_FLOW_ERROR:
- goto escape; /* Don't attempt a retry, just bomb out */
+ /* Don't attempt a retry, just bomb out */
+ g_mutex_unlock (&src->buffer_mutex);
+ return ret;
case GST_FLOW_CUSTOM_ERROR:
if (src->data_received == TRUE) {
/*
@@ -830,14 +935,14 @@ retry:
*/
GST_WARNING_OBJECT (src,
"Failed mid-transfer, can't continue for URI %s", src->uri);
- ret = GST_FLOW_ERROR;
- goto escape;
+ g_mutex_unlock (&src->buffer_mutex);
+ return GST_FLOW_ERROR;
}
src->retries_remaining--;
if (src->retries_remaining == 0) {
GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri);
- ret = GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */
- goto escape;
+ g_mutex_unlock (&src->buffer_mutex);
+ return GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */
}
GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri);
src->state = GSTCURL_NONE;
@@ -903,9 +1008,13 @@ retry:
GST_ERROR_OBJECT (src, "Unknown state of %u", src->state);
}
}
+ g_mutex_unlock (&src->buffer_mutex);
+ GSTCURL_FUNCTION_EXIT (src);
+ return ret;
escape:
g_mutex_unlock (&src->buffer_mutex);
+ g_mutex_unlock (&klass->multi_task_context.mutex);
GSTCURL_FUNCTION_EXIT (src);
return ret;
@@ -942,6 +1051,13 @@ gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
gint i;
GSTCURL_FUNCTION_ENTRY (s);
+ /* This is mandatory and yet not default option, so if this is NULL
+ * then something very bad is going on. */
+ if (s->uri == NULL) {
+ GST_ERROR_OBJECT (s, "No URI for curl!");
+ return NULL;
+ }
+
handle = curl_easy_init ();
if (handle == NULL) {
GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!");
@@ -949,14 +1065,7 @@ gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
}
GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri);
- /* This is mandatory and yet not default option, so if this is NULL
- * then something very bad is going on. */
- if (s->uri == NULL) {
- GST_ERROR_OBJECT (s, "No URI for curl!");
- return NULL;
- }
gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri);
-
gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username);
gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password);
gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri);
@@ -1164,7 +1273,6 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
RESPONSE_HEADERS_NAME);
if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) {
GstEvent *hdrs_event;
- GstStructure *empty_headers;
gst_element_post_message (GST_ELEMENT_CAST (src),
gst_message_new_element (GST_OBJECT_CAST (src),
@@ -1172,15 +1280,9 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
/* gst_event_new_custom takes ownership of our structure */
hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
- src->http_headers);
+ gst_structure_copy (src->http_headers));
gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event);
GST_INFO_OBJECT (src, "Pushed headers downstream");
- empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
- src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
- URI_NAME, G_TYPE_STRING, src->uri,
- REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
- RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
- gst_structure_free (empty_headers);
}
src->hdrs_updated = FALSE;
@@ -1198,29 +1300,28 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
static gboolean
gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src)
{
+ const GValue *response_headers;
+ const GstStructure *response_struct;
+
GST_INFO_OBJECT (src, "Negotiating caps...");
if (src->caps && src->http_headers) {
- const GValue *response_headers = gst_structure_get_value (src->http_headers,
- RESPONSE_HEADERS_NAME);
-
- if (gst_structure_has_field (gst_value_get_structure (response_headers),
- "content-type") == TRUE) {
- const GValue *gv_content_type =
- gst_structure_get_value (gst_value_get_structure (response_headers),
- "content-type");
- if (G_VALUE_HOLDS_STRING (gv_content_type) == TRUE) {
- const gchar *content_type = g_value_get_string (gv_content_type);
- GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s",
- content_type);
- src->caps = gst_caps_make_writable (src->caps);
- gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
- content_type, NULL);
- if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
- GST_ERROR_OBJECT (src, "Setting caps failed!");
- return FALSE;
- }
- } else {
- GST_ERROR_OBJECT (src, "Content Type doesn't contain expected string");
+ response_headers =
+ gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME);
+ if (!response_headers) {
+ GST_WARNING_OBJECT (src, "Failed to get %s", RESPONSE_HEADERS_NAME);
+ return FALSE;
+ }
+ response_struct = gst_value_get_structure (response_headers);
+ if (gst_structure_has_field_typed (response_struct, "content-type",
+ G_TYPE_STRING)) {
+ const gchar *content_type =
+ gst_structure_get_string (response_struct, "content-type");
+ GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", content_type);
+ src->caps = gst_caps_make_writable (src->caps);
+ gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
+ content_type, NULL);
+ if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
+ GST_ERROR_OBJECT (src, "Setting caps failed!");
return FALSE;
}
}
@@ -1268,8 +1369,10 @@ gst_curl_http_src_change_state (GstElement * element, GstStateChange transition)
}
break;
case GST_STATE_CHANGE_READY_TO_NULL:
- /* The pipeline has ended, so signal any running request to end. */
- gst_curl_http_src_request_remove (source);
+ GST_DEBUG_OBJECT (source, "Removing from multi_loop queue...");
+ /* The pipeline has ended, so signal any running request to end
+ and wait until the multi_loop has stopped using this element */
+ gst_curl_http_src_wait_until_removed (source);
gst_curl_http_src_unref_multi (source);
break;
default:
@@ -1314,17 +1417,25 @@ gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src)
g_free (src->cookies);
src->cookies = NULL;
+ g_free (src->user_agent);
+ src->user_agent = NULL;
+
g_mutex_clear (&src->buffer_mutex);
- g_cond_clear (&src->signal);
+ g_cond_clear (&src->buffer_cond);
g_free (src->buffer);
src->buffer = NULL;
+ if (src->request_headers) {
+ gst_structure_free (src->request_headers);
+ src->request_headers = NULL;
+ }
if (src->http_headers != NULL) {
gst_structure_free (src->http_headers);
src->http_headers = NULL;
}
+ gst_caps_replace (&src->caps, NULL);
gst_curl_http_src_destroy_easy_handle (src);
}
@@ -1338,10 +1449,12 @@ gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query)
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_URI:
+ g_mutex_lock (&src->uri_mutex);
gst_query_set_uri (query, src->uri);
if (src->redirect_uri != NULL) {
gst_query_set_uri_redirection (query, src->redirect_uri);
}
+ g_mutex_unlock (&src->uri_mutex);
ret = TRUE;
break;
default:
@@ -1366,22 +1479,17 @@ gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
response_headers = gst_structure_get_value (src->http_headers,
RESPONSE_HEADERS_NAME);
- if (gst_structure_has_field (gst_value_get_structure (response_headers),
- "content-length") == TRUE) {
- const GValue *content_length =
- gst_structure_get_value (gst_value_get_structure (response_headers),
+ if (gst_structure_has_field_typed (gst_value_get_structure (response_headers),
+ "content-length", G_TYPE_STRING)) {
+ const gchar *content_length =
+ gst_structure_get_string (gst_value_get_structure (response_headers),
"content-length");
- if (G_VALUE_HOLDS_STRING (content_length) == TRUE) {
- const gchar *len = g_value_get_string (content_length);
- *size = (guint64) g_ascii_strtoull (len, NULL, 10);
- ret = TRUE;
- } else {
- GST_ERROR_OBJECT (src, "Content Length doesn't contain expected string");
- }
+ *size = (guint64) g_ascii_strtoull (content_length, NULL, 10);
+ ret = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (src,
+ "No content length has yet been set, or there was an error!");
}
-
- GST_DEBUG_OBJECT (src,
- "No content length has yet been set, or there was an error!");
return ret;
}
@@ -1449,6 +1557,7 @@ gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
source->uri = g_strdup (uri);
if (source->uri == NULL) {
+ g_mutex_unlock (&source->uri_mutex);
return FALSE;
}
source->retries_remaining = source->total_retries;
@@ -1467,19 +1576,32 @@ static gboolean
gst_curl_http_src_unlock (GstBaseSrc * bsrc)
{
GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
+ gboolean want_removal = FALSE;
g_mutex_lock (&src->buffer_mutex);
if (src->state != GSTCURL_UNLOCK) {
if (src->state == GSTCURL_OK) {
/* A transfer is running, cancel it */
- gst_curl_http_src_request_remove (src);
+ if (src->connection_status == GSTCURL_CONNECTED) {
+ src->connection_status = GSTCURL_WANT_REMOVAL;
+ }
+ want_removal = TRUE;
}
src->pending_state = src->state;
src->state = GSTCURL_UNLOCK;
}
- g_cond_signal (&src->signal);
+ g_cond_signal (&src->buffer_cond);
g_mutex_unlock (&src->buffer_mutex);
+ if (want_removal) {
+ GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
+ GST_TYPE_CURL_HTTP_SRC,
+ GstCurlHttpSrcClass);
+ g_mutex_lock (&klass->multi_task_context.mutex);
+ g_cond_signal (&klass->multi_task_context.signal);
+ g_mutex_unlock (&klass->multi_task_context.mutex);
+ }
+
return TRUE;
}
@@ -1496,7 +1618,7 @@ gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc)
g_mutex_lock (&src->buffer_mutex);
src->state = src->pending_state;
src->pending_state = GSTCURL_NONE;
- g_cond_signal (&src->signal);
+ g_cond_signal (&src->buffer_cond);
g_mutex_unlock (&src->buffer_mutex);
return TRUE;
@@ -1510,9 +1632,10 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
{
GstCurlHttpSrcMultiTaskContext *context;
GstCurlHttpSrcQueueElement *qelement, *qnext;
- int i, still_running;
- gboolean cond = FALSE;
+ gint i, still_running = 0;
CURLMsg *curl_message;
+ GstCurlHttpSrc *elt;
+ guint active = 0;
context = (GstCurlHttpSrcMultiTaskContext *) thread_data;
@@ -1521,49 +1644,60 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
/* Someone is holding a reference to us, but isn't using us so to avoid
* unnecessary clock cycle wasting, sit in a conditional wait until woken.
*/
- while (context->state == GSTCURL_MULTI_LOOP_STATE_WAIT) {
- GSTCURL_DEBUG_PRINT ("Entering wait state...");
+ while (context->queue == NULL
+ && context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
+ GSTCURL_DEBUG_PRINT ("Waiting for an element to be added...");
g_cond_wait (&context->signal, &context->mutex);
GSTCURL_DEBUG_PRINT ("Received wake up call!");
}
+ if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
+ GSTCURL_INFO_PRINT ("Got instruction to shut down");
+ goto out;
+ }
- if (context->state == GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) {
- GSTCURL_DEBUG_PRINT ("Received a new item on the queue!");
- if (context->queue == NULL) {
- GSTCURL_ERROR_PRINT ("Request Queue was empty on a Queue Event!");
- context->state = GSTCURL_MULTI_LOOP_STATE_WAIT;
- return;
- }
-
- /*
- * Use the running mutex to lock access to each element, as the
- * mutex's memory barriers stop cache optimisations from meaning
- * flag values can't be trusted. The trylock will only let us in
- * once and should fail immediately prior.
- */
- qelement = context->queue;
- while (qelement != NULL) {
- if (g_mutex_trylock (&qelement->running) == TRUE) {
+ /* check for elements that need to be started or removed */
+ qelement = context->queue;
+ while (qelement != NULL) {
+ qnext = qelement->next;
+ elt = qelement->p;
+ /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
+ needed, multi_task_context.mutex must be acquired first */
+ g_mutex_lock (&elt->buffer_mutex);
+ if (elt->connection_status == GSTCURL_WANT_REMOVAL) {
+ curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
+ if (elt->state == GSTCURL_UNLOCK) {
+ elt->pending_state = GSTCURL_REMOVED;
+ } else {
+ elt->state = GSTCURL_REMOVED;
+ }
+ elt->connection_status = GSTCURL_NOT_CONNECTED;
+ gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
+ g_cond_signal (&elt->buffer_cond);
+ } else if (elt->connection_status == GSTCURL_CONNECTED) {
+ active++;
+ if (g_atomic_int_compare_and_exchange (&qelement->running, 0, 1)) {
GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri);
- cond = TRUE;
curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle);
}
- qelement = qelement->next;
}
+ g_mutex_unlock (&elt->buffer_mutex);
+ qelement = qnext;
+ }
- if (cond != TRUE) {
- GSTCURL_WARNING_PRINT ("All curl handles already added for QUEUE_EVENT!");
- } else {
- GSTCURL_DEBUG_PRINT ("Finished adding all handles, continuing.");
- context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
- }
- g_mutex_unlock (&context->mutex);
- } else if (context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
+ if (active == 0) {
+ GSTCURL_DEBUG_PRINT ("No active elements");
+ goto out;
+ }
+
+ /* perform a select() on all of the active sockets and process any
+ messages from curl */
+ {
struct timeval timeout;
gint rc;
fd_set fdread, fdwrite, fdexcep;
int maxfd = -1;
long curl_timeo = -1;
+ gboolean cond = FALSE;
/* Because curl can possibly take some time here, be nice and let go of the
* mutex so other threads can perform state/queue operations as we don't
@@ -1604,6 +1738,8 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
break;
}
+ g_mutex_lock (&context->mutex);
+
/*
* Check the CURL message buffer to find out if any transfers have
* completed. If they have, call the signal_finished function which
@@ -1617,73 +1753,17 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data)
} else if (curl_message->msg == CURLMSG_DONE) {
/* A hack, but I have seen curl_message->easy_handle being
* NULL randomly, so check for that. */
- g_mutex_lock (&context->mutex);
- if (curl_message->easy_handle == NULL) {
- break;
- }
- curl_multi_remove_handle (context->multi_handle,
- curl_message->easy_handle);
- gst_curl_http_src_remove_queue_handle (&context->queue,
- curl_message->easy_handle, curl_message->data.result);
- g_mutex_unlock (&context->mutex);
- }
- }
-
- if (still_running == 0) {
- /* We've finished processing, so set the state to wait.
- *
- * This is a little more complex, as we need to catch the edge
- * case of another thread adding a queue item while we've been
- * working.
- */
- g_mutex_lock (&context->mutex);
- if ((context->state != GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) &&
- (context->state != GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL)) {
- context->state = GSTCURL_MULTI_LOOP_STATE_WAIT;
- }
- g_mutex_unlock (&context->mutex);
- }
- }
- /* Is the following even necessary any more...? */
- else if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
- g_mutex_unlock (&context->mutex);
- /* Something wants us to shut down, so best to do a full cleanup as it
- * might be that something's gone bang.
- */
- /*gst_curl_http_src_unref_multi (NULL, GSTCURL_RETURN_PIPELINE_NULL, TRUE); */
- GSTCURL_INFO_PRINT ("Got instruction to shut down");
- } else if (context->state == GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL) {
- qelement = context->queue;
- while (qelement != NULL) {
- qnext = qelement->next;
- if (qelement->p == context->request_removal_element) {
- g_mutex_lock (&qelement->p->buffer_mutex);
- curl_multi_remove_handle (context->multi_handle,
- context->request_removal_element->curl_handle);
- if (qelement->p->state == GSTCURL_UNLOCK) {
- qelement->p->pending_state = GSTCURL_REMOVED;
- } else {
- qelement->p->state = GSTCURL_REMOVED;
+ if (curl_message->easy_handle != NULL) {
+ curl_multi_remove_handle (context->multi_handle,
+ curl_message->easy_handle);
+ gst_curl_http_src_remove_queue_handle (&context->queue,
+ curl_message->easy_handle, curl_message->data.result);
}
- g_cond_signal (&qelement->p->signal);
- g_mutex_unlock (&qelement->p->buffer_mutex);
- gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
}
- qelement = qnext;
}
- context->request_removal_element = NULL;
- context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
- g_mutex_unlock (&context->mutex);
- } else {
- GSTCURL_WARNING_PRINT ("Curl Loop State was invalid or unsupported");
- GSTCURL_WARNING_PRINT ("Signal State is %d, resetting to RUNNING.",
- context->state);
- /* Reset to running, so if there isn't anything to do it'll be
- * changed the WAIT once curl_multi_perform says it has no active
- * handles. */
- context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
- g_mutex_unlock (&context->mutex);
}
+out:
+ g_mutex_unlock (&context->mutex);
}
/*
@@ -1763,8 +1843,8 @@ gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
/* If header field already exists, append to the end */
if (gst_structure_has_field (response_headers, header_key) == TRUE) {
header_value = g_strdup_printf ("%s, %s",
- g_value_get_string (gst_structure_get_value (response_headers,
- header_key)), header_tpl[1]);
+ gst_structure_get_string (response_headers, header_key),
+ header_tpl[1]);
gst_structure_set ((GstStructure *) response_headers, header_key,
G_TYPE_STRING, header_value, NULL);
g_free (header_value);
@@ -1850,7 +1930,7 @@ gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src)
}
memcpy (s->buffer + s->buffer_len, chunk, chunk_len);
s->buffer_len += chunk_len;
- g_cond_signal (&s->signal);
+ g_cond_signal (&s->buffer_cond);
g_mutex_unlock (&s->buffer_mutex);
return chunk_len;
}
@@ -1864,10 +1944,29 @@ gst_curl_http_src_request_remove (GstCurlHttpSrc * src)
GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
GST_TYPE_CURL_HTTP_SRC,
GstCurlHttpSrcClass);
- g_mutex_lock (&klass->multi_task_context.mutex);
- klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL;
- klass->multi_task_context.request_removal_element = src;
+ g_mutex_lock (&klass->multi_task_context.mutex);
+ g_mutex_lock (&src->buffer_mutex);
+ if (src->connection_status == GSTCURL_CONNECTED) {
+ src->connection_status = GSTCURL_WANT_REMOVAL;
+ }
+ g_mutex_unlock (&src->buffer_mutex);
g_cond_signal (&klass->multi_task_context.signal);
g_mutex_unlock (&klass->multi_task_context.mutex);
}
+
+/*
+ * Request a cancellation of a currently running curl handle and
+ * block this thread until the src element has been removed
+ * from the queue
+ */
+static void
+gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src)
+{
+ gst_curl_http_src_request_remove (src);
+ g_mutex_lock (&src->buffer_mutex);
+ while (src->connection_status != GSTCURL_NOT_CONNECTED) {
+ g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
+ }
+ g_mutex_unlock (&src->buffer_mutex);
+}