summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-06-05 21:06:52 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-06-10 14:28:03 -0700
commit4849c8c1f2dcf386117d3e3ae5d87da256bc2277 (patch)
treeb15ff826098efd843154274bdb906a3de25420a9
parent284f6a20df1a895a7c2752a628ab6e28a0b74c99 (diff)
downloadceph-4849c8c1f2dcf386117d3e3ae5d87da256bc2277.tar.gz
rgw: stream obj into http request
still need to figure out curl handle polling, handle client errors correctly. Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/rgw/rgw_http_client.cc168
-rw-r--r--src/rgw/rgw_http_client.h4
-rw-r--r--src/rgw/rgw_rados.cc14
-rw-r--r--src/rgw/rgw_rest_client.cc109
-rw-r--r--src/rgw/rgw_rest_client.h29
-rw-r--r--src/rgw/rgw_rest_conn.cc23
-rw-r--r--src/rgw/rgw_rest_conn.h11
7 files changed, 334 insertions, 24 deletions
diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc
index 1f8f3d636b2..a3a8c42bdeb 100644
--- a/src/rgw/rgw_http_client.cc
+++ b/src/rgw/rgw_http_client.cc
@@ -1,5 +1,6 @@
#include <curl/curl.h>
#include <curl/easy.h>
+#include <curl/multi.h>
#include "rgw_common.h"
#include "rgw_http_client.h"
@@ -96,4 +97,171 @@ int RGWHTTPClient::process(const char *method, const char *url)
return ret;
}
+struct multi_req_data {
+ CURL *easy_handle;
+ CURLM *multi_handle;
+ curl_slist *h;
+ multi_req_data() : easy_handle(NULL), multi_handle(NULL), h(NULL) {}
+ ~multi_req_data() {
+ if (multi_handle)
+ curl_multi_cleanup(multi_handle);
+
+ if (easy_handle)
+ curl_easy_cleanup(easy_handle);
+
+ if (h)
+ curl_slist_free_all(h);
+ }
+};
+
+int RGWHTTPClient::init_async(const char *method, const char *url, void **handle)
+{
+ CURL *easy_handle;
+ CURLM *multi_handle;
+ multi_req_data *req_data = new multi_req_data;
+ *handle = (void *)req_data;
+
+ char error_buf[CURL_ERROR_SIZE];
+
+ multi_handle = curl_multi_init();
+ easy_handle = curl_easy_init();
+
+ req_data->multi_handle = multi_handle;
+ req_data->easy_handle = easy_handle;
+
+ CURLMcode mstatus = curl_multi_add_handle(multi_handle, easy_handle);
+ if (mstatus) {
+ dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
+ delete req_data;
+ return -EIO;
+ }
+
+ dout(20) << "sending request to " << url << dendl;
+
+ curl_slist *h = NULL;
+
+ list<pair<string, string> >::iterator iter;
+ for (iter = headers.begin(); iter != headers.end(); ++iter) {
+ pair<string, string>& p = *iter;
+ string val = p.first;
+
+ if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
+ val = val.substr(5);
+ }
+ val.append(": ");
+ val.append(p.second);
+ h = curl_slist_append(h, val.c_str());
+ }
+
+ req_data->h = h;
+
+ curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
+ curl_easy_setopt(easy_handle, CURLOPT_URL, url);
+ curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
+ curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
+ curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
+ curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)this);
+ curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
+ curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)this);
+ curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
+ if (h) {
+ curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
+ }
+ curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
+ curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)this);
+ curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
+ if (has_send_len) {
+ curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
+ }
+
+ return 0;
+}
+
+
+int RGWHTTPClient::process_request(void *handle, bool *done)
+{
+ multi_req_data *req_data = (multi_req_data *)handle;
+ int still_running;
+ int mstatus;
+
+ do {
+#if 0
+ struct timeval timeout;
+
+ fd_set fdread;
+ fd_set fdwrite;
+ fd_set fdexcep;
+ int maxfd = -1;
+
+ long curl_timeo = -1;
+
+ FD_ZERO(&fdread);
+ FD_ZERO(&fdwrite);
+ FD_ZERO(&fdexcep);
+#if 0
+ /* set a suitable timeout to play around with */
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
+
+ curl_multi_timeout(multi_handle, &curl_timeo);
+ if(curl_timeo >= 0) {
+ timeout.tv_sec = curl_timeo / 1000;
+ if(timeout.tv_sec > 1)
+ timeout.tv_sec = 1;
+ else
+ timeout.tv_usec = (curl_timeo % 1000) * 1000;
+ }
+#endif
+
+ /* get file descriptors from the transfers */
+ int ret = curl_multi_fdset(req_data->multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd);
+ if (ret) {
+ dout(0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
+ return -EIO;
+ }
+
+#warning FIXME: replace select with poll
+ ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, NULL);
+ if (ret < 0) {
+ ret = -errno;
+ dout(0) << "ERROR: select returned " << ret << dendl;
+ return ret;
+ }
+#endif
+ mstatus = curl_multi_perform(req_data->multi_handle, &still_running);
+ dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
+ switch (mstatus) {
+ case CURLM_OK:
+ case CURLM_CALL_MULTI_PERFORM:
+ break;
+ default:
+ return -EINVAL;
+ }
+ int msgs_left;
+ CURLMsg *msg;
+ while ((msg = curl_multi_info_read(req_data->multi_handle, &msgs_left))) {
+ if (msg->msg == CURLMSG_DONE) {
+#warning FIXME: check result
+ dout(20) << "msg->data.result=" << msg->data.result << dendl;
+ }
+ }
+ } while (mstatus == CURLM_CALL_MULTI_PERFORM);
+
+ *done = (still_running == 0);
+
+ return 0;
+}
+
+int RGWHTTPClient::complete_request(void *handle)
+{
+ bool done;
+ int ret;
+ do {
+ ret = process_request(handle, &done);
+ } while (!done && !ret);
+ multi_req_data *req_data = (multi_req_data *)handle;
+ delete req_data;
+
+ return ret;
+}
diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h
index da7ab875beb..937dfe76ab6 100644
--- a/src/rgw/rgw_http_client.h
+++ b/src/rgw/rgw_http_client.h
@@ -30,6 +30,10 @@ public:
int process(const char *method, const char *url);
int process(const char *url) { return process("GET", url); }
+
+ int init_async(const char *method, const char *url, void **handle);
+ int process_request(void *handle,bool *done);
+ int complete_request(void *handle);
};
#endif
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 4ae126e8348..b0ede4ff9df 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -1950,10 +1950,22 @@ int RGWRados::copy_obj(void *ctx,
/* dest is in a different region, copy it there */
map<string, bufferlist> src_attrs;
+
+ RGWRESTStreamRequest *out_stream_req;
- int ret = rest_conn->put_obj(user_id, dest_obj, astate->size, NULL);
+ int ret = rest_conn->put_obj_init(user_id, dest_obj, astate->size, &out_stream_req);
if (ret < 0)
return ret;
+
+ ret = get_obj_iterate(ctx, &handle, src_obj, 0, astate->size - 1, out_stream_req->get_out_cb());
+ if (ret < 0)
+ return ret;
+
+ ret = rest_conn->complete_request(out_stream_req);
+ if (ret < 0)
+ return ret;
+
+ return 0;
}
if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc
index 6702237b745..1a85aa5c764 100644
--- a/src/rgw/rgw_rest_client.cc
+++ b/src/rgw/rgw_rest_client.cc
@@ -2,13 +2,14 @@
#include "rgw_rest_client.h"
#include "rgw_auth_s3.h"
#include "rgw_http_errors.h"
+#include "rgw_rados.h"
#include "common/ceph_crypto_cms.h"
#include "common/armor.h"
#define dout_subsys ceph_subsys_rgw
-int RGWRESTClient::receive_header(void *ptr, size_t len)
+int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
{
char line[len + 1];
@@ -60,7 +61,7 @@ static void get_new_date_str(CephContext *cct, string& date_str)
date_str = s.str();
}
-int RGWRESTClient::execute(RGWAccessKey& key, const char *method, const char *resource)
+int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const char *resource)
{
string new_url = url;
string new_resource = resource;
@@ -102,7 +103,7 @@ int RGWRESTClient::execute(RGWAccessKey& key, const char *method, const char *re
return rgw_http_error_to_errno(status);
}
-int RGWRESTClient::send_data(void *ptr, size_t len)
+int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
{
if (!send_iter)
return 0;
@@ -115,7 +116,7 @@ int RGWRESTClient::send_data(void *ptr, size_t len)
return len;
}
-int RGWRESTClient::receive_data(void *ptr, size_t len)
+int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len)
{
if (response.length() > max_response)
return 0; /* don't read extra data */
@@ -127,7 +128,8 @@ int RGWRESTClient::receive_data(void *ptr, size_t len)
return 0;
}
-void RGWRESTClient::append_param(string& dest, const string& name, const string& val)
+
+void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const string& val)
{
if (dest.empty()) {
dest.append("?");
@@ -142,7 +144,7 @@ void RGWRESTClient::append_param(string& dest, const string& name, const string&
}
}
-void RGWRESTClient::get_params_str(map<string, string>& extra_args, string& dest)
+void RGWRESTSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest)
{
map<string, string>::iterator miter;
for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) {
@@ -154,7 +156,7 @@ void RGWRESTClient::get_params_str(map<string, string>& extra_args, string& dest
}
}
-int RGWRESTClient::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
+int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
{
map<string, string>& m = env.get_map();
@@ -185,7 +187,7 @@ int RGWRESTClient::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info)
return 0;
}
-int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
+int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
{
string date_str;
@@ -246,7 +248,43 @@ int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, size_t max
return rgw_http_error_to_errno(status);
}
-int RGWRESTClient::put_obj(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *))
+class RGWRESTStreamOutCB : public RGWGetDataCB {
+ RGWRESTStreamRequest *req;
+public:
+ RGWRESTStreamOutCB(RGWRESTStreamRequest *_req) : req(_req) {}
+ int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len); /* callback for object iteration when sending data */
+};
+
+int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+ dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl;
+ if (!bl_ofs && bl_len == bl.length()) {
+ return req->add_output_data(bl);
+ }
+
+ bufferptr bp(bl.c_str() + bl_ofs, bl_len);
+ bufferlist new_bl;
+ new_bl.push_back(bp);
+
+ return req->add_output_data(new_bl);
+}
+
+RGWRESTStreamRequest::~RGWRESTStreamRequest()
+{
+ delete cb;
+}
+
+int RGWRESTStreamRequest::add_output_data(bufferlist& bl)
+{
+ lock.Lock();
+ pending_send.push_back(bl);
+ lock.Unlock();
+
+ bool done;
+ return process_request(handle, &done);
+}
+
+int RGWRESTStreamRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size)
{
string resource = obj.bucket.name + "/" + obj.object;
string new_url = url;
@@ -285,10 +323,61 @@ int RGWRESTClient::put_obj(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, v
headers.push_back(make_pair<string, string>(iter->first, iter->second));
}
- int r = process(new_info.method, new_url.c_str());
+ cb = new RGWRESTStreamOutCB(this);
+
+ set_send_length(obj_size);
+
+ int r = init_async(new_info.method, new_url.c_str(), &handle);
if (r < 0)
return r;
return 0;
}
+int RGWRESTStreamRequest::send_data(void *ptr, size_t len)
+{
+ uint64_t sent = 0;
+
+ dout(20) << "RGWRESTStreamRequest::send_data()" << dendl;
+ lock.Lock();
+ if (pending_send.empty()) {
+ lock.Unlock();
+ return 0;
+ }
+
+ list<bufferlist>::iterator iter = pending_send.begin();
+ while (iter != pending_send.end() && len > 0) {
+ bufferlist& bl = *iter;
+
+ list<bufferlist>::iterator next_iter = iter;
+ ++next_iter;
+ lock.Unlock();
+
+ uint64_t send_len = min(len, (size_t)bl.length());
+
+ memcpy(ptr, bl.c_str(), send_len);
+
+ len -= send_len;
+ sent += send_len;
+
+ lock.Lock();
+ pending_send.pop_front();
+
+ if (bl.length() > send_len) {
+ bufferptr bp(bl.c_str() + send_len, bl.length() - send_len);
+ bufferlist new_bl;
+ new_bl.append(bp);
+ pending_send.push_front(new_bl);
+ }
+ iter = next_iter;
+ }
+ lock.Unlock();
+
+ return sent;
+}
+
+
+int RGWRESTStreamRequest::complete()
+{
+ return complete_request(handle);
+}
diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h
index fee9c1d38b4..48a0bf04a1d 100644
--- a/src/rgw/rgw_rest_client.h
+++ b/src/rgw/rgw_rest_client.h
@@ -5,7 +5,9 @@
#include "rgw_http_client.h"
-class RGWRESTClient : public RGWHTTPClient {
+class RGWGetDataCB;
+
+class RGWRESTSimpleRequest : public RGWHTTPClient {
protected:
CephContext *cct;
@@ -26,7 +28,7 @@ protected:
int sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info);
public:
- RGWRESTClient(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
+ RGWRESTSimpleRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL),
max_response(0) {
if (_headers)
@@ -38,16 +40,33 @@ public:
int receive_header(void *ptr, size_t len);
virtual int receive_data(void *ptr, size_t len);
- int send_data(void *ptr, size_t len);
+ virtual int send_data(void *ptr, size_t len);
bufferlist& get_response() { return response; }
int execute(RGWAccessKey& key, const char *method, const char *resource);
int forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
-
- int put_obj(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *));
};
+class RGWRESTStreamRequest : public RGWRESTSimpleRequest {
+ Mutex lock;
+ list<bufferlist> pending_send;
+ void *handle;
+ RGWGetDataCB *cb;
+public:
+ int add_output_data(bufferlist& bl);
+ int send_data(void *ptr, size_t len);
+
+ RGWRESTStreamRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
+ list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params),
+ lock("RGWRESTStreamRequest"), handle(NULL), cb(NULL) {}
+ ~RGWRESTStreamRequest();
+ int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size);
+ int complete();
+
+ RGWGetDataCB *get_out_cb() { return cb; }
+};
+
#endif
diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc
index ddcad57f4e8..cba8548d3b4 100644
--- a/src/rgw/rgw_rest_conn.cc
+++ b/src/rgw/rgw_rest_conn.cc
@@ -36,11 +36,17 @@ int RGWRegionConnection::forward(const string& uid, req_info& info, size_t max_r
list<pair<string, string> > params;
params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
- RGWRESTClient client(cct, url, NULL, &params);
- return client.forward_request(key, info, max_response, inbl, outbl);
+ RGWRESTSimpleRequest req(cct, url, NULL, &params);
+ return req.forward_request(key, info, max_response, inbl, outbl);
}
-int RGWRegionConnection::put_obj(const string& uid, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *))
+class StreamObjData : public RGWGetDataCB {
+ rgw_obj obj;
+public:
+ StreamObjData(rgw_obj& _obj) : obj(_obj) {}
+};
+
+int RGWRegionConnection::put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size, RGWRESTStreamRequest **req)
{
string url;
int ret = get_url(url);
@@ -50,7 +56,14 @@ int RGWRegionConnection::put_obj(const string& uid, rgw_obj& obj, uint64_t obj_s
list<pair<string, string> > params;
params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
- RGWRESTClient client(cct, url, NULL, &params);
- return client.put_obj(key, obj, obj_size, get_data);
+ *req = new RGWRESTStreamRequest(cct, url, NULL, &params);
+ return (*req)->put_obj_init(key, obj, obj_size);
}
+int RGWRegionConnection::complete_request(RGWRESTStreamRequest *req)
+{
+ int ret = req->complete();
+ delete req;
+
+ return ret;
+}
diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h
index 00bb286b130..1554e513db1 100644
--- a/src/rgw/rgw_rest_conn.h
+++ b/src/rgw/rgw_rest_conn.h
@@ -1,11 +1,12 @@
-#ifndef CEPH_RGW_REST_REQ_H
-#define CEPH_RGW_REST_REQ_H
+#ifndef CEPH_RGW_REST_CONN_H
+#define CEPH_RGW_REST_CONN_H
#include "rgw_rest_client.h"
class CephContext;
class RGWRados;
class RGWRegion;
+class RGWGetObjData;
class RGWRegionConnection
{
@@ -19,8 +20,12 @@ public:
RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream);
int get_url(string& endpoint);
+ /* sync request */
int forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
- int put_obj(const string& uid, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *));
+
+ /* async request */
+ int put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size, RGWRESTStreamRequest **req);
+ int complete_request(RGWRESTStreamRequest *req);
};
#endif