summaryrefslogtreecommitdiff
path: root/src/cls_rgw.cc
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@inktank.com>2012-06-11 10:11:17 -0700
committerYehuda Sadeh <yehuda@inktank.com>2012-06-11 13:25:47 -0700
commit9a70ec94c9cba296b852914f86f693acb8fbdb90 (patch)
tree9478e0ec0d33a09ecb6d712d7c74f289416f42e4 /src/cls_rgw.cc
parentec689e3e7e7ad49e7a15b924ec60008641a90dd2 (diff)
downloadceph-9a70ec94c9cba296b852914f86f693acb8fbdb90.tar.gz
rgw: new class methods for handling usage information
The new methods are: - user_usage_log_add: add new usage information - user_usage_log_read: get usage information - user_usage_log_trim: remove usage information Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
Diffstat (limited to 'src/cls_rgw.cc')
-rw-r--r--src/cls_rgw.cc266
1 files changed, 266 insertions, 0 deletions
diff --git a/src/cls_rgw.cc b/src/cls_rgw.cc
index d50e95068fe..0d3f17187f6 100644
--- a/src/cls_rgw.cc
+++ b/src/cls_rgw.cc
@@ -21,6 +21,9 @@ cls_method_handle_t h_rgw_bucket_list;
cls_method_handle_t h_rgw_bucket_prepare_op;
cls_method_handle_t h_rgw_bucket_complete_op;
cls_method_handle_t h_rgw_dir_suggest_changes;
+cls_method_handle_t h_rgw_user_usage_log_add;
+cls_method_handle_t h_rgw_user_usage_log_read;
+cls_method_handle_t h_rgw_user_usage_log_trim;
#define ROUND_BLOCK_SIZE 4096
@@ -408,6 +411,266 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
return 0;
}
+static void usage_record_prefix_by_time(uint64_t epoch, string& key)
+{
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%011llu", (long long unsigned)epoch);
+ key = buf;
+}
+
+static void usage_record_name_by_time(uint64_t epoch, string& user, string& bucket, string& key)
+{
+ char buf[32 + user.size() + bucket.size()];
+ snprintf(buf, sizeof(buf), "%011llu_%s_%s", (long long unsigned)epoch, user.c_str(), bucket.c_str());
+ key = buf;
+}
+
+static void usage_record_name_by_user(string& user, uint64_t epoch, string& bucket, string& key)
+{
+ char buf[32 + user.size() + bucket.size()];
+ snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str());
+ key = buf;
+}
+
+static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e)
+{
+ bufferlist::iterator kiter = record_bl.begin();
+ try {
+ ::decode(e, kiter);
+ } catch (buffer::error& err) {
+ CLS_LOG("ERROR: usage_record_decode(): failed to decode record_bl\n");
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ CLS_LOG("rgw_user_usage_log_add()");
+
+ bufferlist::iterator in_iter = in->begin();
+ rgw_cls_usage_log_add_op op;
+
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG("ERROR: rgw_user_usage_log_add(): failed to decode request\n");
+ return -EINVAL;
+ }
+
+ rgw_usage_log_info& info = op.info;
+ vector<rgw_usage_log_entry>::iterator iter;
+
+ for (iter = info.entries.begin(); iter != info.entries.end(); ++iter) {
+ rgw_usage_log_entry& entry = *iter;
+ string key_by_time;
+ usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
+
+ CLS_LOG("rgw_user_usage_log_add user=%s bucket=%s\n", entry.owner.c_str(), entry.bucket.c_str());
+
+ bufferlist record_bl;
+ int ret = cls_cxx_map_read_key(hctx, key_by_time, &record_bl);
+ if (ret < 0 && ret != -ENOENT) {
+ CLS_LOG("ERROR: rgw_user_usage_log_add(): cls_cxx_map_read_key returned %d\n", ret);
+ return -EINVAL;
+ }
+ if (ret >= 0) {
+ rgw_usage_log_entry e;
+ ret = usage_record_decode(record_bl, e);
+ if (ret < 0)
+ return ret;
+ CLS_LOG("rgw_user_usage_log_add aggregating existing bucket\n");
+ entry.bytes_sent += e.bytes_sent;
+ entry.bytes_received += e.bytes_received;
+ }
+
+ bufferlist new_record_bl;
+ ::encode(entry, new_record_bl);
+ ret = cls_cxx_map_write_key(hctx, key_by_time, &new_record_bl);
+ if (ret < 0)
+ return ret;
+
+ string key_by_user;
+ usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
+ ret = cls_cxx_map_write_key(hctx, key_by_user, &new_record_bl);
+ if (ret < 0)
+ return ret;
+ }
+
+ return 0;
+}
+
+static int usage_iterate_range(cls_method_context_t hctx, uint64_t start, uint64_t end,
+ string& user, string& key_iter, uint32_t max_entries, bool *truncated,
+ int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *),
+ void *param)
+{
+ CLS_LOG("usage_iterate_range");
+
+ map<string, bufferlist> keys;
+#define NUM_KEYS 32
+ string filter_prefix;
+ string start_key, end_key;
+ bool by_user = !user.empty();
+ uint32_t i = 0;
+ string user_key;
+
+ if (truncated)
+ *truncated = false;
+
+ if (!by_user) {
+ usage_record_prefix_by_time(end, end_key);
+ } else {
+ user_key = user;
+ user_key.append("_");
+ }
+
+ if (key_iter.empty()) {
+ if (by_user) {
+ start_key = user;
+ } else {
+ usage_record_prefix_by_time(start, start_key);
+ }
+ } else {
+ start_key = key_iter;
+ }
+
+ do {
+ int ret = cls_cxx_map_read_keys(hctx, start_key, filter_prefix, NUM_KEYS, &keys);
+ if (ret < 0)
+ return ret;
+
+
+ map<string, bufferlist>::iterator iter = keys.begin();
+ if (iter == keys.end())
+ break;
+
+ for (; iter != keys.end(); ++iter) {
+ const string& key = iter->first;
+ rgw_usage_log_entry e;
+
+ if (!by_user && key.compare(end_key) >= 0)
+ return 0;
+
+ if (by_user && key.compare(0, user_key.size(), user_key) != 0)
+ return 0;
+
+ ret = usage_record_decode(iter->second, e);
+ if (ret < 0)
+ return ret;
+
+ if (e.epoch < start)
+ continue;
+
+ /* keys are sorted by epoch, so once we're past end we're done */
+ if (e.epoch >= end)
+ return 0;
+
+ ret = cb(hctx, key, e, param);
+ if (ret < 0)
+ return ret;
+
+
+ i++;
+ if (max_entries && (i > max_entries)) {
+ *truncated = true;
+ key_iter = key;
+ return 0;
+ }
+ }
+ iter--;
+ start_key = iter->first;
+ } while (true);
+ return 0;
+}
+
+static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
+{
+ map<rgw_user_bucket, rgw_usage_log_entry> *usage = (map<rgw_user_bucket, rgw_usage_log_entry> *)param;
+ rgw_user_bucket ub(entry.owner, entry.bucket);
+ rgw_usage_log_entry& le = (*usage)[ub];
+ le.bytes_sent += entry.bytes_sent;
+ le.bytes_received += entry.bytes_received;
+ le.epoch = entry.epoch;
+ le.owner = entry.owner;
+ le.bucket = entry.bucket;
+
+ return 0;
+}
+
+int rgw_user_usage_log_read(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ CLS_LOG("rgw_user_usage_log_read()");
+
+ bufferlist::iterator in_iter = in->begin();
+ rgw_cls_usage_log_read_op op;
+
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG("ERROR: rgw_user_usage_log_read(): failed to decode request\n");
+ return -EINVAL;
+ }
+
+ rgw_cls_usage_log_read_ret ret_info;
+ map<rgw_user_bucket, rgw_usage_log_entry> *usage = &ret_info.usage;
+ string iter = op.iter;
+#define MAX_ENTRIES 1000
+ uint32_t max_entries = (op.max_entries ? op.max_entries : MAX_ENTRIES);
+ int ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.owner, iter, max_entries, &ret_info.truncated, usage_log_read_cb, (void *)usage);
+ if (ret < 0)
+ return ret;
+
+ if (ret_info.truncated)
+ ret_info.next_iter = iter;
+
+ ::encode(ret_info, *out);
+ return 0;
+}
+
+static int usage_log_trim_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param)
+{
+ string key_by_time;
+ string key_by_user;
+
+ usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time);
+ usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user);
+
+ int ret = cls_cxx_map_remove_key(hctx, key_by_time);
+ if (ret < 0)
+ return ret;
+
+ return cls_cxx_map_remove_key(hctx, key_by_user);
+}
+
+int rgw_user_usage_log_trim(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ CLS_LOG("rgw_user_usage_log_trim()");
+
+ /* only continue if object exists! */
+ int ret = cls_cxx_stat(hctx, NULL, NULL);
+ if (ret < 0)
+ return ret;
+
+ bufferlist::iterator in_iter = in->begin();
+ rgw_cls_usage_log_trim_op op;
+
+ try {
+ ::decode(op, in_iter);
+ } catch (buffer::error& err) {
+ CLS_LOG("ERROR: rgw_user_log_usage_log_trim(): failed to decode request\n");
+ return -EINVAL;
+ }
+
+ string iter;
+ ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.user, iter, 0, NULL, usage_log_trim_cb, NULL);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
void __cls_init()
{
CLS_LOG("Loaded rgw class!");
@@ -418,6 +681,9 @@ void __cls_init()
cls_register_cxx_method(h_class, "bucket_prepare_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_prepare_op, &h_rgw_bucket_prepare_op);
cls_register_cxx_method(h_class, "bucket_complete_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_complete_op, &h_rgw_bucket_complete_op);
cls_register_cxx_method(h_class, "dir_suggest_changes", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_dir_suggest_changes, &h_rgw_dir_suggest_changes);
+ cls_register_cxx_method(h_class, "user_usage_log_add", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_user_usage_log_add, &h_rgw_user_usage_log_add);
+ cls_register_cxx_method(h_class, "user_usage_log_read", CLS_METHOD_RD | CLS_METHOD_PUBLIC, rgw_user_usage_log_read, &h_rgw_user_usage_log_read);
+ cls_register_cxx_method(h_class, "user_usage_log_trim", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_user_usage_log_trim, &h_rgw_user_usage_log_trim);
return;
}