summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-05-01 13:32:10 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-05-08 11:22:08 -0700
commit6659b2b6cf3b231045051cc4654fb3ecfc5d00df (patch)
tree1fde1b11568823db44be657272da59782e93999e
parent171b0bf267aaaf21b06008a58410ba209424585b (diff)
downloadceph-6659b2b6cf3b231045051cc4654fb3ecfc5d00df.tar.gz
rgw: data changes log, don't always send new requests
We may piggy back on older entries that hasn't expired yet. Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/rgw/rgw_bucket.cc64
-rw-r--r--src/rgw/rgw_bucket.h22
2 files changed, 84 insertions, 2 deletions
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index 7d8a3b36e83..e6f985c45e4 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -1056,6 +1056,51 @@ int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
}
int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
+ lock.Lock();
+ ChangeStatus *& status = changes[bucket.name];
+ if (!status) {
+ status = new ChangeStatus;
+ }
+
+ status->get();
+ lock.Unlock();
+
+ utime_t now = ceph_clock_now(cct);
+
+ status->lock->Lock();
+
+#warning FIXME delta config
+ if (now < status->cur_expiration) {
+ /* no need to send, recently completed */
+ status->lock->Unlock();
+ status->put();
+ return 0;
+ }
+
+ RefCountedCond *cond;
+
+ if (status->pending) {
+ cond = status->cond;
+
+ assert(cond);
+
+ status->cond->get();
+ status->lock->Unlock();
+ status->put();
+
+ cond->wait();
+ cond->put();
+#warning FIXME need to return actual status
+ return 0;
+ }
+
+ status->cond = new RefCountedCond;
+ status->pending = true;
+
+ status->cur_sent = now;
+
+ status->lock->Unlock();
+
string& oid = oids[choose_oid(bucket)];
utime_t ut = ceph_clock_now(cct);
@@ -1065,7 +1110,24 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
change.key = bucket.name;
::encode(change, bl);
string section;
- return store->time_log_add(oid, ut, section, change.key, bl);
+ int ret = store->time_log_add(oid, ut, section, change.key, bl);
+
+ status->lock->Lock();
+
+ cond = status->cond;
+
+ status->pending = false;
+ status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
+ status->cur_expiration += utime_t(5, 0);
+ status->cond = NULL;
+ status->lock->Unlock();
+
+ status->put();
+
+ cond->done();
+ cond->put();
+
+ return ret;
}
int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h
index ddca6122efc..7c4c3d3c6cb 100644
--- a/src/rgw/rgw_bucket.h
+++ b/src/rgw/rgw_bucket.h
@@ -256,9 +256,29 @@ class RGWDataChangesLog {
int num_shards;
string *oids;
+ Mutex lock;
+
+ struct ChangeStatus : public RefCountedObject {
+ utime_t cur_expiration;
+ utime_t cur_sent;
+ bool pending;
+ RefCountedCond *cond;
+ Mutex *lock;
+
+ ChangeStatus() : pending(false), cond(NULL) {
+ lock = new Mutex("RGWDataChangesLog::ChangeStatus");
+ }
+
+ ~ChangeStatus() {
+ delete lock;
+ }
+ };
+
+ map<string, ChangeStatus *> changes;
+
public:
- RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store) {
+ RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog") {
num_shards = 128; /* FIXME */
oids = new string[num_shards];