summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-06-20 17:01:25 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-06-20 21:57:36 -0700
commit02de43ad731137179e750a318edf857eb67941e8 (patch)
tree2b4e7cca5936f11ad1b86518194c052214b199a1
parent234fab09025852ed57672032e715f0faa8f24b1a (diff)
downloadceph-02de43ad731137179e750a318edf857eb67941e8.tar.gz
rgw: tie opstate into intra-region copy operations
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/common/config_opts.h1
-rw-r--r--src/rgw/rgw_op.cc2
-rw-r--r--src/rgw/rgw_op.h2
-rw-r--r--src/rgw/rgw_rados.cc106
-rw-r--r--src/rgw/rgw_rados.h22
-rw-r--r--src/rgw/rgw_rest_s3.cc9
6 files changed, 119 insertions, 23 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index b48cfd5db1e..66582c0407e 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -603,6 +603,7 @@ OPTION(rgw_relaxed_s3_bucket_names, OPT_BOOL, false) // enable relaxed bucket na
OPTION(rgw_list_buckets_max_chunk, OPT_INT, 1000) // max buckets to retrieve in a single op when listing user buckets
OPTION(rgw_md_log_max_shards, OPT_INT, 64) // max shards for metadata log
OPTION(rgw_num_zone_opstate_shards, OPT_INT, 128) // max shards for keeping inter-region copy progress info
+OPTION(rgw_opstate_ratelimit_sec, OPT_INT, 30) // min time between opstate updates on a single upload (0 for disabling ratelimit)
OPTION(rgw_data_log_window, OPT_INT, 30) // data log entries window (in seconds)
OPTION(rgw_data_log_changes_size, OPT_INT, 1000) // number of in-memory entries to hold for data changes log
diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc
index dd32ccfeb24..1f2cb9f5452 100644
--- a/src/rgw/rgw_op.cc
+++ b/src/rgw/rgw_op.cc
@@ -1569,6 +1569,8 @@ void RGWCopyObj::execute()
ret = store->copy_obj(s->obj_ctx,
s->user.user_id,
+ client_id,
+ op_id,
&s->info,
source_zone,
dst_obj,
diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h
index 0b8462148b8..1632d35fcc8 100644
--- a/src/rgw/rgw_op.h
+++ b/src/rgw/rgw_op.h
@@ -420,6 +420,8 @@ protected:
RGWBucketInfo src_bucket_info;
RGWBucketInfo dest_bucket_info;
string source_zone;
+ string client_id;
+ string op_id;
int init_common();
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 055e7ad0283..9115c6b5e88 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -2194,14 +2194,26 @@ class RGWRadosPutObj : public RGWGetDataCB
{
rgw_obj obj;
RGWPutObjProcessor_Atomic *processor;
+ RGWOpStateSingleOp *opstate;
public:
- RGWRadosPutObj(RGWPutObjProcessor_Atomic *p) : processor(p) {}
+ RGWRadosPutObj(RGWPutObjProcessor_Atomic *p, RGWOpStateSingleOp *_ops) : processor(p), opstate(_ops) {}
int handle_data(bufferlist& bl, off_t ofs, off_t len) {
void *handle;
int ret = processor->handle_data(bl, ofs, &handle);
if (ret < 0)
return ret;
+ if (opstate) {
+ /* need to update opstate repository with new state. This is ratelimited, so we're not
+ * really doing it every time
+ */
+ ret = opstate->renew_state();
+ if (ret < 0) {
+ /* could not renew state! might have been marked as cancelled */
+ return ret;
+ }
+ }
+
ret = processor->throttle_data(handle);
if (ret < 0)
return ret;
@@ -2246,6 +2258,8 @@ static void set_copy_attrs(map<string, bufferlist>& src_attrs, map<string, buffe
*/
int RGWRados::copy_obj(void *ctx,
const string& user_id,
+ const string& client_id,
+ const string& op_id,
req_info *info,
const string& source_zone,
rgw_obj& dest_obj,
@@ -2311,8 +2325,6 @@ int RGWRados::copy_obj(void *ctx,
if (ret < 0)
return ret;
- RGWRadosPutObj cb(&processor);
-
RGWRESTConn *conn;
if (source_zone.empty()) {
conn = rest_master_conn;
@@ -2324,39 +2336,62 @@ int RGWRados::copy_obj(void *ctx,
}
conn = iter->second;
}
-
- int ret = conn->get_obj(user_id, info, src_obj, true, &cb, &in_stream_req);
- if (ret < 0)
- return ret;
- string etag;
+ string obj_name = dest_obj.bucket.name + "/" + dest_obj.object;
+ RGWOpStateSingleOp opstate(this, client_id, op_id, obj_name);
+
+ int ret = opstate.set_state(RGWOpState::OPSTATE_IN_PROGRESS);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl;
+ return ret;
+ }
+ RGWRadosPutObj cb(&processor, &opstate);
+ string etag;
map<string, string> req_headers;
time_t set_mtime;
- ret = conn->complete_request(in_stream_req, etag, &set_mtime, req_headers);
+
+ ret = conn->get_obj(user_id, info, src_obj, true, &cb, &in_stream_req);
if (ret < 0)
- return ret;
+ goto set_err_state;
- bufferlist& extra_data_bl = processor.get_extra_data();
- if (extra_data_bl.length()) {
- JSONParser jp;
- if (!jp.parse(extra_data_bl.c_str(), extra_data_bl.length())) {
- ldout(cct, 0) << "failed to parse response extra data. len=" << extra_data_bl.length() << " data=" << extra_data_bl.c_str() << dendl;
- return -EINVAL;
- }
+ ret = conn->complete_request(in_stream_req, etag, &set_mtime, req_headers);
+ if (ret < 0)
+ goto set_err_state;
+
+ { /* opening scope so that we can do goto, sorry */
+ bufferlist& extra_data_bl = processor.get_extra_data();
+ if (extra_data_bl.length()) {
+ JSONParser jp;
+ if (!jp.parse(extra_data_bl.c_str(), extra_data_bl.length())) {
+ ldout(cct, 0) << "failed to parse response extra data. len=" << extra_data_bl.length() << " data=" << extra_data_bl.c_str() << dendl;
+ goto set_err_state;
+ }
- JSONDecoder::decode_json("attrs", src_attrs, &jp);
+ JSONDecoder::decode_json("attrs", src_attrs, &jp);
- src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout
+ src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout
+ }
}
set_copy_attrs(src_attrs, attrs, replace_attrs, !source_zone.empty());
ret = cb.complete(etag, mtime, set_mtime, src_attrs);
if (ret < 0)
- return ret;
+ goto set_err_state;
+
+ ret = opstate.set_state(RGWOpState::OPSTATE_COMPLETE);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to set opstate ret=" << ret << dendl;
+ }
return 0;
+set_err_state:
+ int r = opstate.set_state(RGWOpState::OPSTATE_ERROR);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl;
+ }
+ return ret;
}
set_copy_attrs(src_attrs, attrs, replace_attrs, false);
@@ -2392,9 +2427,10 @@ int RGWRados::copy_obj(void *ctx,
/* dest is in a different region, copy it there */
map<string, bufferlist> src_attrs;
+ string etag;
RGWRESTStreamWriteRequest *out_stream_req;
-
+
int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req);
if (ret < 0)
return ret;
@@ -2403,8 +2439,6 @@ int RGWRados::copy_obj(void *ctx,
if (ret < 0)
return ret;
- string etag;
-
ret = rest_master_conn->complete_request(out_stream_req, etag, mtime);
if (ret < 0)
return ret;
@@ -5419,6 +5453,32 @@ int RGWOpState::renew_state(const string& client_id, const string& op_id, const
return store_entry(client_id, op_id, object, s, NULL, &s);
}
+RGWOpStateSingleOp::RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid,
+ const string& obj) : os(store), client_id(cid), op_id(oid), object(obj)
+{
+ cct = store->ctx();
+ cur_state = RGWOpState::OPSTATE_UNKNOWN;
+}
+
+int RGWOpStateSingleOp::set_state(RGWOpState::OpState state) {
+ last_update = ceph_clock_now(cct);
+ cur_state = state;
+ return os.set_state(client_id, op_id, object, state);
+}
+
+int RGWOpStateSingleOp::renew_state() {
+ utime_t now = ceph_clock_now(cct);
+
+ int rate_limit_sec = cct->_conf->rgw_opstate_ratelimit_sec;
+
+ if (rate_limit_sec && now - last_update < rate_limit_sec) {
+ return 0;
+ }
+
+ last_update = now;
+ return os.renew_state(client_id, op_id, object, cur_state);
+}
+
uint64_t RGWRados::instance_id()
{
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index b267ae2f86d..866736fc816 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -649,6 +649,26 @@ public:
int renew_state(const string& client_id, const string& op_id, const string& object, OpState state);
};
+class RGWOpStateSingleOp
+{
+ RGWOpState os;
+ string client_id;
+ string op_id;
+ string object;
+
+ CephContext *cct;
+
+ RGWOpState::OpState cur_state;
+ utime_t last_update;
+
+public:
+ RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid, const string& obj);
+
+ int set_state(RGWOpState::OpState state);
+ int renew_state();
+};
+
+
class RGWRados
{
friend class RGWGC;
@@ -987,6 +1007,8 @@ public:
*/
virtual int copy_obj(void *ctx,
const string& user_id,
+ const string& client_id,
+ const string& op_id,
req_info *info,
const string& source_zone,
rgw_obj& dest_obj,
diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc
index 35b106cff19..bfb23f0be38 100644
--- a/src/rgw/rgw_rest_s3.cc
+++ b/src/rgw/rgw_rest_s3.cc
@@ -1256,6 +1256,15 @@ int RGWCopyObj_ObjStore_S3::get_params()
if (s->system_request) {
source_zone = s->info.args.get(RGW_SYS_PARAM_PREFIX "source-zone");
+ if (!source_zone.empty()) {
+ client_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "client-id");
+ op_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "op-id");
+
+ if (client_id.empty() || op_id.empty()) {
+ ldout(s->cct, 0) << RGW_SYS_PARAM_PREFIX "client-id or " RGW_SYS_PARAM_PREFIX "op-id were not provided, required for intra-region copy" << dendl;
+ return -EINVAL;
+ }
+ }
}
const char *md_directive = s->info.env->get("HTTP_X_AMZ_METADATA_DIRECTIVE");