diff options
author | Yehuda Sadeh <yehuda@inktank.com> | 2012-06-11 10:11:17 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2012-06-11 13:25:47 -0700 |
commit | 9a70ec94c9cba296b852914f86f693acb8fbdb90 (patch) | |
tree | 9478e0ec0d33a09ecb6d712d7c74f289416f42e4 /src/cls_rgw.cc | |
parent | ec689e3e7e7ad49e7a15b924ec60008641a90dd2 (diff) | |
download | ceph-9a70ec94c9cba296b852914f86f693acb8fbdb90.tar.gz |
rgw: new class methods for handling usage information
The new methods are:
- user_usage_log_add: add new usage information
- user_usage_log_read: get usage information
- user_usage_log_trim: remove usage information
Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
Diffstat (limited to 'src/cls_rgw.cc')
-rw-r--r-- | src/cls_rgw.cc | 266 |
1 files changed, 266 insertions, 0 deletions
diff --git a/src/cls_rgw.cc b/src/cls_rgw.cc index d50e95068fe..0d3f17187f6 100644 --- a/src/cls_rgw.cc +++ b/src/cls_rgw.cc @@ -21,6 +21,9 @@ cls_method_handle_t h_rgw_bucket_list; cls_method_handle_t h_rgw_bucket_prepare_op; cls_method_handle_t h_rgw_bucket_complete_op; cls_method_handle_t h_rgw_dir_suggest_changes; +cls_method_handle_t h_rgw_user_usage_log_add; +cls_method_handle_t h_rgw_user_usage_log_read; +cls_method_handle_t h_rgw_user_usage_log_trim; #define ROUND_BLOCK_SIZE 4096 @@ -408,6 +411,266 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis return 0; } +static void usage_record_prefix_by_time(uint64_t epoch, string& key) +{ + char buf[32]; + snprintf(buf, sizeof(buf), "%011llu", (long long unsigned)epoch); + key = buf; +} + +static void usage_record_name_by_time(uint64_t epoch, string& user, string& bucket, string& key) +{ + char buf[32 + user.size() + bucket.size()]; + snprintf(buf, sizeof(buf), "%011llu_%s_%s", (long long unsigned)epoch, user.c_str(), bucket.c_str()); + key = buf; +} + +static void usage_record_name_by_user(string& user, uint64_t epoch, string& bucket, string& key) +{ + char buf[32 + user.size() + bucket.size()]; + snprintf(buf, sizeof(buf), "%s_%011llu_%s", user.c_str(), (long long unsigned)epoch, bucket.c_str()); + key = buf; +} + +static int usage_record_decode(bufferlist& record_bl, rgw_usage_log_entry& e) +{ + bufferlist::iterator kiter = record_bl.begin(); + try { + ::decode(e, kiter); + } catch (buffer::error& err) { + CLS_LOG("ERROR: usage_record_decode(): failed to decode record_bl\n"); + return -EINVAL; + } + + return 0; +} + +int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + CLS_LOG("rgw_user_usage_log_add()"); + + bufferlist::iterator in_iter = in->begin(); + rgw_cls_usage_log_add_op op; + + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG("ERROR: rgw_user_usage_log_add(): failed to decode request\n"); + return -EINVAL; + } + + rgw_usage_log_info& info = op.info; + vector<rgw_usage_log_entry>::iterator iter; + + for (iter = info.entries.begin(); iter != info.entries.end(); ++iter) { + rgw_usage_log_entry& entry = *iter; + string key_by_time; + usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time); + + CLS_LOG("rgw_user_usage_log_add user=%s bucket=%s\n", entry.owner.c_str(), entry.bucket.c_str()); + + bufferlist record_bl; + int ret = cls_cxx_map_read_key(hctx, key_by_time, &record_bl); + if (ret < 0 && ret != -ENOENT) { + CLS_LOG("ERROR: rgw_user_usage_log_add(): cls_cxx_map_read_key returned %d\n", ret); + return -EINVAL; + } + if (ret >= 0) { + rgw_usage_log_entry e; + ret = usage_record_decode(record_bl, e); + if (ret < 0) + return ret; + CLS_LOG("rgw_user_usage_log_add aggregating existing bucket\n"); + entry.bytes_sent += e.bytes_sent; + entry.bytes_received += e.bytes_received; + } + + bufferlist new_record_bl; + ::encode(entry, new_record_bl); + ret = cls_cxx_map_write_key(hctx, key_by_time, &new_record_bl); + if (ret < 0) + return ret; + + string key_by_user; + usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user); + ret = cls_cxx_map_write_key(hctx, key_by_user, &new_record_bl); + if (ret < 0) + return ret; + } + + return 0; +} + +static int usage_iterate_range(cls_method_context_t hctx, uint64_t start, uint64_t end, + string& user, string& key_iter, uint32_t max_entries, bool *truncated, + int (*cb)(cls_method_context_t, const string&, rgw_usage_log_entry&, void *), + void *param) +{ + CLS_LOG("usage_iterate_range"); + + map<string, bufferlist> keys; +#define NUM_KEYS 32 + string filter_prefix; + string start_key, end_key; + bool by_user = !user.empty(); + uint32_t i = 0; + string user_key; + + if (truncated) + *truncated = false; + + if (!by_user) { + usage_record_prefix_by_time(end, end_key); + } else { + user_key = user; + user_key.append("_"); + } + + if (key_iter.empty()) { + if (by_user) { + start_key = user; + } else { + usage_record_prefix_by_time(start, start_key); + } + } else { + start_key = key_iter; + } + + do { + int ret = cls_cxx_map_read_keys(hctx, start_key, filter_prefix, NUM_KEYS, &keys); + if (ret < 0) + return ret; + + + map<string, bufferlist>::iterator iter = keys.begin(); + if (iter == keys.end()) + break; + + for (; iter != keys.end(); ++iter) { + const string& key = iter->first; + rgw_usage_log_entry e; + + if (!by_user && key.compare(end_key) >= 0) + return 0; + + if (by_user && key.compare(0, user_key.size(), user_key) != 0) + return 0; + + ret = usage_record_decode(iter->second, e); + if (ret < 0) + return ret; + + if (e.epoch < start) + continue; + + /* keys are sorted by epoch, so once we're past end we're done */ + if (e.epoch >= end) + return 0; + + ret = cb(hctx, key, e, param); + if (ret < 0) + return ret; + + + i++; + if (max_entries && (i > max_entries)) { + *truncated = true; + key_iter = key; + return 0; + } + } + iter--; + start_key = iter->first; + } while (true); + return 0; +} + +static int usage_log_read_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param) +{ + map<rgw_user_bucket, rgw_usage_log_entry> *usage = (map<rgw_user_bucket, rgw_usage_log_entry> *)param; + rgw_user_bucket ub(entry.owner, entry.bucket); + rgw_usage_log_entry& le = (*usage)[ub]; + le.bytes_sent += entry.bytes_sent; + le.bytes_received += entry.bytes_received; + le.epoch = entry.epoch; + le.owner = entry.owner; + le.bucket = entry.bucket; + + return 0; +} + +int rgw_user_usage_log_read(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + CLS_LOG("rgw_user_usage_log_read()"); + + bufferlist::iterator in_iter = in->begin(); + rgw_cls_usage_log_read_op op; + + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG("ERROR: rgw_user_usage_log_read(): failed to decode request\n"); + return -EINVAL; + } + + rgw_cls_usage_log_read_ret ret_info; + map<rgw_user_bucket, rgw_usage_log_entry> *usage = &ret_info.usage; + string iter = op.iter; +#define MAX_ENTRIES 1000 + uint32_t max_entries = (op.max_entries ? op.max_entries : MAX_ENTRIES); + int ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.owner, iter, max_entries, &ret_info.truncated, usage_log_read_cb, (void *)usage); + if (ret < 0) + return ret; + + if (ret_info.truncated) + ret_info.next_iter = iter; + + ::encode(ret_info, *out); + return 0; +} + +static int usage_log_trim_cb(cls_method_context_t hctx, const string& key, rgw_usage_log_entry& entry, void *param) +{ + string key_by_time; + string key_by_user; + + usage_record_name_by_time(entry.epoch, entry.owner, entry.bucket, key_by_time); + usage_record_name_by_user(entry.owner, entry.epoch, entry.bucket, key_by_user); + + int ret = cls_cxx_map_remove_key(hctx, key_by_time); + if (ret < 0) + return ret; + + return cls_cxx_map_remove_key(hctx, key_by_user); +} + +int rgw_user_usage_log_trim(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + CLS_LOG("rgw_user_usage_log_trim()"); + + /* only continue if object exists! */ + int ret = cls_cxx_stat(hctx, NULL, NULL); + if (ret < 0) + return ret; + + bufferlist::iterator in_iter = in->begin(); + rgw_cls_usage_log_trim_op op; + + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG("ERROR: rgw_user_log_usage_log_trim(): failed to decode request\n"); + return -EINVAL; + } + + string iter; + ret = usage_iterate_range(hctx, op.start_epoch, op.end_epoch, op.user, iter, 0, NULL, usage_log_trim_cb, NULL); + if (ret < 0) + return ret; + + return 0; +} + void __cls_init() { CLS_LOG("Loaded rgw class!"); @@ -418,6 +681,9 @@ void __cls_init() cls_register_cxx_method(h_class, "bucket_prepare_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_prepare_op, &h_rgw_bucket_prepare_op); cls_register_cxx_method(h_class, "bucket_complete_op", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_bucket_complete_op, &h_rgw_bucket_complete_op); cls_register_cxx_method(h_class, "dir_suggest_changes", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_dir_suggest_changes, &h_rgw_dir_suggest_changes); + cls_register_cxx_method(h_class, "user_usage_log_add", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_user_usage_log_add, &h_rgw_user_usage_log_add); + cls_register_cxx_method(h_class, "user_usage_log_read", CLS_METHOD_RD | CLS_METHOD_PUBLIC, rgw_user_usage_log_read, &h_rgw_user_usage_log_read); + cls_register_cxx_method(h_class, "user_usage_log_trim", CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PUBLIC, rgw_user_usage_log_trim, &h_rgw_user_usage_log_trim); return; } |