summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-05-01 13:32:10 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-05-01 13:33:33 -0700
commitfc48937cdcf9adc6cde7d7d17022741efa93787f (patch)
treec921ed682058446b8ecbc08e77ddb34f764391b1
parent53fb6832b656da67d66e83f5dfa2327d74aee9b6 (diff)
downloadceph-fc48937cdcf9adc6cde7d7d17022741efa93787f.tar.gz
rgw: data changes log, don't always send new requests
We may piggy back on older entries that hasn't expired yet. Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/rgw/rgw_bucket.cc64
-rw-r--r--src/rgw/rgw_bucket.h22
2 files changed, 84 insertions, 2 deletions
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index 00f52550aae..13920e9e87f 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -1095,6 +1095,51 @@ int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
}
int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
+ lock.Lock();
+ ChangeStatus *& status = changes[bucket.name];
+ if (!status) {
+ status = new ChangeStatus;
+ }
+
+ status->get();
+ lock.Unlock();
+
+ utime_t now = ceph_clock_now(cct);
+
+ status->lock->Lock();
+
+#warning FIXME delta config
+ if (now < status->cur_expiration) {
+ /* no need to send, recently completed */
+ status->lock->Unlock();
+ status->put();
+ return 0;
+ }
+
+ RefCountedCond *cond;
+
+ if (status->pending) {
+ cond = status->cond;
+
+ assert(cond);
+
+ status->cond->get();
+ status->lock->Unlock();
+ status->put();
+
+ cond->wait();
+ cond->put();
+#warning FIXME need to return actual status
+ return 0;
+ }
+
+ status->cond = new RefCountedCond;
+ status->pending = true;
+
+ status->cur_sent = now;
+
+ status->lock->Unlock();
+
string& oid = oids[choose_oid(bucket)];
utime_t ut = ceph_clock_now(cct);
@@ -1104,7 +1149,24 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
change.key = bucket.name;
::encode(change, bl);
string section;
- return store->time_log_add(oid, ut, section, change.key, bl);
+ int ret = store->time_log_add(oid, ut, section, change.key, bl);
+
+ status->lock->Lock();
+
+ cond = status->cond;
+
+ status->pending = false;
+ status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
+ status->cur_expiration += utime_t(5, 0);
+ status->cond = NULL;
+ status->lock->Unlock();
+
+ status->put();
+
+ cond->done();
+ cond->put();
+
+ return ret;
}
int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h
index d3e9f968c6c..2ac0144409c 100644
--- a/src/rgw/rgw_bucket.h
+++ b/src/rgw/rgw_bucket.h
@@ -259,9 +259,29 @@ class RGWDataChangesLog {
int num_shards;
string *oids;
+ Mutex lock;
+
+ struct ChangeStatus : public RefCountedObject {
+ utime_t cur_expiration;
+ utime_t cur_sent;
+ bool pending;
+ RefCountedCond *cond;
+ Mutex *lock;
+
+ ChangeStatus() : pending(false), cond(NULL) {
+ lock = new Mutex("RGWDataChangesLog::ChangeStatus");
+ }
+
+ ~ChangeStatus() {
+ delete lock;
+ }
+ };
+
+ map<string, ChangeStatus *> changes;
+
public:
- RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store) {
+ RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog") {
num_shards = 128; /* FIXME */
oids = new string[num_shards];