summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2013-05-01 21:34:38 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-05-01 21:34:38 -0700
commit2984ff4bc4c990e53ff4c66816c948d0d9d2298f (patch)
tree365c7f581c3e2608c9e03db289ecd70a65dd2a84
parente15b3639589edf82ceced597f4d71940a28825b1 (diff)
downloadceph-2984ff4bc4c990e53ff4c66816c948d0d9d2298f.tar.gz
rgw: changed data log renew thread
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r--src/cls/log/cls_log.cc17
-rw-r--r--src/cls/log/cls_log_client.cc22
-rw-r--r--src/cls/log/cls_log_client.h4
-rw-r--r--src/cls/log/cls_log_ops.h6
-rw-r--r--src/rgw/rgw_bucket.cc177
-rw-r--r--src/rgw/rgw_bucket.h28
-rw-r--r--src/rgw/rgw_rados.cc30
-rw-r--r--src/rgw/rgw_rados.h2
8 files changed, 170 insertions, 116 deletions
diff --git a/src/cls/log/cls_log.cc b/src/cls/log/cls_log.cc
index c2d50fb6a34..ac5efc4f0b5 100644
--- a/src/cls/log/cls_log.cc
+++ b/src/cls/log/cls_log.cc
@@ -70,17 +70,20 @@ static int cls_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *ou
return -EINVAL;
}
- cls_log_entry& entry = op.entry;
+ for (list<cls_log_entry>::iterator iter = op.entries.begin();
+ iter != op.entries.end(); ++iter) {
+ cls_log_entry& entry = *iter;
- string index;
+ string index;
- get_index(hctx, entry.timestamp, index);
+ get_index(hctx, entry.timestamp, index);
- CLS_LOG(0, "storing entry at %s", index.c_str());
+ CLS_LOG(0, "storing entry at %s", index.c_str());
- int ret = write_log_entry(hctx, index, entry);
- if (ret < 0)
- return ret;
+ int ret = write_log_entry(hctx, index, entry);
+ if (ret < 0)
+ return ret;
+ }
return 0;
}
diff --git a/src/cls/log/cls_log_client.cc b/src/cls/log/cls_log_client.cc
index d1c199ba263..c551f407358 100644
--- a/src/cls/log/cls_log_client.cc
+++ b/src/cls/log/cls_log_client.cc
@@ -9,25 +9,39 @@ using namespace librados;
+void cls_log_add(librados::ObjectWriteOperation& op, list<cls_log_entry>& entries)
+{
+ bufferlist in;
+ cls_log_add_op call;
+ call.entries = entries;
+ ::encode(call, in);
+ op.exec("log", "add", in);
+}
+
void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry)
{
bufferlist in;
cls_log_add_op call;
- call.entry = entry;
+ call.entries.push_back(entry);
::encode(call, in);
op.exec("log", "add", in);
}
-void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp,
+void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp,
const string& section, const string& name, bufferlist& bl)
{
- cls_log_entry entry;
-
entry.timestamp = timestamp;
entry.section = section;
entry.name = name;
entry.data = bl;
+}
+
+void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp,
+ const string& section, const string& name, bufferlist& bl)
+{
+ cls_log_entry entry;
+ cls_log_add_prepare_entry(entry, timestamp, section, name, bl);
cls_log_add(op, entry);
}
diff --git a/src/cls/log/cls_log_client.h b/src/cls/log/cls_log_client.h
index 6c0046b26f2..4171adbda11 100644
--- a/src/cls/log/cls_log_client.h
+++ b/src/cls/log/cls_log_client.h
@@ -9,6 +9,10 @@
* log objclass
*/
+void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp,
+ const string& section, const string& name, bufferlist& bl);
+
+void cls_log_add(librados::ObjectWriteOperation& op, list<cls_log_entry>& entry);
void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry);
void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp,
const string& section, const string& name, bufferlist& bl);
diff --git a/src/cls/log/cls_log_ops.h b/src/cls/log/cls_log_ops.h
index 6dc457ed5fd..6a5366f5a66 100644
--- a/src/cls/log/cls_log_ops.h
+++ b/src/cls/log/cls_log_ops.h
@@ -8,19 +8,19 @@
#include "cls_log_types.h"
struct cls_log_add_op {
- cls_log_entry entry;
+ list<cls_log_entry> entries;
cls_log_add_op() {}
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- ::encode(entry, bl);
+ ::encode(entries, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
DECODE_START(1, bl);
- ::decode(entry, bl);
+ ::decode(entries, bl);
DECODE_FINISH(bl);
}
};
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index ac2367dce00..89e63791442 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -973,105 +973,6 @@ int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
}
-#if 0
-
-class CompletionMap {
- map<rgw_bucket, RefCountedCond *> entries;
- Mutex lock;
-
-public:
-
- void add(string& s) {
- Mutex::Locker l(lock);
-
- entries[s] = new RefCountedObject;
- }
-
-
- bool wait(string& s) {
- map<string, RefCountedCond *>::iterator iter;
- l.Lock();
- iter = entries.find(s);
- if (iter == entries.end()) {
- l.Unlock();
- return false;
- }
-
- RefCountedCond *rcc = iter->second;
- rcc->get();
- l.Unlock();
-
- rcc->wait();
- rcc->put();
-
- return true;
-
- }
-
- void complete(string& s) {
- lock.Lock();
-
- map<string, RefCountedCond *>::iterator iter = entries.find(s);
- if (iter == entries.end()) {
- lock.Unlock();
- return;
- }
-
- RefCountedCond *rcc = iter->second;
-
- entries.erase(iter);
-
- lock.Unlock();
-
- rcc->complete();
- rcc->put();
- }
-
-};
-
-
-class RGWChangedBucketsTracker {
- CephContext *cct;
- RGWRados *store;
-
- map<rgw_bucket, utime_t> last_reported;
-
- struct PendingInfo : public RefCountedCond {
- PendingInfo() {}
- };
-
- CompletionMap pending;
-
- Mutex lock;
-public:
- RGWChangedBucketsTracker(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWChanedBucketsTracker") {}
-
- int report_bucket_changed(rgw_bucket& bucket) {
- lock.Lock();
-
- map<rgw_bucket, utime_t>::iteartor iter = last_reported.find(bucket);
-
- bool exists = (iter != iter.end());
- if (exists) {
- utime_t& t = iter->second;
- utime_t now = ceph_clock_now(cct);
-
- if (now > t + get_resolution_sec())
- exists = false;
- }
-
- lock.Unlock();
-
- if (exists)
- return true;
- }
-
- uint32_t get_resolution_sec();
-};
-
-
-#endif
-
void rgw_data_change::dump(Formatter *f) const
{
string type;
@@ -1094,6 +995,44 @@ int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
return (int)r;
}
+int RGWDataChangesLog::renew_entries()
+{
+ map<int, list<cls_log_entry> > m;
+
+ map<string, rgw_bucket>::iterator iter;
+ string section;
+ utime_t ut = ceph_clock_now(cct);
+ for (iter = cur_cycle.begin(); iter != cur_cycle.end(); ++iter) {
+ rgw_bucket& bucket = iter->second;
+ int index = choose_oid(bucket);
+
+ cls_log_entry entry;
+
+ rgw_data_change change;
+ bufferlist bl;
+ change.entity_type = ENTITY_TYPE_BUCKET;
+ change.key = bucket.name;
+ ::encode(change, bl);
+
+ store->time_log_prepare_entry(entry, ut, section, bucket.name, bl);
+
+ m[index].push_back(entry);
+ }
+
+ map<int, list<cls_log_entry> >::iterator miter;
+ for (miter = m.begin(); miter != m.end(); ++miter) {
+ list<cls_log_entry>& entries = miter->second;
+
+ int ret = store->time_log_add(oids[miter->first], entries);
+ if (ret < 0) {
+ lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl;
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
lock.Lock();
@@ -1103,6 +1042,8 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
changes.add(bucket.name, status);
}
+ cur_cycle[bucket.name] = bucket;
+
lock.Unlock();
utime_t now = ceph_clock_now(cct);
@@ -1218,3 +1159,41 @@ int RGWDataChangesLog::list_entries(utime_t& start_time, utime_t& end_time, int
return 0;
}
+
+bool RGWDataChangesLog::going_down()
+{
+ return (down_flag.read() != 0);
+}
+
+RGWDataChangesLog::~RGWDataChangesLog() {
+ down_flag.set(1);
+ renew_thread->stop();
+ renew_thread->join();
+ delete[] oids;
+}
+
+void *RGWDataChangesLog::ChangesRenewThread::entry() {
+ do {
+ dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
+ int r = log->renew_entries();
+ if (r < 0) {
+ dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
+ }
+
+ if (log->going_down())
+ break;
+
+ lock.Lock();
+ cond.WaitInterval(cct, lock, utime_t(20, 0));
+ lock.Unlock();
+ } while (!log->going_down());
+
+ return NULL;
+}
+
+void RGWDataChangesLog::ChangesRenewThread::stop()
+{
+ Mutex::Locker l(lock);
+ cond.Signal();
+}
+
diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h
index 95b10a1c34d..6868a1bca46 100644
--- a/src/rgw/rgw_bucket.h
+++ b/src/rgw/rgw_bucket.h
@@ -263,6 +263,8 @@ class RGWDataChangesLog {
Mutex lock;
+ atomic_t down_flag;
+
struct ChangeStatus {
utime_t cur_expiration;
utime_t cur_sent;
@@ -283,6 +285,22 @@ class RGWDataChangesLog {
lru_map<string, ChangeStatusPtr> changes;
+ map<string, rgw_bucket> cur_cycle;
+
+ class ChangesRenewThread : public Thread {
+ CephContext *cct;
+ RGWDataChangesLog *log;
+ Mutex lock;
+ Cond cond;
+
+ public:
+ ChangesRenewThread(CephContext *_cct, RGWDataChangesLog *_log) : cct(_cct), log(_log), lock("ChangesRenewThread") {}
+ void *entry();
+ void stop();
+ };
+
+ ChangesRenewThread *renew_thread;
+
public:
RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWDataChangesLog"),
@@ -297,14 +315,16 @@ public:
snprintf(buf, sizeof(buf), "%s.%d", prefix, i);
oids[i] = buf;
}
- }
- ~RGWDataChangesLog() {
- delete[] oids;
+ renew_thread = new ChangesRenewThread(cct, this);
+ renew_thread->create();
}
+ ~RGWDataChangesLog();
+
int choose_oid(rgw_bucket& bucket);
int add_entry(rgw_bucket& bucket);
+ int renew_entries();
int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
list<rgw_data_change>& entries, string& marker, bool *truncated);
@@ -316,6 +336,8 @@ public:
};
int list_entries(utime_t& start_time, utime_t& end_time, int max_entries,
list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated);
+
+ bool going_down();
};
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 4adf1390c93..25b39ffc6b4 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -1096,6 +1096,11 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, string& sec
name = prefix + buf;
}
+void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl)
+{
+ cls_log_add_prepare_entry(entry, ut, section, key, bl);
+}
+
int RGWRados::time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl)
{
librados::IoCtx io_ctx;
@@ -1121,6 +1126,31 @@ int RGWRados::time_log_add(const string& oid, const utime_t& ut, string& section
return r;
}
+int RGWRados::time_log_add(const string& oid, list<cls_log_entry>& entries)
+{
+ librados::IoCtx io_ctx;
+
+ const char *log_pool = zone.log_pool.name.c_str();
+ int r = rados->ioctx_create(log_pool, io_ctx);
+ if (r == -ENOENT) {
+ rgw_bucket pool(log_pool);
+ r = create_pool(pool);
+ if (r < 0)
+ return r;
+
+ // retry
+ r = rados->ioctx_create(log_pool, io_ctx);
+ }
+ if (r < 0)
+ return r;
+
+ ObjectWriteOperation op;
+ cls_log_add(op, entries);
+
+ r = io_ctx.operate(oid, &op);
+ return r;
+}
+
int RGWRados::time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,
int max_entries, list<cls_log_entry>& entries, string& marker, bool *truncated)
{
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 1d062a63daf..7a8add427ad 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -985,6 +985,8 @@ public:
void shard_name(const string& prefix, unsigned max_shards, string& key, string& name);
void shard_name(const string& prefix, unsigned max_shards, string& section, string& key, string& name);
+ void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl);
+ int time_log_add(const string& oid, list<cls_log_entry>& entries);
int time_log_add(const string& oid, const utime_t& ut, string& section, string& key, bufferlist& bl);
int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,
int max_entries, list<cls_log_entry>& entries, string& marker, bool *truncated);