diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-05 21:06:52 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-10 14:28:03 -0700 |
commit | 4849c8c1f2dcf386117d3e3ae5d87da256bc2277 (patch) | |
tree | b15ff826098efd843154274bdb906a3de25420a9 | |
parent | 284f6a20df1a895a7c2752a628ab6e28a0b74c99 (diff) | |
download | ceph-4849c8c1f2dcf386117d3e3ae5d87da256bc2277.tar.gz |
rgw: stream obj into http request
still need to figure out curl handle polling, handle client
errors correctly.
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/rgw/rgw_http_client.cc | 168 | ||||
-rw-r--r-- | src/rgw/rgw_http_client.h | 4 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 14 | ||||
-rw-r--r-- | src/rgw/rgw_rest_client.cc | 109 | ||||
-rw-r--r-- | src/rgw/rgw_rest_client.h | 29 | ||||
-rw-r--r-- | src/rgw/rgw_rest_conn.cc | 23 | ||||
-rw-r--r-- | src/rgw/rgw_rest_conn.h | 11 |
7 files changed, 334 insertions, 24 deletions
diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 1f8f3d636b2..a3a8c42bdeb 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -1,5 +1,6 @@ #include <curl/curl.h> #include <curl/easy.h> +#include <curl/multi.h> #include "rgw_common.h" #include "rgw_http_client.h" @@ -96,4 +97,171 @@ int RGWHTTPClient::process(const char *method, const char *url) return ret; } +struct multi_req_data { + CURL *easy_handle; + CURLM *multi_handle; + curl_slist *h; + multi_req_data() : easy_handle(NULL), multi_handle(NULL), h(NULL) {} + ~multi_req_data() { + if (multi_handle) + curl_multi_cleanup(multi_handle); + + if (easy_handle) + curl_easy_cleanup(easy_handle); + + if (h) + curl_slist_free_all(h); + } +}; + +int RGWHTTPClient::init_async(const char *method, const char *url, void **handle) +{ + CURL *easy_handle; + CURLM *multi_handle; + multi_req_data *req_data = new multi_req_data; + *handle = (void *)req_data; + + char error_buf[CURL_ERROR_SIZE]; + + multi_handle = curl_multi_init(); + easy_handle = curl_easy_init(); + + req_data->multi_handle = multi_handle; + req_data->easy_handle = easy_handle; + + CURLMcode mstatus = curl_multi_add_handle(multi_handle, easy_handle); + if (mstatus) { + dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl; + delete req_data; + return -EIO; + } + + dout(20) << "sending request to " << url << dendl; + + curl_slist *h = NULL; + + list<pair<string, string> >::iterator iter; + for (iter = headers.begin(); iter != headers.end(); ++iter) { + pair<string, string>& p = *iter; + string val = p.first; + + if (strncmp(val.c_str(), "HTTP_", 5) == 0) { + val = val.substr(5); + } + val.append(": "); + val.append(p.second); + h = curl_slist_append(h, val.c_str()); + } + + req_data->h = h; + + curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method); + curl_easy_setopt(easy_handle, CURLOPT_URL, url); + curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L); + curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L); + curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header); + curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)this); + curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data); + curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)this); + curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)error_buf); + if (h) { + curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h); + } + curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data); + curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)this); + curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); + if (has_send_len) { + curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len); + } + + return 0; +} + + +int RGWHTTPClient::process_request(void *handle, bool *done) +{ + multi_req_data *req_data = (multi_req_data *)handle; + int still_running; + int mstatus; + + do { +#if 0 + struct timeval timeout; + + fd_set fdread; + fd_set fdwrite; + fd_set fdexcep; + int maxfd = -1; + + long curl_timeo = -1; + + FD_ZERO(&fdread); + FD_ZERO(&fdwrite); + FD_ZERO(&fdexcep); +#if 0 + /* set a suitable timeout to play around with */ + timeout.tv_sec = 1; + timeout.tv_usec = 0; + + curl_multi_timeout(multi_handle, &curl_timeo); + if(curl_timeo >= 0) { + timeout.tv_sec = curl_timeo / 1000; + if(timeout.tv_sec > 1) + timeout.tv_sec = 1; + else + timeout.tv_usec = (curl_timeo % 1000) * 1000; + } +#endif + + /* get file descriptors from the transfers */ + int ret = curl_multi_fdset(req_data->multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd); + if (ret) { + dout(0) << "ERROR: curl_multi_fdset returned " << ret << dendl; + return -EIO; + } + +#warning FIXME: replace select with poll + ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, NULL); + if (ret < 0) { + ret = -errno; + dout(0) << "ERROR: select returned " << ret << dendl; + return ret; + } +#endif + mstatus = curl_multi_perform(req_data->multi_handle, &still_running); + dout(20) << "curl_multi_perform returned: " << mstatus << dendl; + switch (mstatus) { + case CURLM_OK: + case CURLM_CALL_MULTI_PERFORM: + break; + default: + return -EINVAL; + } + int msgs_left; + CURLMsg *msg; + while ((msg = curl_multi_info_read(req_data->multi_handle, &msgs_left))) { + if (msg->msg == CURLMSG_DONE) { +#warning FIXME: check result + dout(20) << "msg->data.result=" << msg->data.result << dendl; + } + } + } while (mstatus == CURLM_CALL_MULTI_PERFORM); + + *done = (still_running == 0); + + return 0; +} + +int RGWHTTPClient::complete_request(void *handle) +{ + bool done; + int ret; + do { + ret = process_request(handle, &done); + } while (!done && !ret); + multi_req_data *req_data = (multi_req_data *)handle; + delete req_data; + + return ret; +} diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index da7ab875beb..937dfe76ab6 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -30,6 +30,10 @@ public: int process(const char *method, const char *url); int process(const char *url) { return process("GET", url); } + + int init_async(const char *method, const char *url, void **handle); + int process_request(void *handle,bool *done); + int complete_request(void *handle); }; #endif diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 4ae126e8348..b0ede4ff9df 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1950,10 +1950,22 @@ int RGWRados::copy_obj(void *ctx, /* dest is in a different region, copy it there */ map<string, bufferlist> src_attrs; + + RGWRESTStreamRequest *out_stream_req; - int ret = rest_conn->put_obj(user_id, dest_obj, astate->size, NULL); + int ret = rest_conn->put_obj_init(user_id, dest_obj, astate->size, &out_stream_req); if (ret < 0) return ret; + + ret = get_obj_iterate(ctx, &handle, src_obj, 0, astate->size - 1, out_stream_req->get_out_cb()); + if (ret < 0) + return ret; + + ret = rest_conn->complete_request(out_stream_req); + if (ret < 0) + return ret; + + return 0; } if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */ diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc index 6702237b745..1a85aa5c764 100644 --- a/src/rgw/rgw_rest_client.cc +++ b/src/rgw/rgw_rest_client.cc @@ -2,13 +2,14 @@ #include "rgw_rest_client.h" #include "rgw_auth_s3.h" #include "rgw_http_errors.h" +#include "rgw_rados.h" #include "common/ceph_crypto_cms.h" #include "common/armor.h" #define dout_subsys ceph_subsys_rgw -int RGWRESTClient::receive_header(void *ptr, size_t len) +int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len) { char line[len + 1]; @@ -60,7 +61,7 @@ static void get_new_date_str(CephContext *cct, string& date_str) date_str = s.str(); } -int RGWRESTClient::execute(RGWAccessKey& key, const char *method, const char *resource) +int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const char *resource) { string new_url = url; string new_resource = resource; @@ -102,7 +103,7 @@ int RGWRESTClient::execute(RGWAccessKey& key, const char *method, const char *re return rgw_http_error_to_errno(status); } -int RGWRESTClient::send_data(void *ptr, size_t len) +int RGWRESTSimpleRequest::send_data(void *ptr, size_t len) { if (!send_iter) return 0; @@ -115,7 +116,7 @@ int RGWRESTClient::send_data(void *ptr, size_t len) return len; } -int RGWRESTClient::receive_data(void *ptr, size_t len) +int RGWRESTSimpleRequest::receive_data(void *ptr, size_t len) { if (response.length() > max_response) return 0; /* don't read extra data */ @@ -127,7 +128,8 @@ int RGWRESTClient::receive_data(void *ptr, size_t len) return 0; } -void RGWRESTClient::append_param(string& dest, const string& name, const string& val) + +void RGWRESTSimpleRequest::append_param(string& dest, const string& name, const string& val) { if (dest.empty()) { dest.append("?"); @@ -142,7 +144,7 @@ void RGWRESTClient::append_param(string& dest, const string& name, const string& } } -void RGWRESTClient::get_params_str(map<string, string>& extra_args, string& dest) +void RGWRESTSimpleRequest::get_params_str(map<string, string>& extra_args, string& dest) { map<string, string>::iterator miter; for (miter = extra_args.begin(); miter != extra_args.end(); ++miter) { @@ -154,7 +156,7 @@ void RGWRESTClient::get_params_str(map<string, string>& extra_args, string& dest } } -int RGWRESTClient::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info) +int RGWRESTSimpleRequest::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info) { map<string, string>& m = env.get_map(); @@ -185,7 +187,7 @@ int RGWRESTClient::sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info) return 0; } -int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl) +int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl) { string date_str; @@ -246,7 +248,43 @@ int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, size_t max return rgw_http_error_to_errno(status); } -int RGWRESTClient::put_obj(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *)) +class RGWRESTStreamOutCB : public RGWGetDataCB { + RGWRESTStreamRequest *req; +public: + RGWRESTStreamOutCB(RGWRESTStreamRequest *_req) : req(_req) {} + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len); /* callback for object iteration when sending data */ +}; + +int RGWRESTStreamOutCB::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) +{ + dout(20) << "RGWRESTStreamOutCB::handle_data bl.length()=" << bl.length() << " bl_ofs=" << bl_ofs << " bl_len=" << bl_len << dendl; + if (!bl_ofs && bl_len == bl.length()) { + return req->add_output_data(bl); + } + + bufferptr bp(bl.c_str() + bl_ofs, bl_len); + bufferlist new_bl; + new_bl.push_back(bp); + + return req->add_output_data(new_bl); +} + +RGWRESTStreamRequest::~RGWRESTStreamRequest() +{ + delete cb; +} + +int RGWRESTStreamRequest::add_output_data(bufferlist& bl) +{ + lock.Lock(); + pending_send.push_back(bl); + lock.Unlock(); + + bool done; + return process_request(handle, &done); +} + +int RGWRESTStreamRequest::put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size) { string resource = obj.bucket.name + "/" + obj.object; string new_url = url; @@ -285,10 +323,61 @@ int RGWRESTClient::put_obj(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, v headers.push_back(make_pair<string, string>(iter->first, iter->second)); } - int r = process(new_info.method, new_url.c_str()); + cb = new RGWRESTStreamOutCB(this); + + set_send_length(obj_size); + + int r = init_async(new_info.method, new_url.c_str(), &handle); if (r < 0) return r; return 0; } +int RGWRESTStreamRequest::send_data(void *ptr, size_t len) +{ + uint64_t sent = 0; + + dout(20) << "RGWRESTStreamRequest::send_data()" << dendl; + lock.Lock(); + if (pending_send.empty()) { + lock.Unlock(); + return 0; + } + + list<bufferlist>::iterator iter = pending_send.begin(); + while (iter != pending_send.end() && len > 0) { + bufferlist& bl = *iter; + + list<bufferlist>::iterator next_iter = iter; + ++next_iter; + lock.Unlock(); + + uint64_t send_len = min(len, (size_t)bl.length()); + + memcpy(ptr, bl.c_str(), send_len); + + len -= send_len; + sent += send_len; + + lock.Lock(); + pending_send.pop_front(); + + if (bl.length() > send_len) { + bufferptr bp(bl.c_str() + send_len, bl.length() - send_len); + bufferlist new_bl; + new_bl.append(bp); + pending_send.push_front(new_bl); + } + iter = next_iter; + } + lock.Unlock(); + + return sent; +} + + +int RGWRESTStreamRequest::complete() +{ + return complete_request(handle); +} diff --git a/src/rgw/rgw_rest_client.h b/src/rgw/rgw_rest_client.h index fee9c1d38b4..48a0bf04a1d 100644 --- a/src/rgw/rgw_rest_client.h +++ b/src/rgw/rgw_rest_client.h @@ -5,7 +5,9 @@ #include "rgw_http_client.h" -class RGWRESTClient : public RGWHTTPClient { +class RGWGetDataCB; + +class RGWRESTSimpleRequest : public RGWHTTPClient { protected: CephContext *cct; @@ -26,7 +28,7 @@ protected: int sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info); public: - RGWRESTClient(CephContext *_cct, string& _url, list<pair<string, string> > *_headers, + RGWRESTSimpleRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers, list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL), max_response(0) { if (_headers) @@ -38,16 +40,33 @@ public: int receive_header(void *ptr, size_t len); virtual int receive_data(void *ptr, size_t len); - int send_data(void *ptr, size_t len); + virtual int send_data(void *ptr, size_t len); bufferlist& get_response() { return response; } int execute(RGWAccessKey& key, const char *method, const char *resource); int forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl); - - int put_obj(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *)); }; +class RGWRESTStreamRequest : public RGWRESTSimpleRequest { + Mutex lock; + list<bufferlist> pending_send; + void *handle; + RGWGetDataCB *cb; +public: + int add_output_data(bufferlist& bl); + int send_data(void *ptr, size_t len); + + RGWRESTStreamRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers, + list<pair<string, string> > *_params) : RGWRESTSimpleRequest(_cct, _url, _headers, _params), + lock("RGWRESTStreamRequest"), handle(NULL), cb(NULL) {} + ~RGWRESTStreamRequest(); + int put_obj_init(RGWAccessKey& key, rgw_obj& obj, uint64_t obj_size); + int complete(); + + RGWGetDataCB *get_out_cb() { return cb; } +}; + #endif diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index ddcad57f4e8..cba8548d3b4 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -36,11 +36,17 @@ int RGWRegionConnection::forward(const string& uid, req_info& info, size_t max_r list<pair<string, string> > params; params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid)); params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region)); - RGWRESTClient client(cct, url, NULL, ¶ms); - return client.forward_request(key, info, max_response, inbl, outbl); + RGWRESTSimpleRequest req(cct, url, NULL, ¶ms); + return req.forward_request(key, info, max_response, inbl, outbl); } -int RGWRegionConnection::put_obj(const string& uid, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *)) +class StreamObjData : public RGWGetDataCB { + rgw_obj obj; +public: + StreamObjData(rgw_obj& _obj) : obj(_obj) {} +}; + +int RGWRegionConnection::put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size, RGWRESTStreamRequest **req) { string url; int ret = get_url(url); @@ -50,7 +56,14 @@ int RGWRegionConnection::put_obj(const string& uid, rgw_obj& obj, uint64_t obj_s list<pair<string, string> > params; params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid)); params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region)); - RGWRESTClient client(cct, url, NULL, ¶ms); - return client.put_obj(key, obj, obj_size, get_data); + *req = new RGWRESTStreamRequest(cct, url, NULL, ¶ms); + return (*req)->put_obj_init(key, obj, obj_size); } +int RGWRegionConnection::complete_request(RGWRESTStreamRequest *req) +{ + int ret = req->complete(); + delete req; + + return ret; +} diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 00bb286b130..1554e513db1 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -1,11 +1,12 @@ -#ifndef CEPH_RGW_REST_REQ_H -#define CEPH_RGW_REST_REQ_H +#ifndef CEPH_RGW_REST_CONN_H +#define CEPH_RGW_REST_CONN_H #include "rgw_rest_client.h" class CephContext; class RGWRados; class RGWRegion; +class RGWGetObjData; class RGWRegionConnection { @@ -19,8 +20,12 @@ public: RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream); int get_url(string& endpoint); + /* sync request */ int forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl); - int put_obj(const string& uid, rgw_obj& obj, uint64_t obj_size, void (*get_data)(uint64_t ofs, uint64_t len, bufferlist& bl, void *)); + + /* async request */ + int put_obj_init(const string& uid, rgw_obj& obj, uint64_t obj_size, RGWRESTStreamRequest **req); + int complete_request(RGWRESTStreamRequest *req); }; #endif |