diff options
-rw-r--r-- | ext/curl/gstcurldefaults.h | 2 | ||||
-rw-r--r-- | ext/curl/gstcurlhttpsrc.c | 479 | ||||
-rw-r--r-- | ext/curl/gstcurlhttpsrc.h | 43 | ||||
-rw-r--r-- | ext/curl/gstcurlqueue.c | 7 | ||||
-rw-r--r-- | ext/curl/gstcurlqueue.h | 2 |
5 files changed, 303 insertions, 230 deletions
diff --git a/ext/curl/gstcurldefaults.h b/ext/curl/gstcurldefaults.h index 8e687f749..2f4bc8393 100644 --- a/ext/curl/gstcurldefaults.h +++ b/ext/curl/gstcurldefaults.h @@ -60,7 +60,7 @@ #define GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXY ((void *)0) #define GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYUSERNAME ((void *)0) #define GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYPASSWORD ((void *)0) -#define GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT gst_curl_http_src_default_useragent +#define GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "GStreamer curlhttpsrc libcurl" #define GSTCURL_HANDLE_DEFAULT_CURLOPT_ACCEPT_ENCODING FALSE #define GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION 1L #define GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS -1 diff --git a/ext/curl/gstcurlhttpsrc.c b/ext/curl/gstcurlhttpsrc.c index c1a0bcf5c..9a2dbb0d5 100644 --- a/ext/curl/gstcurlhttpsrc.c +++ b/ext/curl/gstcurlhttpsrc.c @@ -73,6 +73,46 @@ * </refsect2> */ +/* + * Thread safety notes. + * + * GstCurlHttpSrc uses a single thread running the + * gst_curl_http_src_curl_multi_loop() function to handle receiving + * data and messages from libcurl. Each instance of GstCurlHttpSrc adds + * an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits + * for the multi_loop to perform the HTTP request. + * + * When an instance of GstCurlHttpSrc wants to make a request (i.e. + * it has moved to the PLAYING state) it adds itself to the + * multi_task_context.queue list and signals the multi_loop task. + * + * Each instance of GstCurlHttpSrc uses buffer_mutex and buffer_cond + * to wait for gst_curl_http_src_curl_multi_loop() to perform the + * request and signal completion. + * + * Each instance of GstCurlHttpSrc is protected by the mutexes: + * 1. uri_mutex + * 2. buffer_mutex + * + * uri_mutex is used to protect access to the uri field. + * + * buffer_mutex is used to protect access to buffer_cond, state and + * connection_status. + * + * The gst_curl_http_src_curl_multi_loop() function uses the mutexes: + * 1. multi_task_context.task_rec_mutex + * 2. multi_task_context.mutex + * + * multi_task_context.task_rec_mutex is only used by GstTask. + * + * multi_task_context.mutex is used to protect access to queue and state + * + * To avoid deadlock, it is vital that if both multi_task_context.mutex + * and buffer_mutex are required, that they are locked in the order: + * 1. multi_task_context.mutex + * 2. buffer_mutex + */ + #ifdef HAVE_CONFIG_H #include <config.h> #endif @@ -87,6 +127,35 @@ GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug); #define GST_CAT_DEFAULT gst_curl_http_src_debug GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug); +enum +{ + PROP_0, + PROP_URI, + PROP_USERNAME, + PROP_PASSWORD, + PROP_PROXYURI, + PROP_PROXYUSERNAME, + PROP_PROXYPASSWORD, + PROP_COOKIES, + PROP_USERAGENT, + PROP_HEADERS, + PROP_COMPRESS, + PROP_REDIRECT, + PROP_MAXREDIRECT, + PROP_KEEPALIVE, + PROP_TIMEOUT, + PROP_STRICT_SSL, + PROP_SSL_CA_FILE, + PROP_RETRIES, + PROP_CONNECTIONMAXTIME, + PROP_MAXCONCURRENT_SERVER, + PROP_MAXCONCURRENT_PROXY, + PROP_MAXCONCURRENT_GLOBAL, + PROP_HTTPVERSION, + PROP_IRADIO_MODE, + PROP_MAX +}; + /* * Make a source pad template to be able to kick out recv'd data */ @@ -138,14 +207,15 @@ static size_t gst_curl_http_src_get_header (void *header, size_t size, static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src); static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src); +static void gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src); static char *gst_curl_http_src_strcasestr (const char *haystack, const char *needle); -curl_version_info_data *gst_curl_http_src_curl_capabilities; -GstCurlHttpVersion pref_http_ver; -gchar *gst_curl_http_src_default_useragent; +static curl_version_info_data *gst_curl_http_src_curl_capabilities = NULL; +static GstCurlHttpVersion pref_http_ver; #define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ()) + static GType gst_curl_http_version_get_type (void) { @@ -237,10 +307,6 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass) pref_http_ver = default_http_version; } - gst_curl_http_src_default_useragent = - g_strdup_printf ("GStreamer curlhttpsrc libcurl/%s", - gst_curl_http_src_curl_capabilities->version); - gobject_class->set_property = gst_curl_http_src_set_property; gobject_class->get_property = gst_curl_http_src_get_property; gobject_class->finalize = gst_curl_http_src_finalize; @@ -285,7 +351,8 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass) g_object_class_install_property (gobject_class, PROP_USERAGENT, g_param_spec_string ("user-agent", "User-Agent", - "URI of resource requested", GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT, + "URI of resource requested", + GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/<curl-version>", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_COMPRESS, @@ -390,9 +457,13 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass) __LINE__, NULL, "Testing the curl_multi_loop debugging prints"); #endif + klass->multi_task_context.task = NULL; + klass->multi_task_context.refcount = 0; + klass->multi_task_context.queue = NULL; + klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP; + klass->multi_task_context.multi_handle = NULL; g_mutex_init (&klass->multi_task_context.mutex); g_cond_init (&klass->multi_task_context.signal); - g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex); gst_element_class_set_static_metadata (gstelement_class, "HTTP Client Source using libcURL", @@ -410,8 +481,10 @@ gst_curl_http_src_set_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_URI: + g_mutex_lock (&source->uri_mutex); g_free (source->uri); source->uri = g_value_dup_string (value); + g_mutex_unlock (&source->uri_mutex); break; case PROP_USERNAME: g_free (source->username); @@ -447,7 +520,9 @@ gst_curl_http_src_set_property (GObject * object, guint prop_id, const GstStructure *s = gst_value_get_structure (value); if (source->request_headers) gst_structure_free (source->request_headers); - source->request_headers = s ? gst_structure_copy (s) : NULL; + source->request_headers = + s ? gst_structure_copy (s) : + gst_structure_new_empty (REQUEST_HEADERS_NAME); } break; case PROP_COMPRESS: @@ -505,7 +580,9 @@ gst_curl_http_src_get_property (GObject * object, guint prop_id, switch (prop_id) { case PROP_URI: + g_mutex_lock (&source->uri_mutex); g_value_set_string (value, source->uri); + g_mutex_unlock (&source->uri_mutex); break; case PROP_USERNAME: g_value_set_string (value, source->username); @@ -591,9 +668,12 @@ gst_curl_http_src_init (GstCurlHttpSrc * source) source->proxy_user = NULL; source->proxy_pass = NULL; source->cookies = NULL; - source->user_agent = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT; + g_assert (gst_curl_http_src_curl_capabilities != NULL); + source->user_agent = + g_strdup_printf (GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/%s", + gst_curl_http_src_curl_capabilities->version); source->number_cookies = 0; - source->request_headers = NULL; + source->request_headers = gst_structure_new_empty (REQUEST_HEADERS_NAME); source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION; source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS; source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE; @@ -608,8 +688,8 @@ gst_curl_http_src_init (GstCurlHttpSrc * source) source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES; source->retries_remaining = source->total_retries; source->slist = NULL; + source->accept_compressed_encodings = FALSE; - gst_caps_replace (&source->caps, NULL); gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE); source->proxy_uri = g_strdup (g_getenv ("http_proxy")); @@ -617,18 +697,23 @@ gst_curl_http_src_init (GstCurlHttpSrc * source) g_mutex_init (&source->uri_mutex); g_mutex_init (&source->buffer_mutex); - g_cond_init (&source->signal); + g_cond_init (&source->buffer_cond); source->buffer = NULL; source->buffer_len = 0; source->state = GSTCURL_NONE; source->pending_state = GSTCURL_NONE; - source->status_code = 0; + source->transfer_begun = FALSE; + source->data_received = FALSE; + source->connection_status = GSTCURL_NOT_CONNECTED; source->http_headers = NULL; + source->content_type = NULL; + source->status_code = 0; source->hdrs_updated = FALSE; source->curl_result = CURLE_OK; + gst_caps_replace (&source->caps, NULL); GSTCURL_FUNCTION_EXIT (source); } @@ -666,6 +751,8 @@ gst_curl_http_src_ref_multi (GstCurlHttpSrc * src) #endif /* Start the thread */ + g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex); + klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_RUNNING; klass->multi_task_context.task = gst_task_new ( (GstTaskFunction) gst_curl_http_src_curl_multi_loop, (gpointer) & klass->multi_task_context, NULL); @@ -709,13 +796,20 @@ gst_curl_http_src_unref_multi (GstCurlHttpSrc * src) GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u", klass->multi_task_context.refcount); - if (klass->multi_task_context.refcount <= 0) { + if (klass->multi_task_context.refcount == 0) { /* Everything's done! Clean up. */ - gst_task_pause (klass->multi_task_context.task); + gst_task_stop (klass->multi_task_context.task); klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP; g_cond_signal (&klass->multi_task_context.signal); g_mutex_unlock (&klass->multi_task_context.mutex); + GST_DEBUG_OBJECT (src, "Joining curl_multi_loop task..."); gst_task_join (klass->multi_task_context.task); + gst_object_unref (klass->multi_task_context.task); + klass->multi_task_context.task = NULL; + curl_multi_cleanup (klass->multi_task_context.multi_handle); + klass->multi_task_context.multi_handle = NULL; + g_rec_mutex_clear (&klass->multi_task_context.task_rec_mutex); + GST_DEBUG_OBJECT (src, "multi_task_context cleanup complete"); } else { g_mutex_unlock (&klass->multi_task_context.mutex); } @@ -734,6 +828,9 @@ gst_curl_http_src_finalize (GObject * obj) gst_curl_http_src_cleanup_instance (src); GSTCURL_FUNCTION_EXIT (src); + + /* Chain up to parent class */ + G_OBJECT_CLASS (gst_curl_http_src_parent_class)->finalize (obj); } /* @@ -748,20 +845,25 @@ gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc); GstCurlHttpSrcClass *klass; GstStructure *empty_headers; + GstBaseSrc *basesrc; + + GSTCURL_FUNCTION_ENTRY (src); klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC, GstCurlHttpSrcClass); + basesrc = GST_BASE_SRC_CAST (src); - GSTCURL_FUNCTION_ENTRY (src); +retry: ret = GST_FLOW_OK; - + /* NOTE: when both the buffer_mutex and multi_task_context.mutex are + needed, multi_task_context.mutex must be acquired first */ + g_mutex_lock (&klass->multi_task_context.mutex); g_mutex_lock (&src->buffer_mutex); if (src->state == GSTCURL_UNLOCK) { ret = GST_FLOW_FLUSHING; goto escape; } -retry: if (!src->transfer_begun) { GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri); /* Create the Easy Handle and set up the session. */ @@ -771,19 +873,14 @@ retry: goto escape; } - g_mutex_lock (&klass->multi_task_context.mutex); - if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src) == FALSE) { GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting..."); ret = GST_FLOW_ERROR; goto escape; } - /* Signal the worker thread */ - klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT; g_cond_signal (&klass->multi_task_context.signal); - g_mutex_unlock (&klass->multi_task_context.mutex); src->state = GSTCURL_OK; src->transfer_begun = TRUE; @@ -791,6 +888,9 @@ retry: GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri); + if (src->http_headers != NULL) { + gst_structure_free (src->http_headers); + } empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME); src->http_headers = gst_structure_new (HTTP_HEADERS_NAME, URI_NAME, G_TYPE_STRING, src->uri, @@ -800,9 +900,12 @@ retry: GST_INFO_OBJECT (src, "Created a new headers object"); } + g_mutex_unlock (&klass->multi_task_context.mutex); + /* Wait for data to become available, then punt it downstream */ - while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)) { - g_cond_wait (&src->signal, &src->buffer_mutex); + while ((src->buffer_len == 0) && (src->state == GSTCURL_OK) + && (src->connection_status == GSTCURL_CONNECTED)) { + g_cond_wait (&src->buffer_cond, &src->buffer_mutex); } if (src->state == GSTCURL_UNLOCK) { @@ -811,14 +914,16 @@ retry: src->buffer = NULL; src->buffer_len = 0; } - ret = GST_FLOW_FLUSHING; - goto escape; + g_mutex_unlock (&src->buffer_mutex); + return GST_FLOW_FLUSHING; } ret = gst_curl_http_src_handle_response (src); switch (ret) { case GST_FLOW_ERROR: - goto escape; /* Don't attempt a retry, just bomb out */ + /* Don't attempt a retry, just bomb out */ + g_mutex_unlock (&src->buffer_mutex); + return ret; case GST_FLOW_CUSTOM_ERROR: if (src->data_received == TRUE) { /* @@ -830,14 +935,14 @@ retry: */ GST_WARNING_OBJECT (src, "Failed mid-transfer, can't continue for URI %s", src->uri); - ret = GST_FLOW_ERROR; - goto escape; + g_mutex_unlock (&src->buffer_mutex); + return GST_FLOW_ERROR; } src->retries_remaining--; if (src->retries_remaining == 0) { GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri); - ret = GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */ - goto escape; + g_mutex_unlock (&src->buffer_mutex); + return GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */ } GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri); src->state = GSTCURL_NONE; @@ -903,9 +1008,13 @@ retry: GST_ERROR_OBJECT (src, "Unknown state of %u", src->state); } } + g_mutex_unlock (&src->buffer_mutex); + GSTCURL_FUNCTION_EXIT (src); + return ret; escape: g_mutex_unlock (&src->buffer_mutex); + g_mutex_unlock (&klass->multi_task_context.mutex); GSTCURL_FUNCTION_EXIT (src); return ret; @@ -942,6 +1051,13 @@ gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s) gint i; GSTCURL_FUNCTION_ENTRY (s); + /* This is mandatory and yet not default option, so if this is NULL + * then something very bad is going on. */ + if (s->uri == NULL) { + GST_ERROR_OBJECT (s, "No URI for curl!"); + return NULL; + } + handle = curl_easy_init (); if (handle == NULL) { GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!"); @@ -949,14 +1065,7 @@ gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s) } GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri); - /* This is mandatory and yet not default option, so if this is NULL - * then something very bad is going on. */ - if (s->uri == NULL) { - GST_ERROR_OBJECT (s, "No URI for curl!"); - return NULL; - } gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri); - gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username); gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password); gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri); @@ -1164,7 +1273,6 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src) RESPONSE_HEADERS_NAME); if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) { GstEvent *hdrs_event; - GstStructure *empty_headers; gst_element_post_message (GST_ELEMENT_CAST (src), gst_message_new_element (GST_OBJECT_CAST (src), @@ -1172,15 +1280,9 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src) /* gst_event_new_custom takes ownership of our structure */ hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY, - src->http_headers); + gst_structure_copy (src->http_headers)); gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event); GST_INFO_OBJECT (src, "Pushed headers downstream"); - empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME); - src->http_headers = gst_structure_new (HTTP_HEADERS_NAME, - URI_NAME, G_TYPE_STRING, src->uri, - REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers, - RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL); - gst_structure_free (empty_headers); } src->hdrs_updated = FALSE; @@ -1198,29 +1300,28 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src) static gboolean gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src) { + const GValue *response_headers; + const GstStructure *response_struct; + GST_INFO_OBJECT (src, "Negotiating caps..."); if (src->caps && src->http_headers) { - const GValue *response_headers = gst_structure_get_value (src->http_headers, - RESPONSE_HEADERS_NAME); - - if (gst_structure_has_field (gst_value_get_structure (response_headers), - "content-type") == TRUE) { - const GValue *gv_content_type = - gst_structure_get_value (gst_value_get_structure (response_headers), - "content-type"); - if (G_VALUE_HOLDS_STRING (gv_content_type) == TRUE) { - const gchar *content_type = g_value_get_string (gv_content_type); - GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", - content_type); - src->caps = gst_caps_make_writable (src->caps); - gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING, - content_type, NULL); - if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) { - GST_ERROR_OBJECT (src, "Setting caps failed!"); - return FALSE; - } - } else { - GST_ERROR_OBJECT (src, "Content Type doesn't contain expected string"); + response_headers = + gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME); + if (!response_headers) { + GST_WARNING_OBJECT (src, "Failed to get %s", RESPONSE_HEADERS_NAME); + return FALSE; + } + response_struct = gst_value_get_structure (response_headers); + if (gst_structure_has_field_typed (response_struct, "content-type", + G_TYPE_STRING)) { + const gchar *content_type = + gst_structure_get_string (response_struct, "content-type"); + GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", content_type); + src->caps = gst_caps_make_writable (src->caps); + gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING, + content_type, NULL); + if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) { + GST_ERROR_OBJECT (src, "Setting caps failed!"); return FALSE; } } @@ -1268,8 +1369,10 @@ gst_curl_http_src_change_state (GstElement * element, GstStateChange transition) } break; case GST_STATE_CHANGE_READY_TO_NULL: - /* The pipeline has ended, so signal any running request to end. */ - gst_curl_http_src_request_remove (source); + GST_DEBUG_OBJECT (source, "Removing from multi_loop queue..."); + /* The pipeline has ended, so signal any running request to end + and wait until the multi_loop has stopped using this element */ + gst_curl_http_src_wait_until_removed (source); gst_curl_http_src_unref_multi (source); break; default: @@ -1314,17 +1417,25 @@ gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src) g_free (src->cookies); src->cookies = NULL; + g_free (src->user_agent); + src->user_agent = NULL; + g_mutex_clear (&src->buffer_mutex); - g_cond_clear (&src->signal); + g_cond_clear (&src->buffer_cond); g_free (src->buffer); src->buffer = NULL; + if (src->request_headers) { + gst_structure_free (src->request_headers); + src->request_headers = NULL; + } if (src->http_headers != NULL) { gst_structure_free (src->http_headers); src->http_headers = NULL; } + gst_caps_replace (&src->caps, NULL); gst_curl_http_src_destroy_easy_handle (src); } @@ -1338,10 +1449,12 @@ gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query) switch (GST_QUERY_TYPE (query)) { case GST_QUERY_URI: + g_mutex_lock (&src->uri_mutex); gst_query_set_uri (query, src->uri); if (src->redirect_uri != NULL) { gst_query_set_uri_redirection (query, src->redirect_uri); } + g_mutex_unlock (&src->uri_mutex); ret = TRUE; break; default: @@ -1366,22 +1479,17 @@ gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size) response_headers = gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME); - if (gst_structure_has_field (gst_value_get_structure (response_headers), - "content-length") == TRUE) { - const GValue *content_length = - gst_structure_get_value (gst_value_get_structure (response_headers), + if (gst_structure_has_field_typed (gst_value_get_structure (response_headers), + "content-length", G_TYPE_STRING)) { + const gchar *content_length = + gst_structure_get_string (gst_value_get_structure (response_headers), "content-length"); - if (G_VALUE_HOLDS_STRING (content_length) == TRUE) { - const gchar *len = g_value_get_string (content_length); - *size = (guint64) g_ascii_strtoull (len, NULL, 10); - ret = TRUE; - } else { - GST_ERROR_OBJECT (src, "Content Length doesn't contain expected string"); - } + *size = (guint64) g_ascii_strtoull (content_length, NULL, 10); + ret = TRUE; + } else { + GST_DEBUG_OBJECT (src, + "No content length has yet been set, or there was an error!"); } - - GST_DEBUG_OBJECT (src, - "No content length has yet been set, or there was an error!"); return ret; } @@ -1449,6 +1557,7 @@ gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler, source->uri = g_strdup (uri); if (source->uri == NULL) { + g_mutex_unlock (&source->uri_mutex); return FALSE; } source->retries_remaining = source->total_retries; @@ -1467,19 +1576,32 @@ static gboolean gst_curl_http_src_unlock (GstBaseSrc * bsrc) { GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc); + gboolean want_removal = FALSE; g_mutex_lock (&src->buffer_mutex); if (src->state != GSTCURL_UNLOCK) { if (src->state == GSTCURL_OK) { /* A transfer is running, cancel it */ - gst_curl_http_src_request_remove (src); + if (src->connection_status == GSTCURL_CONNECTED) { + src->connection_status = GSTCURL_WANT_REMOVAL; + } + want_removal = TRUE; } src->pending_state = src->state; src->state = GSTCURL_UNLOCK; } - g_cond_signal (&src->signal); + g_cond_signal (&src->buffer_cond); g_mutex_unlock (&src->buffer_mutex); + if (want_removal) { + GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src, + GST_TYPE_CURL_HTTP_SRC, + GstCurlHttpSrcClass); + g_mutex_lock (&klass->multi_task_context.mutex); + g_cond_signal (&klass->multi_task_context.signal); + g_mutex_unlock (&klass->multi_task_context.mutex); + } + return TRUE; } @@ -1496,7 +1618,7 @@ gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc) g_mutex_lock (&src->buffer_mutex); src->state = src->pending_state; src->pending_state = GSTCURL_NONE; - g_cond_signal (&src->signal); + g_cond_signal (&src->buffer_cond); g_mutex_unlock (&src->buffer_mutex); return TRUE; @@ -1510,9 +1632,10 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data) { GstCurlHttpSrcMultiTaskContext *context; GstCurlHttpSrcQueueElement *qelement, *qnext; - int i, still_running; - gboolean cond = FALSE; + gint i, still_running = 0; CURLMsg *curl_message; + GstCurlHttpSrc *elt; + guint active = 0; context = (GstCurlHttpSrcMultiTaskContext *) thread_data; @@ -1521,49 +1644,60 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data) /* Someone is holding a reference to us, but isn't using us so to avoid * unnecessary clock cycle wasting, sit in a conditional wait until woken. */ - while (context->state == GSTCURL_MULTI_LOOP_STATE_WAIT) { - GSTCURL_DEBUG_PRINT ("Entering wait state..."); + while (context->queue == NULL + && context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) { + GSTCURL_DEBUG_PRINT ("Waiting for an element to be added..."); g_cond_wait (&context->signal, &context->mutex); GSTCURL_DEBUG_PRINT ("Received wake up call!"); } + if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) { + GSTCURL_INFO_PRINT ("Got instruction to shut down"); + goto out; + } - if (context->state == GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) { - GSTCURL_DEBUG_PRINT ("Received a new item on the queue!"); - if (context->queue == NULL) { - GSTCURL_ERROR_PRINT ("Request Queue was empty on a Queue Event!"); - context->state = GSTCURL_MULTI_LOOP_STATE_WAIT; - return; - } - - /* - * Use the running mutex to lock access to each element, as the - * mutex's memory barriers stop cache optimisations from meaning - * flag values can't be trusted. The trylock will only let us in - * once and should fail immediately prior. - */ - qelement = context->queue; - while (qelement != NULL) { - if (g_mutex_trylock (&qelement->running) == TRUE) { + /* check for elements that need to be started or removed */ + qelement = context->queue; + while (qelement != NULL) { + qnext = qelement->next; + elt = qelement->p; + /* NOTE: when both the buffer_mutex and multi_task_context.mutex are + needed, multi_task_context.mutex must be acquired first */ + g_mutex_lock (&elt->buffer_mutex); + if (elt->connection_status == GSTCURL_WANT_REMOVAL) { + curl_multi_remove_handle (context->multi_handle, elt->curl_handle); + if (elt->state == GSTCURL_UNLOCK) { + elt->pending_state = GSTCURL_REMOVED; + } else { + elt->state = GSTCURL_REMOVED; + } + elt->connection_status = GSTCURL_NOT_CONNECTED; + gst_curl_http_src_remove_queue_item (&context->queue, qelement->p); + g_cond_signal (&elt->buffer_cond); + } else if (elt->connection_status == GSTCURL_CONNECTED) { + active++; + if (g_atomic_int_compare_and_exchange (&qelement->running, 0, 1)) { GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri); - cond = TRUE; curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle); } - qelement = qelement->next; } + g_mutex_unlock (&elt->buffer_mutex); + qelement = qnext; + } - if (cond != TRUE) { - GSTCURL_WARNING_PRINT ("All curl handles already added for QUEUE_EVENT!"); - } else { - GSTCURL_DEBUG_PRINT ("Finished adding all handles, continuing."); - context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING; - } - g_mutex_unlock (&context->mutex); - } else if (context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) { + if (active == 0) { + GSTCURL_DEBUG_PRINT ("No active elements"); + goto out; + } + + /* perform a select() on all of the active sockets and process any + messages from curl */ + { struct timeval timeout; gint rc; fd_set fdread, fdwrite, fdexcep; int maxfd = -1; long curl_timeo = -1; + gboolean cond = FALSE; /* Because curl can possibly take some time here, be nice and let go of the * mutex so other threads can perform state/queue operations as we don't @@ -1604,6 +1738,8 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data) break; } + g_mutex_lock (&context->mutex); + /* * Check the CURL message buffer to find out if any transfers have * completed. If they have, call the signal_finished function which @@ -1617,73 +1753,17 @@ gst_curl_http_src_curl_multi_loop (gpointer thread_data) } else if (curl_message->msg == CURLMSG_DONE) { /* A hack, but I have seen curl_message->easy_handle being * NULL randomly, so check for that. */ - g_mutex_lock (&context->mutex); - if (curl_message->easy_handle == NULL) { - break; - } - curl_multi_remove_handle (context->multi_handle, - curl_message->easy_handle); - gst_curl_http_src_remove_queue_handle (&context->queue, - curl_message->easy_handle, curl_message->data.result); - g_mutex_unlock (&context->mutex); - } - } - - if (still_running == 0) { - /* We've finished processing, so set the state to wait. - * - * This is a little more complex, as we need to catch the edge - * case of another thread adding a queue item while we've been - * working. - */ - g_mutex_lock (&context->mutex); - if ((context->state != GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT) && - (context->state != GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL)) { - context->state = GSTCURL_MULTI_LOOP_STATE_WAIT; - } - g_mutex_unlock (&context->mutex); - } - } - /* Is the following even necessary any more...? */ - else if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) { - g_mutex_unlock (&context->mutex); - /* Something wants us to shut down, so best to do a full cleanup as it - * might be that something's gone bang. - */ - /*gst_curl_http_src_unref_multi (NULL, GSTCURL_RETURN_PIPELINE_NULL, TRUE); */ - GSTCURL_INFO_PRINT ("Got instruction to shut down"); - } else if (context->state == GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL) { - qelement = context->queue; - while (qelement != NULL) { - qnext = qelement->next; - if (qelement->p == context->request_removal_element) { - g_mutex_lock (&qelement->p->buffer_mutex); - curl_multi_remove_handle (context->multi_handle, - context->request_removal_element->curl_handle); - if (qelement->p->state == GSTCURL_UNLOCK) { - qelement->p->pending_state = GSTCURL_REMOVED; - } else { - qelement->p->state = GSTCURL_REMOVED; + if (curl_message->easy_handle != NULL) { + curl_multi_remove_handle (context->multi_handle, + curl_message->easy_handle); + gst_curl_http_src_remove_queue_handle (&context->queue, + curl_message->easy_handle, curl_message->data.result); } - g_cond_signal (&qelement->p->signal); - g_mutex_unlock (&qelement->p->buffer_mutex); - gst_curl_http_src_remove_queue_item (&context->queue, qelement->p); } - qelement = qnext; } - context->request_removal_element = NULL; - context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING; - g_mutex_unlock (&context->mutex); - } else { - GSTCURL_WARNING_PRINT ("Curl Loop State was invalid or unsupported"); - GSTCURL_WARNING_PRINT ("Signal State is %d, resetting to RUNNING.", - context->state); - /* Reset to running, so if there isn't anything to do it'll be - * changed the WAIT once curl_multi_perform says it has no active - * handles. */ - context->state = GSTCURL_MULTI_LOOP_STATE_RUNNING; - g_mutex_unlock (&context->mutex); } +out: + g_mutex_unlock (&context->mutex); } /* @@ -1763,8 +1843,8 @@ gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb, /* If header field already exists, append to the end */ if (gst_structure_has_field (response_headers, header_key) == TRUE) { header_value = g_strdup_printf ("%s, %s", - g_value_get_string (gst_structure_get_value (response_headers, - header_key)), header_tpl[1]); + gst_structure_get_string (response_headers, header_key), + header_tpl[1]); gst_structure_set ((GstStructure *) response_headers, header_key, G_TYPE_STRING, header_value, NULL); g_free (header_value); @@ -1850,7 +1930,7 @@ gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src) } memcpy (s->buffer + s->buffer_len, chunk, chunk_len); s->buffer_len += chunk_len; - g_cond_signal (&s->signal); + g_cond_signal (&s->buffer_cond); g_mutex_unlock (&s->buffer_mutex); return chunk_len; } @@ -1864,10 +1944,29 @@ gst_curl_http_src_request_remove (GstCurlHttpSrc * src) GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC, GstCurlHttpSrcClass); - g_mutex_lock (&klass->multi_task_context.mutex); - klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL; - klass->multi_task_context.request_removal_element = src; + g_mutex_lock (&klass->multi_task_context.mutex); + g_mutex_lock (&src->buffer_mutex); + if (src->connection_status == GSTCURL_CONNECTED) { + src->connection_status = GSTCURL_WANT_REMOVAL; + } + g_mutex_unlock (&src->buffer_mutex); g_cond_signal (&klass->multi_task_context.signal); g_mutex_unlock (&klass->multi_task_context.mutex); } + +/* + * Request a cancellation of a currently running curl handle and + * block this thread until the src element has been removed + * from the queue + */ +static void +gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src) +{ + gst_curl_http_src_request_remove (src); + g_mutex_lock (&src->buffer_mutex); + while (src->connection_status != GSTCURL_NOT_CONNECTED) { + g_cond_wait (&src->buffer_cond, &src->buffer_mutex); + } + g_mutex_unlock (&src->buffer_mutex); +} diff --git a/ext/curl/gstcurlhttpsrc.h b/ext/curl/gstcurlhttpsrc.h index 72af6dc3b..d7e65d2a7 100644 --- a/ext/curl/gstcurlhttpsrc.h +++ b/ext/curl/gstcurlhttpsrc.h @@ -110,18 +110,12 @@ struct _GstCurlHttpSrcMultiTaskContext guint refcount; GCond signal; - GstCurlHttpSrc *request_removal_element; - GstCurlHttpSrcQueueElement *queue; enum { - GSTCURL_MULTI_LOOP_STATE_WAIT = 0, - GSTCURL_MULTI_LOOP_STATE_QUEUE_EVENT, GSTCURL_MULTI_LOOP_STATE_RUNNING, - GSTCURL_MULTI_LOOP_STATE_REQUEST_REMOVAL, - GSTCURL_MULTI_LOOP_STATE_STOP, - GSTCURL_MULTI_LOOP_STATE_MAX + GSTCURL_MULTI_LOOP_STATE_STOP } state; /* < private > */ @@ -200,11 +194,16 @@ struct _GstCurlHttpSrc } state, pending_state; CURL *curl_handle; GMutex buffer_mutex; - GCond signal; + GCond buffer_cond; gchar *buffer; guint buffer_len; gboolean transfer_begun; gboolean data_received; + enum { + GSTCURL_NOT_CONNECTED, + GSTCURL_CONNECTED, + GSTCURL_WANT_REMOVAL + } connection_status; /* * Response Headers @@ -220,34 +219,6 @@ struct _GstCurlHttpSrc GstCaps *caps; }; -enum -{ - PROP_0, - PROP_URI, - PROP_USERNAME, - PROP_PASSWORD, - PROP_PROXYURI, - PROP_PROXYUSERNAME, - PROP_PROXYPASSWORD, - PROP_COOKIES, - PROP_USERAGENT, - PROP_HEADERS, - PROP_COMPRESS, - PROP_REDIRECT, - PROP_MAXREDIRECT, - PROP_KEEPALIVE, - PROP_TIMEOUT, - PROP_STRICT_SSL, - PROP_SSL_CA_FILE, - PROP_RETRIES, - PROP_CONNECTIONMAXTIME, - PROP_MAXCONCURRENT_SERVER, - PROP_MAXCONCURRENT_PROXY, - PROP_MAXCONCURRENT_GLOBAL, - PROP_HTTPVERSION, - PROP_MAX -}; - GType gst_curl_http_src_get_type (void); G_END_DECLS diff --git a/ext/curl/gstcurlqueue.c b/ext/curl/gstcurlqueue.c index bfe6da957..627a2ba25 100644 --- a/ext/curl/gstcurlqueue.c +++ b/ext/curl/gstcurlqueue.c @@ -83,8 +83,9 @@ gst_curl_http_src_add_queue_item (GstCurlHttpSrcQueueElement ** queue, } insert_point->p = s; - g_mutex_init (&insert_point->running); + g_atomic_int_set (&insert_point->running, 0); insert_point->next = NULL; + s->connection_status = GSTCURL_CONNECTED; return TRUE; } @@ -127,6 +128,7 @@ gst_curl_http_src_remove_queue_item (GstCurlHttpSrcQueueElement ** queue, prev_qelement->next = this_qelement->next; } g_free (this_qelement); + s->connection_status = GSTCURL_NOT_CONNECTED; return TRUE; } @@ -164,12 +166,13 @@ gst_curl_http_src_remove_queue_handle (GstCurlHttpSrcQueueElement ** queue, this_qelement->p->uri); */ /* First, signal the transfer owner thread to wake up */ g_mutex_lock (&this_qelement->p->buffer_mutex); - g_cond_signal (&this_qelement->p->signal); + g_cond_signal (&this_qelement->p->buffer_cond); if (this_qelement->p->state != GSTCURL_UNLOCK) { this_qelement->p->state = GSTCURL_DONE; } else { this_qelement->p->pending_state = GSTCURL_DONE; } + this_qelement->p->connection_status = GSTCURL_NOT_CONNECTED; this_qelement->p->curl_result = result; g_mutex_unlock (&this_qelement->p->buffer_mutex); diff --git a/ext/curl/gstcurlqueue.h b/ext/curl/gstcurlqueue.h index 58eb340f4..e43359a58 100644 --- a/ext/curl/gstcurlqueue.h +++ b/ext/curl/gstcurlqueue.h @@ -51,7 +51,7 @@ struct _GstCurlHttpSrcQueueElement { GstCurlHttpSrc *p; - GMutex running; + volatile gint running; GstCurlHttpSrcQueueElement *next; }; |