summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-05-06 14:41:10 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-05-06 14:41:10 -0700
commit9377ca6df8f88237b0c70701789e79b06fd23ba7 (patch)
treeb025a145b0ed4815edc4c9406ffb56b9cbe99310
parentfff440ba53e01732806f0ea6a6d62d028787454e (diff)
downloadceph-wip-rgw-bucketlog-2.tar.gz
rgw: resend data log entry if took too longwip-rgw-bucketlog-2
If took too long, we want to resend. Also, fix issue with renew. Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/rgw/rgw_bucket.cc41
1 files changed, 29 insertions, 12 deletions
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index acdd594f0c9..85bd51e5d37 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -1023,6 +1023,7 @@ int RGWDataChangesLog::renew_entries()
store->time_log_prepare_entry(entry, ut, section, bucket.name, bl);
+ m[index].first.push_back(bucket.name);
m[index].second.push_back(entry);
}
@@ -1074,6 +1075,7 @@ void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration)
ChangeStatusPtr status;
_get_change(bucket_name, status);
+ ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bucket_name << " expiration=" << expiration << dendl;
status->cur_expiration = expiration;
}
@@ -1089,6 +1091,8 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
status->lock->Lock();
+ ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
+
if (now < status->cur_expiration) {
/* no need to send, recently completed */
status->lock->Unlock();
@@ -1118,22 +1122,35 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
status->cond = new RefCountedCond;
status->pending = true;
- status->cur_sent = now;
+ string& oid = oids[choose_oid(bucket)];
+ utime_t expiration;
- status->lock->Unlock();
+ int ret;
+
+ do {
+ status->cur_sent = now;
+
+ expiration = now;
+ expiration += utime_t(cct->_conf->rgw_data_log_window, 0);
+
+ status->lock->Unlock();
- string& oid = oids[choose_oid(bucket)];
+ bufferlist bl;
+ rgw_data_change change;
+ change.entity_type = ENTITY_TYPE_BUCKET;
+ change.key = bucket.name;
+ ::encode(change, bl);
+ string section;
- utime_t ut = ceph_clock_now(cct);
- bufferlist bl;
- rgw_data_change change;
- change.entity_type = ENTITY_TYPE_BUCKET;
- change.key = bucket.name;
- ::encode(change, bl);
- string section;
- int ret = store->time_log_add(oid, ut, section, change.key, bl);
+ ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
- status->lock->Lock();
+ ret = store->time_log_add(oid, now, section, change.key, bl);
+
+ now = ceph_clock_now(cct);
+
+ status->lock->Lock();
+
+ } while (!ret && ceph_clock_now(cct) > expiration);
cond = status->cond;