diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2013-05-01 21:34:38 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-05-01 21:34:38 -0700 |
commit | 2984ff4bc4c990e53ff4c66816c948d0d9d2298f (patch) | |
tree | 365c7f581c3e2608c9e03db289ecd70a65dd2a84 | |
parent | e15b3639589edf82ceced597f4d71940a28825b1 (diff) | |
download | ceph-2984ff4bc4c990e53ff4c66816c948d0d9d2298f.tar.gz |
rgw: changed data log renew thread
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- | src/cls/log/cls_log.cc | 17 | ||||
-rw-r--r-- | src/cls/log/cls_log_client.cc | 22 | ||||
-rw-r--r-- | src/cls/log/cls_log_client.h | 4 | ||||
-rw-r--r-- | src/cls/log/cls_log_ops.h | 6 | ||||
-rw-r--r-- | src/rgw/rgw_bucket.cc | 177 | ||||
-rw-r--r-- | src/rgw/rgw_bucket.h | 28 | ||||
-rw-r--r-- | src/rgw/rgw_rados.cc | 30 | ||||
-rw-r--r-- | src/rgw/rgw_rados.h | 2 |
8 files changed, 170 insertions, 116 deletions
diff --git a/src/cls/log/cls_log.cc b/src/cls/log/cls_log.cc index c2d50fb6a34..ac5efc4f0b5 100644 --- a/src/cls/log/cls_log.cc +++ b/src/cls/log/cls_log.cc @@ -70,17 +70,20 @@ static int cls_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *ou return -EINVAL; } - cls_log_entry& entry = op.entry; + for (list<cls_log_entry>::iterator iter = op.entries.begin(); + iter != op.entries.end(); ++iter) { + cls_log_entry& entry = *iter; - string index; + string index; - get_index(hctx, entry.timestamp, index); + get_index(hctx, entry.timestamp, index); - CLS_LOG(0, "storing entry at %s", index.c_str()); + CLS_LOG(0, "storing entry at %s", index.c_str()); - int ret = write_log_entry(hctx, index, entry); - if (ret < 0) - return ret; + int ret = write_log_entry(hctx, index, entry); + if (ret < 0) + return ret; + } return 0; } diff --git a/src/cls/log/cls_log_client.cc b/src/cls/log/cls_log_client.cc index d1c199ba263..c551f407358 100644 --- a/src/cls/log/cls_log_client.cc +++ b/src/cls/log/cls_log_client.cc @@ -9,25 +9,39 @@ using namespace librados; +void cls_log_add(librados::ObjectWriteOperation& op, list<cls_log_entry>& entries) +{ + bufferlist in; + cls_log_add_op call; + call.entries = entries; + ::encode(call, in); + op.exec("log", "add", in); +} + void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry) { bufferlist in; cls_log_add_op call; - call.entry = entry; + call.entries.push_back(entry); ::encode(call, in); op.exec("log", "add", in); } -void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, +void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp, const string& section, const string& name, bufferlist& bl) { - cls_log_entry entry; - entry.timestamp = timestamp; entry.section = section; entry.name = name; entry.data = bl; +} + +void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, + const string& section, const string& name, bufferlist& bl) +{ + cls_log_entry entry; + cls_log_add_prepare_entry(entry, timestamp, section, name, bl); cls_log_add(op, entry); } diff --git a/src/cls/log/cls_log_client.h b/src/cls/log/cls_log_client.h index 6c0046b26f2..4171adbda11 100644 --- a/src/cls/log/cls_log_client.h +++ b/src/cls/log/cls_log_client.h @@ -9,6 +9,10 @@ * log objclass */ +void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp, + const string& section, const string& name, bufferlist& bl); + +void cls_log_add(librados::ObjectWriteOperation& op, list<cls_log_entry>& entry); void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry); void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, const string& section, const string& name, bufferlist& bl); diff --git a/src/cls/log/cls_log_ops.h b/src/cls/log/cls_log_ops.h index 6dc457ed5fd..6a5366f5a66 100644 --- a/src/cls/log/cls_log_ops.h +++ b/src/cls/log/cls_log_ops.h @@ -8,19 +8,19 @@ #include "cls_log_types.h" struct cls_log_add_op { - cls_log_entry entry; + list<cls_log_entry> entries; cls_log_add_op() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - ::encode(entry, bl); + ::encode(entries, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { DECODE_START(1, bl); - ::decode(entry, bl); + ::decode(entries, bl); DECODE_FINISH(bl); } }; diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index ac2367dce00..89e63791442 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -973,105 +973,6 @@ int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state, } -#if 0 - -class CompletionMap { - map<rgw_bucket, RefCountedCond *> entries; - Mutex lock; - -public: - - void add(string& s) { - Mutex::Locker l(lock); - - entries[s] = new RefCountedObject; - } - - - bool wait(string& s) { - map<string, RefCountedCond *>::iterator iter; - l.Lock(); - iter = entries.find(s); - if (iter == entries.end()) { - l.Unlock(); - return false; - } - - RefCountedCond *rcc = iter->second; - rcc->get(); - l.Unlock(); - - rcc->wait(); - rcc->put(); - - return true; - - } - - void complete(string& s) { - lock.Lock(); - - map<string, RefCountedCond *>::iterator iter = entries.find(s); - if (iter == entries.end()) { - lock.Unlock(); - return; - } - - RefCountedCond *rcc = iter->second; - - entries.erase(iter); - - lock.Unlock(); - - rcc->complete(); - rcc->put(); - } - -}; - - -class RGWChangedBucketsTracker { - CephContext *cct; - RGWRados *store; - - map<rgw_bucket, utime_t> last_reported; - - struct PendingInfo : public RefCountedCond { - PendingInfo() {} - }; - - CompletionMap pending; - - Mutex lock; -public: - RGWChangedBucketsTracker(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWChanedBucketsTracker") {} - - int report_bucket_changed(rgw_bucket& bucket) { - lock.Lock(); - - map<rgw_bucket, utime_t>::iteartor iter = last_reported.find(bucket); - - bool exists = (iter != iter.end()); - if (exists) { - utime_t& t = iter->second; - utime_t now = ceph_clock_now(cct); - - if (now > t + get_resolution_sec()) - exists = false; - } - - lock.Unlock(); - - if (exists) - return true; - } - - uint32_t get_resolution_sec(); -}; - - -#endif - void rgw_data_change::dump(Formatter *f) const { string type; @@ -1094,6 +995,44 @@ int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) { return (int)r; } +int RGWDataChangesLog::renew_entries() +{ + map<int, list<cls_log_entry> > m; + + map<string, rgw_bucket>::iterator iter; + string section; + utime_t ut = ceph_clock_now(cct); + for (iter = cur_cycle.begin(); iter != cur_cycle.end(); ++iter) { + rgw_bucket& bucket = iter->second; + int index = choose_oid(bucket); + + cls_log_entry entry; + + rgw_data_change change; + bufferlist bl; + change.entity_type = ENTITY_TYPE_BUCKET; + change.key = bucket.name; + ::encode(change, bl); + + store->time_log_prepare_entry(entry, ut, section, bucket.name, bl); + + m[index].push_back(entry); + } + + map<int, list<cls_log_entry> >::iterator miter; + for (miter = m.begin(); miter != m.end(); ++miter) { + list<cls_log_entry>& entries = miter->second; + + int ret = store->time_log_add(oids[miter->first], entries); + if (ret < 0) { + lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl; + return ret; + } + } + + return 0; +} + int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { lock.Lock(); @@ -1103,6 +1042,8 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { changes.add(bucket.name, status); } + cur_cycle[bucket.name] = bucket; + lock.Unlock(); utime_t now = ceph_clock_now(cct); @@ -1218,3 +1159,41 @@ int RGWDataChangesLog::list_entries(utime_t& start_time, utime_t& end_time, int return 0; } + +bool RGWDataChangesLog::going_down() +{ + return (down_flag.read() != 0); +} + +RGWDataChangesLog::~RGWDataChangesLog() { + down_flag.set(1); + renew_thread->stop(); + renew_thread->join(); + delete[] oids; +} + +void *RGWDataChangesLog::ChangesRenewThread::entry() { + do { + dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl; + int r = log->renew_entries(); + if (r < 0) { + dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl; + } + + if (log->going_down()) + break; + + lock.Lock(); + cond.WaitInterval(cct, lock, utime_t(20, 0)); + lock.Unlock(); + } while (!log->going_down()); + + return NULL; +} + +void RGWDataChangesLog::ChangesRenewThread::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} + diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 95b10a1c34d..6868a1bca46 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -263,6 +263,8 @@ class RGWDataChangesLog { Mutex lock; + atomic_t down_flag; + struct ChangeStatus { utime_t cur_expiration; utime_t cur_sent; @@ -283,6 +285,22 @@ class RGWDataChangesLog { lru_map<string, ChangeStatusPtr> changes; + map<string, rgw_bucket> cur_cycle; + + class ChangesRenewThread : public Thread { + CephContext *cct; + RGWDataChangesLog *log; + Mutex lock; + Cond cond; + + public: + ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log), lock("ChangesRenewThread") {} + void *entry(); + void stop(); + }; + + ChangesRenewThread *renew_thread; + public: RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog"), @@ -297,14 +315,16 @@ public: snprintf(buf, sizeof(buf), "%s.%d", prefix, i); oids[i] = buf; } - } - ~RGWDataChangesLog() { - delete[] oids; + renew_thread = new ChangesRenewThread(cct, this); + renew_thread->create(); } + ~RGWDataChangesLog(); + int choose_oid(rgw_bucket& bucket); int add_entry(rgw_bucket& bucket); + int renew_entries(); int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, list<rgw_data_change>& entries, string& marker, bool *truncated); @@ -316,6 +336,8 @@ public: }; int list_entries(utime_t& start_time, utime_t& end_time, int max_entries, list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated); + + bool going_down(); }; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 4adf1390c93..25b39ffc6b4 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1096,6 +1096,11 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, string& sec name = prefix + buf; } +void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl) +{ + cls_log_add_prepare_entry(entry, ut, section, key, bl); +} + int RGWRados::time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl) { librados::IoCtx io_ctx; @@ -1121,6 +1126,31 @@ int RGWRados::time_log_add(const string& oid, const utime_t& ut, string& section return r; } +int RGWRados::time_log_add(const string& oid, list<cls_log_entry>& entries) +{ + librados::IoCtx io_ctx; + + const char *log_pool = zone.log_pool.name.c_str(); + int r = rados->ioctx_create(log_pool, io_ctx); + if (r == -ENOENT) { + rgw_bucket pool(log_pool); + r = create_pool(pool); + if (r < 0) + return r; + + // retry + r = rados->ioctx_create(log_pool, io_ctx); + } + if (r < 0) + return r; + + ObjectWriteOperation op; + cls_log_add(op, entries); + + r = io_ctx.operate(oid, &op); + return r; +} + int RGWRados::time_log_list(const string& oid, utime_t& start_time, utime_t& end_time, int max_entries, list<cls_log_entry>& entries, string& marker, bool *truncated) { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 1d062a63daf..7a8add427ad 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -985,6 +985,8 @@ public: void shard_name(const string& prefix, unsigned max_shards, string& key, string& name); void shard_name(const string& prefix, unsigned max_shards, string& section, string& key, string& name); + void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl); + int time_log_add(const string& oid, list<cls_log_entry>& entries); int time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl); int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time, int max_entries, list<cls_log_entry>& entries, string& marker, bool *truncated); |