summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rest-extras/youtube-proxy.c48
-rw-r--r--rest-extras/youtube-proxy.h4
-rw-r--r--rest/rest-proxy-call-private.h1
-rw-r--r--rest/rest-proxy-call.c151
-rw-r--r--rest/rest-proxy-call.h13
5 files changed, 205 insertions, 12 deletions
diff --git a/rest-extras/youtube-proxy.c b/rest-extras/youtube-proxy.c
index af39e20..39a1ac1 100644
--- a/rest-extras/youtube-proxy.c
+++ b/rest-extras/youtube-proxy.c
@@ -144,7 +144,7 @@ RestProxy *
youtube_proxy_new_with_auth (const char *developer_key,
const char *user_auth)
{
- return g_object_new (YOUTUBE_TYPE_PROXY,
+ return g_object_new (YOUTUBE_TYPE_PROXY,
"developer-key", developer_key,
"user-auth", user_auth,
NULL);
@@ -245,13 +245,14 @@ typedef struct {
SoupMessage *message;
GObject *weak_object;
gpointer user_data;
+ gsize uploaded;
} YoutubeProxyUploadClosure;
static void
_upload_async_weak_notify_cb (gpointer *data,
GObject *dead_object)
{
- YoutubeProxyUploadClosure *closure =
+ YoutubeProxyUploadClosure *closure =
(YoutubeProxyUploadClosure *) data;
_rest_proxy_cancel_message (REST_PROXY (closure->proxy), closure->message);
@@ -279,7 +280,7 @@ _upload_async_closure_new (YoutubeProxy *self,
{
YoutubeProxyUploadClosure *closure =
g_slice_new0 (YoutubeProxyUploadClosure);
-
+
closure->proxy = g_object_ref (self);
closure->callback = callback;
closure->message = message;
@@ -310,12 +311,31 @@ _upload_completed_cb (SoupSession *session,
message->status_code,
message->reason_phrase);
- closure->callback (closure->proxy, message->response_body->data, error,
- closure->weak_object, closure->user_data);
+ closure->callback (closure->proxy, message->response_body->data,
+ message->request_body->length,
+ message->request_body->length,
+ error, closure->weak_object, closure->user_data);
_upload_async_closure_free (closure);
}
+static void
+_message_wrote_data_cb (SoupMessage *msg,
+ SoupBuffer *chunk,
+ YoutubeProxyUploadClosure *closure)
+{
+ closure->uploaded = closure->uploaded + chunk->length;
+
+ if (closure->uploaded < msg->request_body->length)
+ closure->callback (closure->proxy,
+ NULL,
+ msg->request_body->length,
+ closure->uploaded,
+ NULL,
+ closure->weak_object,
+ closure->user_data);
+}
+
gboolean
youtube_proxy_upload_async (YoutubeProxy *self,
const gchar *filename,
@@ -355,7 +375,7 @@ youtube_proxy_upload_async (YoutubeProxy *self,
soup_message_headers_append (part_headers, "Content-Type",
"application/atom+xml; charset=UTF-8");
-
+
soup_multipart_append_part (mp, part_headers, sb);
soup_buffer_free (sb);
@@ -364,21 +384,21 @@ youtube_proxy_upload_async (YoutubeProxy *self,
filename,
(const guchar*) g_mapped_file_get_contents (map),
g_mapped_file_get_length (map),
- NULL);
-
+ NULL);
+
sb = soup_buffer_new_with_owner (g_mapped_file_get_contents (map),
g_mapped_file_get_length (map),
map,
(GDestroyNotify) g_mapped_file_unref);
soup_message_headers_replace (part_headers, "Content-Type", content_type);
-
+
soup_multipart_append_part (mp, part_headers, sb);
-
+
soup_buffer_free (sb);
soup_message_headers_free (part_headers);
-
+
message = soup_form_request_new_from_multipart (UPLOAD_URL, mp);
soup_multipart_free (mp);
@@ -388,6 +408,12 @@ youtube_proxy_upload_async (YoutubeProxy *self,
closure = _upload_async_closure_new (self, callback, message, weak_object,
userdata);
+ g_signal_connect (message,
+ "wrote-body-data",
+ (GCallback) _message_wrote_data_cb,
+ closure);
+
+
_rest_proxy_queue_message (REST_PROXY (self), message, _upload_completed_cb,
closure);
diff --git a/rest-extras/youtube-proxy.h b/rest-extras/youtube-proxy.h
index 440fd6a..943f9f0 100644
--- a/rest-extras/youtube-proxy.h
+++ b/rest-extras/youtube-proxy.h
@@ -75,8 +75,10 @@ RestProxy* youtube_proxy_new_with_auth (const gchar *developer_key,
void youtube_proxy_set_user_auth (YoutubeProxy *proxy,
const gchar *user_auth);
-typedef void (*YoutubeProxyUploadCallback)(YoutubeProxy *proxy,
+typedef void (*YoutubeProxyUploadCallback)(YoutubeProxy *proxy,
const gchar *payload,
+ gsize total,
+ gsize uploaded,
const GError *error,
GObject *weak_object,
gpointer userdata);
diff --git a/rest/rest-proxy-call-private.h b/rest/rest-proxy-call-private.h
index 584a77b..ec7a4be 100644
--- a/rest/rest-proxy-call-private.h
+++ b/rest/rest-proxy-call-private.h
@@ -31,6 +31,7 @@ G_BEGIN_DECLS
typedef struct _RestProxyCallAsyncClosure RestProxyCallAsyncClosure;
typedef struct _RestProxyCallContinuousClosure RestProxyCallContinuousClosure;
+typedef struct _RestProxyCallUploadClosure RestProxyCallUploadClosure;
struct _RestProxyCallPrivate {
gchar *method;
diff --git a/rest/rest-proxy-call.c b/rest/rest-proxy-call.c
index 6ffa56c..4d45a16 100644
--- a/rest/rest-proxy-call.c
+++ b/rest/rest-proxy-call.c
@@ -50,6 +50,15 @@ struct _RestProxyCallContinuousClosure {
SoupMessage *message;
};
+struct _RestProxyCallUploadClosure {
+ RestProxyCall *call;
+ RestProxyCallUploadCallback callback;
+ GObject *weak_object;
+ gpointer userdata;
+ SoupMessage *message;
+ gsize uploaded;
+};
+
enum
{
PROP_0 = 0,
@@ -994,6 +1003,148 @@ error:
return FALSE;
}
+static void
+_upload_call_message_completed_cb (SoupSession *session,
+ SoupMessage *message,
+ gpointer user_data)
+{
+ RestProxyCall *call;
+ RestProxyCallPrivate *priv;
+ GError *error = NULL;
+ RestProxyCallUploadClosure *closure;
+
+ closure = (RestProxyCallUploadClosure *) user_data;
+ call = closure->call;
+ priv = GET_PRIVATE (call);
+
+ priv->status_code = message->status_code;
+ priv->status_message = g_strdup (message->reason_phrase);
+
+ _handle_error_from_message (message, &error);
+
+ closure->callback (closure->call,
+ closure->uploaded,
+ closure->uploaded,
+ error,
+ closure->weak_object,
+ closure->userdata);
+
+ g_clear_error (&error);
+
+ /* Success. We don't need the weak reference any more */
+ if (closure->weak_object)
+ {
+ g_object_weak_unref (closure->weak_object,
+ (GWeakNotify)_call_async_weak_notify_cb,
+ closure);
+ }
+
+ priv->cur_call_closure = NULL;
+ g_object_unref (closure->call);
+ g_slice_free (RestProxyCallUploadClosure, closure);
+}
+
+static void
+_upload_call_message_wrote_data_cb (SoupMessage *msg,
+ SoupBuffer *chunk,
+ RestProxyCallUploadClosure *closure)
+{
+ closure->uploaded = closure->uploaded + chunk->length;
+
+ if (closure->uploaded < msg->request_body->length)
+ closure->callback (closure->call,
+ msg->request_body->length,
+ closure->uploaded,
+ NULL,
+ closure->weak_object,
+ closure->userdata);
+}
+
+/**
+ * rest_proxy_call_upload:
+ * @call: The #RestProxyCall
+ * @callback: a #RestProxyCallUploadCallback to invoke when a chunk of data was
+ * uploaded
+ * @weak_object: The #GObject to weakly reference and tie the lifecycle to
+ * @userdata: data to pass to @callback
+ * @error: a #GError, or %NULL
+ *
+ * Asynchronously invoke @call but expect to have the callback invoked every time a
+ * chunk of our request's body is written.
+ *
+ * When the callback is invoked with the uploaded byte count equaling the message
+ * byte count, the call has completed.
+ *
+ * If @weak_object is disposed during the call then this call will be
+ * cancelled. If the call is cancelled then the callback will be invoked with
+ * an error state.
+ *
+ * You may unref the call after calling this function since there is an
+ * internal reference, or you may unref in the callback.
+ */
+gboolean
+rest_proxy_call_upload (RestProxyCall *call,
+ RestProxyCallUploadCallback callback,
+ GObject *weak_object,
+ gpointer userdata,
+ GError **error)
+{
+ RestProxyCallPrivate *priv;
+ SoupMessage *message;
+ RestProxyCallUploadClosure *closure;
+
+ g_return_val_if_fail (REST_IS_PROXY_CALL (call), FALSE);
+ priv = GET_PRIVATE (call);
+ g_assert (priv->proxy);
+
+ if (priv->cur_call_closure)
+ {
+ /* FIXME: Use GError here */
+ g_critical (G_STRLOC ": Call already in progress.");
+ return FALSE;
+ }
+
+ message = prepare_message (call, error);
+ if (message == NULL)
+ goto error;
+
+ closure = g_slice_new0 (RestProxyCallUploadClosure);
+ closure->call = g_object_ref (call);
+ closure->callback = callback;
+ closure->weak_object = weak_object;
+ closure->message = message;
+ closure->userdata = userdata;
+ closure->uploaded = 0;
+
+ priv->cur_call_closure = (RestProxyCallAsyncClosure *)closure;
+
+ /* Weakly reference this object. We remove our callback if it goes away. */
+ if (closure->weak_object)
+ {
+ g_object_weak_ref (closure->weak_object,
+ (GWeakNotify)_call_async_weak_notify_cb,
+ closure);
+ }
+
+ g_signal_connect (message,
+ "wrote-body-data",
+ (GCallback) _upload_call_message_wrote_data_cb,
+ closure);
+
+ _rest_proxy_queue_message (priv->proxy,
+ message,
+ _upload_call_message_completed_cb,
+ closure);
+ g_free (priv->url);
+ priv->url = NULL;
+ return TRUE;
+
+error:
+ g_free (priv->url);
+ priv->url = NULL;
+ return FALSE;
+}
+
/**
* rest_proxy_call_cancel:
* @call: The #RestProxyCall
diff --git a/rest/rest-proxy-call.h b/rest/rest-proxy-call.h
index 918b35c..a8085d9 100644
--- a/rest/rest-proxy-call.h
+++ b/rest/rest-proxy-call.h
@@ -176,6 +176,19 @@ gboolean rest_proxy_call_continuous (RestProxyCall *call,
gpointer userdata,
GError **error);
+typedef void (*RestProxyCallUploadCallback) (RestProxyCall *call,
+ gsize total,
+ gsize uploaded,
+ const GError *error,
+ GObject *weak_object,
+ gpointer userdata);
+
+gboolean rest_proxy_call_upload (RestProxyCall *call,
+ RestProxyCallUploadCallback cb,
+ GObject *weak_object,
+ gpointer userdata,
+ GError **error);
+
gboolean rest_proxy_call_cancel (RestProxyCall *call);
gboolean rest_proxy_call_sync (RestProxyCall *call, GError **error_out);