diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-20 17:01:25 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-20 21:57:36 -0700 |
commit | 02de43ad731137179e750a318edf857eb67941e8 (patch) | |
tree | 2b4e7cca5936f11ad1b86518194c052214b199a1 | |
parent | 234fab09025852ed57672032e715f0faa8f24b1a (diff) | |
download | ceph-02de43ad731137179e750a318edf857eb67941e8.tar.gz |
rgw: tie opstate into intra-region copy operations
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/common/config_opts.h | 1 | ||||
-rw-r--r-- | src/rgw/rgw_op.cc | 2 | ||||
-rw-r--r-- | src/rgw/rgw_op.h | 2 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 106 | ||||
-rw-r--r-- | src/rgw/rgw_rados.h | 22 | ||||
-rw-r--r-- | src/rgw/rgw_rest_s3.cc | 9 |
6 files changed, 119 insertions, 23 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index b48cfd5db1e..66582c0407e 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -603,6 +603,7 @@ OPTION(rgw_relaxed_s3_bucket_names, OPT_BOOL, false) // enable relaxed bucket na OPTION(rgw_list_buckets_max_chunk, OPT_INT, 1000) // max buckets to retrieve in a single op when listing user buckets OPTION(rgw_md_log_max_shards, OPT_INT, 64) // max shards for metadata log OPTION(rgw_num_zone_opstate_shards, OPT_INT, 128) // max shards for keeping inter-region copy progress info +OPTION(rgw_opstate_ratelimit_sec, OPT_INT, 30) // min time between opstate updates on a single upload (0 for disabling ratelimit) OPTION(rgw_data_log_window, OPT_INT, 30) // data log entries window (in seconds) OPTION(rgw_data_log_changes_size, OPT_INT, 1000) // number of in-memory entries to hold for data changes log diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index dd32ccfeb24..1f2cb9f5452 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1569,6 +1569,8 @@ void RGWCopyObj::execute() ret = store->copy_obj(s->obj_ctx, s->user.user_id, + client_id, + op_id, &s->info, source_zone, dst_obj, diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 0b8462148b8..1632d35fcc8 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -420,6 +420,8 @@ protected: RGWBucketInfo src_bucket_info; RGWBucketInfo dest_bucket_info; string source_zone; + string client_id; + string op_id; int init_common(); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 055e7ad0283..9115c6b5e88 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2194,14 +2194,26 @@ class RGWRadosPutObj : public RGWGetDataCB { rgw_obj obj; RGWPutObjProcessor_Atomic *processor; + RGWOpStateSingleOp *opstate; public: - RGWRadosPutObj(RGWPutObjProcessor_Atomic *p) : processor(p) {} + RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops) : processor(p), opstate(_ops) {} int handle_data(bufferlist& bl, off_t ofs, off_t len) { void *handle; int ret = processor->handle_data(bl, ofs, &handle); if (ret < 0) return ret; + if (opstate) { + /* need to update opstate repository with new state. This is ratelimited, so we're not + * really doing it every time + */ + ret = opstate->renew_state(); + if (ret < 0) { + /* could not renew state! might have been marked as cancelled */ + return ret; + } + } + ret = processor->throttle_data(handle); if (ret < 0) return ret; @@ -2246,6 +2258,8 @@ static void set_copy_attrs(map<string, bufferlist>& src_attrs, map<string, buffe */ int RGWRados::copy_obj(void *ctx, const string& user_id, + const string& client_id, + const string& op_id, req_info *info, const string& source_zone, rgw_obj& dest_obj, @@ -2311,8 +2325,6 @@ int RGWRados::copy_obj(void *ctx, if (ret < 0) return ret; - RGWRadosPutObj cb(&processor); - RGWRESTConn *conn; if (source_zone.empty()) { conn = rest_master_conn; @@ -2324,39 +2336,62 @@ int RGWRados::copy_obj(void *ctx, } conn = iter->second; } - - int ret = conn->get_obj(user_id, info, src_obj, true, &cb, &in_stream_req); - if (ret < 0) - return ret; - string etag; + string obj_name = dest_obj.bucket.name + "/" + dest_obj.object; + RGWOpStateSingleOp opstate(this, client_id, op_id, obj_name); + + int ret = opstate.set_state(RGWOpState::OPSTATE_IN_PROGRESS); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl; + return ret; + } + RGWRadosPutObj cb(&processor, &opstate); + string etag; map<string, string> req_headers; time_t set_mtime; - ret = conn->complete_request(in_stream_req, etag, &set_mtime, req_headers); + + ret = conn->get_obj(user_id, info, src_obj, true, &cb, &in_stream_req); if (ret < 0) - return ret; + goto set_err_state; - bufferlist& extra_data_bl = processor.get_extra_data(); - if (extra_data_bl.length()) { - JSONParser jp; - if (!jp.parse(extra_data_bl.c_str(), extra_data_bl.length())) { - ldout(cct, 0) << "failed to parse response extra data. len=" << extra_data_bl.length() << " data=" << extra_data_bl.c_str() << dendl; - return -EINVAL; - } + ret = conn->complete_request(in_stream_req, etag, &set_mtime, req_headers); + if (ret < 0) + goto set_err_state; + + { /* opening scope so that we can do goto, sorry */ + bufferlist& extra_data_bl = processor.get_extra_data(); + if (extra_data_bl.length()) { + JSONParser jp; + if (!jp.parse(extra_data_bl.c_str(), extra_data_bl.length())) { + ldout(cct, 0) << "failed to parse response extra data. len=" << extra_data_bl.length() << " data=" << extra_data_bl.c_str() << dendl; + goto set_err_state; + } - JSONDecoder::decode_json("attrs", src_attrs, &jp); + JSONDecoder::decode_json("attrs", src_attrs, &jp); - src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout + src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout + } } set_copy_attrs(src_attrs, attrs, replace_attrs, !source_zone.empty()); ret = cb.complete(etag, mtime, set_mtime, src_attrs); if (ret < 0) - return ret; + goto set_err_state; + + ret = opstate.set_state(RGWOpState::OPSTATE_COMPLETE); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl; + } return 0; +set_err_state: + int r = opstate.set_state(RGWOpState::OPSTATE_ERROR); + if (r < 0) { + ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl; + } + return ret; } set_copy_attrs(src_attrs, attrs, replace_attrs, false); @@ -2392,9 +2427,10 @@ int RGWRados::copy_obj(void *ctx, /* dest is in a different region, copy it there */ map<string, bufferlist> src_attrs; + string etag; RGWRESTStreamWriteRequest *out_stream_req; - + int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req); if (ret < 0) return ret; @@ -2403,8 +2439,6 @@ int RGWRados::copy_obj(void *ctx, if (ret < 0) return ret; - string etag; - ret = rest_master_conn->complete_request(out_stream_req, etag, mtime); if (ret < 0) return ret; @@ -5419,6 +5453,32 @@ int RGWOpState::renew_state(const string& client_id, const string& op_id, const return store_entry(client_id, op_id, object, s, NULL, &s); } +RGWOpStateSingleOp::RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid, + const string& obj) : os(store), client_id(cid), op_id(oid), object(obj) +{ + cct = store->ctx(); + cur_state = RGWOpState::OPSTATE_UNKNOWN; +} + +int RGWOpStateSingleOp::set_state(RGWOpState::OpState state) { + last_update = ceph_clock_now(cct); + cur_state = state; + return os.set_state(client_id, op_id, object, state); +} + +int RGWOpStateSingleOp::renew_state() { + utime_t now = ceph_clock_now(cct); + + int rate_limit_sec = cct->_conf->rgw_opstate_ratelimit_sec; + + if (rate_limit_sec && now - last_update < rate_limit_sec) { + return 0; + } + + last_update = now; + return os.renew_state(client_id, op_id, object, cur_state); +} + uint64_t RGWRados::instance_id() { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index b267ae2f86d..866736fc816 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -649,6 +649,26 @@ public: int renew_state(const string& client_id, const string& op_id, const string& object, OpState state); }; +class RGWOpStateSingleOp +{ + RGWOpState os; + string client_id; + string op_id; + string object; + + CephContext *cct; + + RGWOpState::OpState cur_state; + utime_t last_update; + +public: + RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid, const string& obj); + + int set_state(RGWOpState::OpState state); + int renew_state(); +}; + + class RGWRados { friend class RGWGC; @@ -987,6 +1007,8 @@ public: */ virtual int copy_obj(void *ctx, const string& user_id, + const string& client_id, + const string& op_id, req_info *info, const string& source_zone, rgw_obj& dest_obj, diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 35b106cff19..bfb23f0be38 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -1256,6 +1256,15 @@ int RGWCopyObj_ObjStore_S3::get_params() if (s->system_request) { source_zone = s->info.args.get(RGW_SYS_PARAM_PREFIX "source-zone"); + if (!source_zone.empty()) { + client_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "client-id"); + op_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "op-id"); + + if (client_id.empty() || op_id.empty()) { + ldout(s->cct, 0) << RGW_SYS_PARAM_PREFIX "client-id or " RGW_SYS_PARAM_PREFIX "op-id were not provided, required for intra-region copy" << dendl; + return -EINVAL; + } + } } const char *md_directive = s->info.env->get("HTTP_X_AMZ_METADATA_DIRECTIVE"); |