summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Yehuda Sadeh <yehuda@inktank.com> 2013-04-25 19:06:08 -0700
committer Yehuda Sadeh <yehuda@inktank.com> 2013-04-29 22:46:25 -0700
commit 53fb6832b656da67d66e83f5dfa2327d74aee9b6 (patch)
tree ee8ac3af5886cd910def582238485904ea20685f
parent a9ae823c33c88caa03cd51c2b6ca76d27382a52a (diff)
download ceph-53fb6832b656da67d66e83f5dfa2327d74aee9b6.tar.gz
rgw: data changes log, naive implementation
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
-rw-r--r-- src/rgw/rgw_admin.cc 49
-rw-r--r-- src/rgw/rgw_bucket.cc 188
-rw-r--r-- src/rgw/rgw_bucket.h 72
-rw-r--r-- src/rgw/rgw_rados.cc 11
-rw-r--r-- src/rgw/rgw_rados.h 6
5 files changed, 322 insertions, 4 deletions
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 6418e4c8a29..bb6f69ed638 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -90,6 +90,7 @@ void _usage()
cerr << " metadata list list metadata info\n";
cerr << " mdlog list list metadata log\n";
cerr << " bilog list list bucket index log\n";
+ cerr << " datalog list list data log\n";
cerr << "options:\n";
cerr << " --uid=<id> user id\n";
cerr << " --subuser=<name> subuser name\n";
@@ -198,6 +199,7 @@ enum {
OPT_METADATA_LIST,
OPT_MDLOG_LIST,
OPT_BILOG_LIST,
+ OPT_DATALOG_LIST,
};
static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
@@ -225,7 +227,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
strcmp(cmd, "temp") == 0 ||
strcmp(cmd, "metadata") == 0 ||
strcmp(cmd, "mdlog") == 0 ||
- strcmp(cmd, "bilog") == 0) {
+ strcmp(cmd, "bilog") == 0 ||
+ strcmp(cmd, "datalog") == 0) {
*need_more = true;
return 0;
}
@@ -366,6 +369,9 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
} else if (strcmp(prev_cmd, "bilog") == 0) {
if (strcmp(cmd, "list") == 0)
return OPT_BILOG_LIST;
+ } else if (strcmp(prev_cmd, "datalog") == 0) {
+ if (strcmp(cmd, "list") == 0)
+ return OPT_DATALOG_LIST;
}
return -EINVAL;
@@ -1797,6 +1803,47 @@ next:
formatter->close_section();
formatter->flush(cout);
}
+
+ // "datalog list": print the data changes log entries between start_date and
+ // end_date as a JSON array named "entries", stopping once max_entries
+ // entries have been printed.
+ if (opt_cmd == OPT_DATALOG_LIST) {
+   formatter->open_array_section("entries");
+   bool truncated;
+   int count = 0;
+   if (max_entries < 0)
+     max_entries = 1000; // negative max_entries: fall back to a cap of 1000
+
+   utime_t start_time, end_time;
+
+   // errors are negative here; the command exits with the positive value
+   int ret = parse_date_str(start_date, start_time);
+   if (ret < 0)
+     return -ret;
+
+   ret = parse_date_str(end_date, end_time);
+   if (ret < 0)
+     return -ret;
+
+   RGWDataChangesLog *log = store->data_log;
+   // list_entries() advances the marker, so each iteration resumes where the
+   // previous one stopped
+   RGWDataChangesLog::LogMarker marker;
+
+   do {
+     list<rgw_data_change> entries;
+     // ask only for what is still missing to reach max_entries
+     ret = log->list_entries(start_time, end_time, max_entries - count, entries, marker, &truncated);
+     if (ret < 0) {
+       // name the call that actually failed (this used to say list_bi_log_entries)
+       cerr << "ERROR: data log list_entries(): " << cpp_strerror(-ret) << std::endl;
+       return -ret;
+     }
+
+     count += entries.size();
+
+     for (list<rgw_data_change>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
+       rgw_data_change& entry = *iter;
+       encode_json("entry", entry, formatter);
+     }
+     // flush per batch so output is emitted incrementally
+     formatter->flush(cout);
+   } while (truncated && count < max_entries);
+
+   formatter->close_section();
+   formatter->flush(cout);
+ }
return 0;
}
diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc
index d13d359a11c..00f52550aae 100644
--- a/src/rgw/rgw_bucket.cc
+++ b/src/rgw/rgw_bucket.cc
@@ -4,6 +4,7 @@
#include <map>
#include "common/errno.h"
+#include "common/ceph_json.h"
#include "rgw_rados.h"
#include "rgw_acl.h"
#include "rgw_acl_s3.h"
@@ -971,3 +972,190 @@ int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
return 0;
}
+
+#if 0
+
+// NOTE(review): this class lives inside an "#if 0" region and is not compiled.
+// As written it does not look buildable:
+//  - `entries` is keyed by rgw_bucket, yet every method looks it up with a
+//    string and a map<string, ...>::iterator;
+//  - add() stores a `new RefCountedObject` in a map of RefCountedCond *;
+//  - wait() calls Lock()/Unlock() on `l`, which is not declared in that
+//    method (the member is named `lock`).
+// Apparent intent: associate a key with a condition object that wait() blocks
+// on until complete() is called for the same key -- TODO confirm before
+// enabling this code.
+class CompletionMap {
+ map<rgw_bucket, RefCountedCond *> entries;
+ Mutex lock;
+
+public:
+
+ // Register a new pending entry for key `s` while holding `lock`.
+ void add(string& s) {
+ Mutex::Locker l(lock);
+
+ entries[s] = new RefCountedObject;
+ }
+
+
+ // Wait on the entry registered for `s`. Returns false when there is no such
+ // entry, true after the wait has finished.
+ bool wait(string& s) {
+ map<string, RefCountedCond *>::iterator iter;
+ l.Lock();
+ iter = entries.find(s);
+ if (iter == entries.end()) {
+ l.Unlock();
+ return false;
+ }
+
+ RefCountedCond *rcc = iter->second;
+ // get() before unlocking, put() after the wait -- presumably holds a
+ // reference so the object outlives a concurrent complete(); verify
+ rcc->get();
+ l.Unlock();
+
+ rcc->wait();
+ rcc->put();
+
+ return true;
+
+ }
+
+ // Remove the entry for `s` from the map (under `lock`), then call
+ // complete() and put() on it with the lock released. Does nothing when
+ // there is no entry for `s`.
+ void complete(string& s) {
+ lock.Lock();
+
+ map<string, RefCountedCond *>::iterator iter = entries.find(s);
+ if (iter == entries.end()) {
+ lock.Unlock();
+ return;
+ }
+
+ RefCountedCond *rcc = iter->second;
+
+ entries.erase(iter);
+
+ lock.Unlock();
+
+ rcc->complete();
+ rcc->put();
+ }
+
+};
+
+
+// NOTE(review): this class lives inside an "#if 0" region and is not compiled.
+// Problems visible in the code as written:
+//  - the iterator type is misspelled ("iteartor");
+//  - `iter != iter.end()` calls end() on the iterator instead of on
+//    last_reported;
+//  - report_bucket_changed() is declared int but returns `true`, and has no
+//    return statement on the path where `exists` is false;
+//  - the lock name string reads "RGWChanedBucketsTracker";
+//  - `store`, `pending` and PendingInfo are declared but not used here.
+// Apparent intent: remember when each bucket was last reported so that a
+// change is only reported again after get_resolution_sec() seconds -- TODO
+// confirm before enabling this code.
+class RGWChangedBucketsTracker {
+ CephContext *cct;
+ RGWRados *store;
+
+ // per bucket timestamp, consulted by report_bucket_changed()
+ map<rgw_bucket, utime_t> last_reported;
+
+ struct PendingInfo : public RefCountedCond {
+ PendingInfo() {}
+ };
+
+ CompletionMap pending;
+
+ Mutex lock;
+public:
+ RGWChangedBucketsTracker(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), lock("RGWChanedBucketsTracker") {}
+
+ // Look up `bucket` in last_reported under `lock`. `exists` ends up true
+ // only when an entry is present and is not older than
+ // get_resolution_sec(); in that case the function returns early.
+ int report_bucket_changed(rgw_bucket& bucket) {
+ lock.Lock();
+
+ map<rgw_bucket, utime_t>::iteartor iter = last_reported.find(bucket);
+
+ bool exists = (iter != iter.end());
+ if (exists) {
+ utime_t& t = iter->second;
+ utime_t now = ceph_clock_now(cct);
+
+ // entry is older than the resolution window: treat it as absent
+ if (now > t + get_resolution_sec())
+ exists = false;
+ }
+
+ lock.Unlock();
+
+ if (exists)
+ return true;
+ }
+
+ // Declared only; no definition is visible in this region.
+ uint32_t get_resolution_sec();
+};
+
+
+#endif
+
+// Write this change record to the formatter as two fields: "entity_type"
+// (a readable label) and "key".
+void rgw_data_change::dump(Formatter *f) const
+{
+  // every entity type other than a bucket is labelled "unknown"
+  string type_label = (entity_type == ENTITY_TYPE_BUCKET) ? "bucket" : "unknown";
+
+  encode_json("entity_type", type_label, f);
+  encode_json("key", key, f);
+}
+
+
+// Pick the shard for a bucket: hash of the bucket name modulo num_shards.
+// The result is an index into oids.
+int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
+  const string& bucket_name = bucket.name;
+  const uint32_t shard =
+    ceph_str_hash_linux(bucket_name.c_str(), bucket_name.size()) % num_shards;
+
+  return static_cast<int>(shard);
+}
+
+// Log that `bucket` changed: an rgw_data_change of type bucket, keyed by the
+// bucket name, is encoded and added with the current time to the time log
+// object of the shard selected by choose_oid(). Returns whatever
+// time_log_add() returns.
+int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
+  const int shard = choose_oid(bucket);
+  utime_t now = ceph_clock_now(cct);
+
+  rgw_data_change change;
+  change.entity_type = ENTITY_TYPE_BUCKET;
+  change.key = bucket.name;
+
+  bufferlist payload;
+  ::encode(change, payload);
+
+  string section; // passed empty
+  return store->time_log_add(oids[shard], now, section, change.key, payload);
+}
+
+// List decoded change records from the time log of a single shard.
+//
+//  shard        index into oids; must be within [0, num_shards)
+//  start_time, end_time, max_entries, marker, truncated
+//               passed through unchanged to store->time_log_list()
+//  entries      decoded records are appended; existing content is kept
+//
+// Returns 0 on success, -EINVAL for an out-of-range shard, the negative
+// error from time_log_list(), or -EIO when a log entry cannot be decoded
+// (records decoded before the failure stay in `entries`).
+int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
+                                    list<rgw_data_change>& entries, string& marker, bool *truncated) {
+  // oids is a plain array of num_shards names: refuse to index outside of it
+  if (shard < 0 || shard >= num_shards)
+    return -EINVAL;
+
+  list<cls_log_entry> log_entries;
+
+  int ret = store->time_log_list(oids[shard], start_time, end_time,
+                                 max_entries, log_entries, marker, truncated);
+  if (ret < 0)
+    return ret;
+
+  list<cls_log_entry>::iterator iter;
+  for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
+    rgw_data_change entry;
+    bufferlist::iterator liter = iter->data.begin();
+    try {
+      ::decode(entry, liter);
+    } catch (buffer::error& err) {
+      lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
+      return -EIO;
+    }
+    entries.push_back(entry);
+  }
+
+  return 0;
+}
+
+// List change records across shards, starting at `marker` and advancing it.
+//
+//  start_time, end_time  passed to the per-shard listing
+//  max_entries           upper bound on the number of records returned
+//  entries               cleared first, then filled with the records found
+//  marker                in/out position (shard index plus marker string
+//                        within that shard); a default-constructed
+//                        LogMarker starts at shard 0
+//  ptruncated            set to true when more records may remain, i.e. a
+//                        shard reported truncation or not all shards were
+//                        visited; false once every shard has been walked
+//
+// A shard whose listing returns -ENOENT is skipped. Returns 0 on success,
+// -EINVAL for a negative marker.shard, or the first other negative error
+// from the per-shard listing (ptruncated is not written on error).
+int RGWDataChangesLog::list_entries(utime_t& start_time, utime_t& end_time, int max_entries,
+                                    list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated) {
+  entries.clear();
+
+  // a negative shard would index in front of the oids array
+  if (marker.shard < 0)
+    return -EINVAL;
+
+  // moving on to the next shard also resets the in-shard marker
+  for (; marker.shard < num_shards && (int)entries.size() < max_entries;
+       marker.shard++, marker.marker.clear()) {
+    // start from a known value so the check below never reads an
+    // indeterminate bool if the shard listing leaves it untouched
+    bool truncated = false;
+
+    int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
+                           marker.marker, &truncated);
+    if (ret == -ENOENT) {
+      continue;
+    }
+    if (ret < 0) {
+      return ret;
+    }
+    if (truncated) {
+      // stay on this shard: marker.marker says where to resume
+      *ptruncated = true;
+      return 0;
+    }
+  }
+
+  *ptruncated = (marker.shard < num_shards);
+
+  return 0;
+}
diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h
index c1a8f60deea..d3e9f968c6c 100644
--- a/src/rgw/rgw_bucket.h
+++ b/src/rgw/rgw_bucket.h
@@ -222,4 +222,76 @@ public:
static int info(RGWRados *store, RGWBucketAdminOpState& op_state, RGWFormatterFlusher& flusher);
};
+
+// Kind of entity a data log record refers to. Stored as a single byte in the
+// encoded form, so existing values must keep their numbers.
+enum DataLogEntityType {
+  ENTITY_TYPE_UNKNOWN = 0, // default-constructed / unrecognised
+  ENTITY_TYPE_BUCKET = 1,
+};
+
+// One record of the data changes log: which kind of entity changed and its
+// key.
+struct rgw_data_change {
+  DataLogEntityType entity_type;
+  string key;
+
+  // Start from a defined entity type so that dump() on a record that was
+  // never filled in does not read an indeterminate value.
+  rgw_data_change() : entity_type(ENTITY_TYPE_UNKNOWN) {}
+
+  // Encoded layout: entity type narrowed to one byte, followed by key.
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    uint8_t t = (uint8_t)entity_type;
+    ::encode(t, bl);
+    ::encode(key, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  // Mirror of encode(): reads the type byte, then key.
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    uint8_t t;
+    ::decode(t, bl);
+    entity_type = (DataLogEntityType)t;
+    ::decode(key, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(rgw_data_change)
+
+// Sharded log of data changes. Each shard is backed by one time log object
+// whose name is kept in oids[shard]; a bucket is assigned to a shard by
+// choose_oid().
+class RGWDataChangesLog {
+  CephContext *cct;
+  RGWRados *store;
+
+  int num_shards;
+  string *oids; // array of num_shards object names, owned by this object
+
+  // Not copyable: a copy would share `oids` and free it twice. Declared but
+  // intentionally left undefined.
+  RGWDataChangesLog(const RGWDataChangesLog&);
+  RGWDataChangesLog& operator=(const RGWDataChangesLog&);
+
+public:
+
+  // Builds the shard object names "<prefix>.<shard index>".
+  RGWDataChangesLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store) {
+    num_shards = 128; /* FIXME */
+    oids = new string[num_shards];
+
+    const char *prefix = "bucket_log"; /* FIXME */
+
+    for (int i = 0; i < num_shards; i++) {
+      // format only the index into the fixed buffer (an int always fits), so
+      // a longer prefix or more shards cannot silently truncate the name
+      char buf[16];
+      snprintf(buf, sizeof(buf), "%d", i);
+      oids[i] = prefix;
+      oids[i] += ".";
+      oids[i] += buf;
+    }
+  }
+
+  ~RGWDataChangesLog() {
+    delete[] oids;
+  }
+
+  // Shard index (into oids) for the given bucket.
+  int choose_oid(rgw_bucket& bucket);
+  // Append a change record for the bucket to its shard's log.
+  int add_entry(rgw_bucket& bucket);
+  // List records of one shard; `marker` is the position within that shard.
+  int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
+                   list<rgw_data_change>& entries, string& marker, bool *truncated);
+
+  // Position for the cross-shard listing: the shard being read and the
+  // marker string within it. Starts at shard 0.
+  struct LogMarker {
+    int shard;
+    string marker;
+
+    LogMarker() : shard(0) {}
+  };
+  // List records across shards, resuming from and advancing `marker`.
+  int list_entries(utime_t& start_time, utime_t& end_time, int max_entries,
+                   list<rgw_data_change>& entries, LogMarker& marker, bool *ptruncated);
+};
+
+
#endif
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 0f5fc53dae5..4adf1390c93 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -12,6 +12,7 @@
#include "rgw_cache.h"
#include "rgw_acl.h"
#include "rgw_metadata.h"
+#include "rgw_bucket.h"
#include "cls/rgw/cls_rgw_types.h"
#include "cls/rgw/cls_rgw_client.h"
@@ -497,6 +498,7 @@ void RGWRadosCtx::set_prefetch_data(rgw_obj& obj) {
void RGWRados::finalize()
{
delete meta_mgr;
+ delete data_log;
if (use_gc_thread) {
gc->stop_processor();
delete gc;
@@ -525,6 +527,7 @@ int RGWRados::init_rados()
return ret;
meta_mgr = new RGWMetadataManager(cct, this);
+ data_log = new RGWDataChangesLog(cct, this);
return ret;
}
@@ -2884,6 +2887,12 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
if (bucket_is_system(bucket))
return 0;
+ int ret = data_log->add_entry(obj.bucket);
+ if (ret < 0) {
+ lderr(cct) << "ERROR: failed writing data log" << dendl;
+ return ret;
+ }
+
if (state && state->obj_tag.length()) {
int len = state->obj_tag.length();
char buf[len + 1];
@@ -2895,7 +2904,7 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
append_rand_alpha(cct, tag, tag, 32);
}
}
- int ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag,
+ ret = cls_obj_prepare_op(bucket, CLS_RGW_OP_ADD, tag,
obj.object, obj.key);
return ret;
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index bf816d36c75..1d062a63daf 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -455,7 +455,7 @@ struct RGWRegionMap {
};
WRITE_CLASS_ENCODER(RGWRegionMap);
-
+class RGWDataChangesLog;
class RGWRados
{
@@ -563,7 +563,7 @@ public:
num_watchers(0), watchers(NULL), watch_handles(NULL),
bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
cct(NULL), rados(NULL),
- pools_initialized(false), meta_mgr(NULL) {}
+ pools_initialized(false), meta_mgr(NULL), data_log(NULL) {}
void set_context(CephContext *_cct) {
cct = _cct;
@@ -582,6 +582,8 @@ public:
RGWMetadataManager *meta_mgr;
+ RGWDataChangesLog *data_log;
+
virtual ~RGWRados() {
if (rados) {
rados->shutdown();