summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-10-07 20:22:51 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-10-07 20:22:51 -0700
commit0f3b794d188850800fd41fa2ef29045175b451a2 (patch)
tree04ac9f69eab4b944fa18527a1ae2ff2a95d674a8
parent10a1e9b371a02d69e617677b80ed34c264c2d2dc (diff)
downloadceph-wip-rgw-quota.tar.gz
rgw: async quota updatewip-rgw-quota
Not done yet. Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/cls/rgw/cls_rgw_client.cc38
-rw-r--r--src/cls/rgw/cls_rgw_client.h8
-rw-r--r--src/rgw/rgw_quota.cc107
-rw-r--r--src/rgw/rgw_quota.h7
-rw-r--r--src/rgw/rgw_rados.cc37
-rw-r--r--src/rgw/rgw_rados.h25
6 files changed, 214 insertions, 8 deletions
diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc
index 165ca437987..36caf19b4fe 100644
--- a/src/cls/rgw/cls_rgw_client.cc
+++ b/src/cls/rgw/cls_rgw_client.cc
@@ -157,6 +157,44 @@ int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *he
return r;
}
+class GetDirHeaderCompletion : public ObjectOperationCompletion {
+ RGWGetDirHeader_CB *ret_ctx;
+public:
+ GetDirHeaderCompletion(RGWGetDirHeader_CB *_ctx) : ret_ctx(_ctx) {}
+ ~GetDirHeaderCompletion() {
+ ret_ctx->put();
+ }
+ void handle_completion(int r, bufferlist& outbl) {
+ struct rgw_cls_list_ret ret;
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(ret, iter);
+ } catch (buffer::error& err) {
+ r = -EIO;
+ }
+
+ ret_ctx->handle_response(r, ret.dir.header);
+ };
+};
+
+int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx)
+{
+ bufferlist in, out;
+ struct rgw_cls_list_op call;
+ call.num_entries = 0;
+ ::encode(call, in);
+ ObjectWriteOperation op;
+ GetDirHeaderCompletion *cb = new GetDirHeaderCompletion(ctx);
+ op.exec("rgw", "bucket_list", in, cb);
+ AioCompletion *c = IoCtx::create_completion(NULL, NULL, NULL);
+ int r = io_ctx.aio_operate(oid, c, &op);
+ c->release();
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
list<rgw_bi_log_entry>& entries, bool *truncated)
{
diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h
index 2ea5d9ca771..39bb3c9fc4a 100644
--- a/src/cls/rgw/cls_rgw_client.h
+++ b/src/cls/rgw/cls_rgw_client.h
@@ -4,6 +4,13 @@
#include "include/types.h"
#include "include/rados/librados.hpp"
#include "cls_rgw_types.h"
+#include "common/RefCountedObj.h"
+
+class RGWGetDirHeader_CB : public RefCountedObject {
+public:
+ virtual ~RGWGetDirHeader_CB() {}
+ virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0;
+};
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
@@ -27,6 +34,7 @@ int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid);
int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header);
+int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc
index 70ab0762ce6..1f406a2c82c 100644
--- a/src/rgw/rgw_quota.cc
+++ b/src/rgw/rgw_quota.cc
@@ -26,6 +26,9 @@ public:
int get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& stats);
void adjust_bucket_stats(rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
+
+ int async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs);
+ void async_refresh_response(rgw_bucket& bucket, RGWBucketStats& stats);
};
int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& stats)
@@ -42,6 +45,8 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats&
return r;
}
+ stats = RGWBucketStats();
+
map<RGWObjCategory, RGWBucketStats>::iterator iter;
for (iter = bucket_stats.begin(); iter != bucket_stats.end(); ++iter) {
RGWBucketStats& s = iter->second;
@@ -53,23 +58,115 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats&
return 0;
}
+class AsyncRefreshHandler : public RGWGetBucketStats_CB {
+ RGWRados *store;
+ RGWBucketStatsCache *cache;
+public:
+ AsyncRefreshHandler(RGWRados *_store, RGWBucketStatsCache *_cache, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), cache(_cache) {}
+
+ int init_fetch();
+
+ void handle_response(int r);
+};
+
+
+int AsyncRefreshHandler::init_fetch()
+{
+ map<RGWObjCategory, RGWBucketStats> bucket_stats;
+ int r = store->get_bucket_stats_async(bucket, this);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl;
+
+ /* get_bucket_stats_async() dropped our reference already */
+ return r;
+ }
+
+ return 0;
+}
+
+void AsyncRefreshHandler::handle_response(int r)
+{
+ if (r < 0)
+ return; /* nothing to do here */
+
+ RGWBucketStats bs;
+
+ map<RGWObjCategory, RGWBucketStats>::iterator iter;
+ for (iter = stats->begin(); iter != stats->end(); ++iter) {
+ RGWBucketStats& s = iter->second;
+ bs.num_kb += s.num_kb;
+ bs.num_kb_rounded += s.num_kb_rounded;
+ bs.num_objects += s.num_objects;
+ }
+
+ cache->async_refresh_response(bucket, bs);
+}
+
+int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs)
+{
+ if (qs.async_update_flag.inc() != 1) { /* are we the first one here? */
+ qs.async_update_flag.dec();
+ return 0;
+ }
+
+ AsyncRefreshHandler *handler = new AsyncRefreshHandler(store, this, bucket);
+
+ async_refcount.get();
+
+ int ret = handler->init_fetch();
+ if (ret < 0) {
+ async_refcount.put();
+ handler->put();
+ return ret;
+ }
+
+ return 0;
+}
+
+void RGWBucketStatsCache::async_refresh_response(rgw_bucket& bucket, RGWBucketStats& stats)
+{
+ RGWQuotaBucketStats qs;
+
+ stats_map.find(bucket, qs);
+
+ set_stats(bucket, qs, stats);
+ async_refcount.put();
+}
+
+void RGWBucketStatsCache::set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWBucketStats& stats)
+{
+ qs.stats = stats;
+ qs.expiration = ceph_clock_now(store->ctx());
+ qs.async_refresh_time = qs.expiration;
+ qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
+ qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
+
+ stats_map.add(bucket, qs);
+}
+
int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& stats) {
RGWQuotaBucketStats qs;
+ utime_t now = ceph_clock_now(store->ctx());
if (stats_map.find(bucket, qs)) {
+ if (now >= qs.async_refresh_time) {
+ int r = async_refresh(bucket);
+ if (r < 0) {
+ ldout(s->ctx(), 0) << "ERROR: quota async refresh returned ret=" << ret << dendl;
+
+ /* continue processing, might be a transient error, async refresh is just optimization */
+ }
+ }
if (qs.expiration > ceph_clock_now(store->ctx())) {
stats = qs.stats;
return 0;
}
}
- int ret = fetch_bucket_totals(bucket, qs.stats);
+ int ret = fetch_bucket_totals(bucket, stats);
if (ret < 0 && ret != -ENOENT)
return ret;
- qs.expiration = ceph_clock_now(store->ctx());
- qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
-
- stats_map.add(bucket, qs);
+ set_stats(bucket, qs, stats);
return 0;
}
diff --git a/src/rgw/rgw_quota.h b/src/rgw/rgw_quota.h
index 39cfc62de55..6cdc21c52a5 100644
--- a/src/rgw/rgw_quota.h
+++ b/src/rgw/rgw_quota.h
@@ -3,7 +3,9 @@
#include "include/utime.h"
+#include "include/atomic.h"
#include "common/lru_map.h"
+#include "common/RefCountedObj.h"
class RGWRados;
class JSONObj;
@@ -40,9 +42,12 @@ WRITE_CLASS_ENCODER(RGWQuotaInfo)
class rgw_bucket;
class RGWQuotaHandler {
+ RefCountedWaitObject async_refcount;
public:
RGWQuotaHandler() {}
- virtual ~RGWQuotaHandler() {}
+ virtual ~RGWQuotaHandler() {
+ async_refcount.put_wait(); /* wait for all pending async requests to complete */
+ }
virtual int check_quota(rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
uint64_t num_objs, uint64_t size) = 0;
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 8035e0589de..06a8113af44 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -4614,6 +4614,39 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_
return 0;
}
+class RGWGetBucketStatsContext : public RGWGetDirHeader_CB {
+ RGWGetBucketStats_CB *cb;
+
+public:
+ RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {}
+
+ void handle_response(int r, rgw_bucket_dir_header& header) {
+ map<RGWObjCategory, RGWBucketStats> stats;
+
+ if (r >= 0) {
+ translate_raw_stats(header, stats);
+ cb->set_response(header.ver, header.master_ver, &stats, header.max_marker);
+ }
+
+ cb->handle_response(r);
+
+ cb->put();
+ }
+};
+
+int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx)
+{
+ RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx);
+ int r = cls_bucket_head_async(bucket, get_ctx);
+ if (r < 0) {
+ ctx->put();
+ delete get_ctx;
+ return r;
+ }
+
+ return 0;
+}
+
void RGWRados::get_bucket_instance_entry(rgw_bucket& bucket, string& entry)
{
entry = bucket.name + ":" + bucket.bucket_id;
@@ -5481,7 +5514,7 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx,
return 0;
}
-int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header)
+int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx)
{
librados::IoCtx index_ctx;
string oid;
@@ -5489,7 +5522,7 @@ int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header&
if (r < 0)
return r;
- r = cls_rgw_get_dir_header(index_ctx, oid, &header);
+ r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx);
if (r < 0)
return r;
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index a23f90f1f23..52b898123d4 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -761,6 +761,29 @@ public:
int renew_state();
};
+class RGWGetBucketStats_CB : public RefCountedObject {
+protected:
+ rgw_bucket bucket;
+ uint64_t bucket_ver;
+ uint64_t master_ver;
+ map<RGWObjCategory, RGWBucketStats> *stats;
+ string max_marker;
+public:
+ RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {}
+ virtual ~RGWGetBucketStats_CB() {}
+ virtual void handle_response(int r) = 0;
+ virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver,
+ map<RGWObjCategory, RGWBucketStats> *_stats,
+ const string &_max_marker) {
+ bucket_ver = _bucket_ver;
+ master_ver = _master_ver;
+ stats = _stats;
+ max_marker = _max_marker;
+ }
+};
+
+class RGWGetDirHeader_CB;
+
class RGWRados
{
@@ -1295,6 +1318,7 @@ public:
int decode_policy(bufferlist& bl, ACLOwner *owner);
int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWBucketStats>& stats,
string *max_marker);
+ int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb);
void get_bucket_instance_obj(rgw_bucket& bucket, rgw_obj& obj);
void get_bucket_instance_entry(rgw_bucket& bucket, string& entry);
void get_bucket_meta_oid(rgw_bucket& bucket, string& oid);
@@ -1326,6 +1350,7 @@ public:
map<string, RGWObjEnt>& m, bool *is_truncated,
string *last_entry, bool (*force_check_filter)(const string& name) = NULL);
int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
+ int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
RGWModifyOp op, rgw_obj& oid, string& tag);
int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,