summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Farnum <greg@inktank.com>2013-06-20 11:01:01 -0700
committerYehuda Sadeh <yehuda@inktank.com>2013-06-20 14:10:35 -0700
commitab79ba46a4d2c97050ae021fbab684f3a3e54c11 (patch)
tree4a04a663c0f4c0c333fc04311c27ad7ad0742a55
parent36514731a524800d377075dc3e8032a023f88ce3 (diff)
downloadceph-ab79ba46a4d2c97050ae021fbab684f3a3e54c11.tar.gz
cls_replica_log: integrate with RGWRados
We introduce an implementation class RGWReplicaLogger, and two user classes RGWReplicaObjectLogger (for the data/metadata logs) and RGWReplicaBucketLogger (for the bucket logs). Signed-off-by: Greg Farnum <greg@inktank.com>
-rw-r--r--src/Makefile.am10
-rw-r--r--src/rgw/rgw_rados.h2
-rw-r--r--src/rgw/rgw_replica_log.cc107
-rw-r--r--src/rgw/rgw_replica_log.h113
4 files changed, 229 insertions, 3 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 3155d764d97..245e5541a0b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -401,14 +401,16 @@ librgw_a_SOURCES = \
rgw/rgw_cors.cc \
rgw/rgw_cors_s3.cc \
rgw/rgw_auth_s3.cc \
- rgw/rgw_metadata.cc
+ rgw/rgw_metadata.cc \
+ rgw/rgw_replica_log.cc
librgw_a_CFLAGS = ${CRYPTO_CFLAGS} ${AM_CFLAGS}
librgw_a_CXXFLAGS = -Woverloaded-virtual ${AM_CXXFLAGS}
noinst_LIBRARIES += librgw.a
my_radosgw_ldadd = \
- libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a libcls_statelog_client.a \
- libcls_lock_client.a libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \
+ libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a \
+ libcls_statelog_client.a libcls_replica_log_client.a libcls_lock_client.a \
+ libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \
$(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS)
radosgw_SOURCES = \
@@ -420,6 +422,7 @@ radosgw_SOURCES = \
rgw/rgw_rest_user.cc \
rgw/rgw_rest_bucket.cc \
rgw/rgw_rest_metadata.cc \
+ rgw/rgw_replica_log.cc \
rgw/rgw_rest_log.cc \
rgw/rgw_http_client.cc \
rgw/rgw_swift.cc \
@@ -2125,6 +2128,7 @@ noinst_HEADERS = \
rgw/rgw_swift.h\
rgw/rgw_swift_auth.h\
rgw/rgw_rados.h\
+ rgw/rgw_replica_log.h \
rgw/rgw_resolve.h\
rgw/rgw_rest.h\
rgw/rgw_rest_swift.h\
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index f292f47cb0e..70537ba5bef 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -563,6 +563,7 @@ struct RGWRegionMap {
WRITE_CLASS_ENCODER(RGWRegionMap);
class RGWDataChangesLog;
+class RGWReplicaLogger;
class RGWStateLog {
RGWRados *store;
@@ -653,6 +654,7 @@ class RGWRados
{
friend class RGWGC;
friend class RGWStateLog;
+ friend class RGWReplicaLogger;
/** Open the pool used as root for this gateway */
int open_root_pool_ctx();
diff --git a/src/rgw/rgw_replica_log.cc b/src/rgw/rgw_replica_log.cc
new file mode 100644
index 00000000000..9934902aef9
--- /dev/null
+++ b/src/rgw/rgw_replica_log.cc
@@ -0,0 +1,107 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ * Copyright 2013 Inktank
+ */
+
+#include "rgw_replica_log.h"
+#include "cls/replica_log/cls_replica_log_client.h"
+#include "rgw_rados.h"
+
+RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) :
+ cct(_store->cct), store(_store) {}
+
+int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool)
+{
+ int r = store->rados->ioctx_create(pool.c_str(), ctx);
+ if (r < 0) {
+ lderr(cct) << "ERROR: could not open rados pool "
+ << pool << dendl;
+ }
+ return r;
+}
+
+int RGWReplicaLogger::update_bound(const string& oid, const string& pool,
+ const string& daemon_id,
+ const string& marker, const utime_t& time,
+ const list<pair<string, utime_t> > *entries)
+{
+ cls_replica_log_progress_marker progress;
+ cls_replica_log_prepare_marker(progress, daemon_id, marker, time,
+ entries);
+
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx, pool);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation opw;
+ cls_replica_log_update_bound(opw, progress);
+ return ioctx.operate(oid, &opw);
+}
+
+int RGWReplicaLogger::delete_bound(const string& oid, const string& pool,
+ const string& daemon_id)
+{
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx, pool);
+ if (r < 0) {
+ return r;
+ }
+
+ librados::ObjectWriteOperation opw;
+ cls_replica_log_delete_bound(opw, daemon_id);
+ return ioctx.operate(oid, &opw);
+}
+
+int RGWReplicaLogger::get_bounds(const string& oid, const string& pool,
+ string& marker, utime_t& oldest_time,
+ list<cls_replica_log_progress_marker>& markers)
+{
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx, pool);
+ if (r < 0) {
+ return r;
+ }
+
+ return cls_replica_log_get_bounds(ioctx, oid, marker, oldest_time, markers);
+}
+
+void RGWReplicaLogger::get_bound_info(
+ const cls_replica_log_progress_marker& progress,
+ string& entity, string& marker,
+ utime_t time,
+ list<pair<string, utime_t> >& entries) {
+ cls_replica_log_extract_marker(progress, entity, marker, time, entries);
+}
+
+RGWReplicaObjectLogger::
+RGWReplicaObjectLogger(RGWRados *_store,
+ const string& _pool,
+ const string& _prefix) : RGWReplicaLogger(_store),
+ pool(_pool), prefix(_prefix) {
+ if (pool.empty())
+ store->get_log_pool_name(pool);
+}
+
+int RGWReplicaObjectLogger::create_log_objects(int shards)
+{
+ librados::IoCtx ioctx;
+ int r = open_ioctx(ioctx, pool);
+ if (r < 0) {
+ return r;
+ }
+ for (int i = 0; i < shards; ++i) {
+ string oid;
+ get_shard_oid(i, oid);
+ r = ioctx.create(oid, false);
+ if (r < 0)
+ return r;
+ }
+ return r;
+}
diff --git a/src/rgw/rgw_replica_log.h b/src/rgw/rgw_replica_log.h
new file mode 100644
index 00000000000..fd461c2340f
--- /dev/null
+++ b/src/rgw/rgw_replica_log.h
@@ -0,0 +1,113 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef RGW_REPLICA_LOG_H_
+#define RGW_REPLICA_LOG_H_
+
+#include <string>
+#include "cls/replica_log/cls_replica_log_types.h"
+#include "include/types.h"
+#include "include/utime.h"
+#include "include/rados/librados.hpp"
+#include "rgw_common.h"
+
+class RGWRados;
+class CephContext;
+
+using namespace std;
+
+#define META_REPLICA_LOG_OBJ_PREFIX "meta.replicalog."
+#define DATA_REPLICA_LOG_OBJ_PREFIX "data.replicalog."
+
+class RGWReplicaLogger {
+protected:
+ CephContext *cct;
+ RGWRados *store;
+ int open_ioctx(librados::IoCtx& ctx, const string& pool);
+
+ RGWReplicaLogger(RGWRados *_store);
+
+ int update_bound(const string& oid, const string& pool,
+ const string& daemon_id, const string& marker,
+ const utime_t& time,
+ const list<pair<string, utime_t> > *entries);
+ int delete_bound(const string& oid, const string& pool,
+ const string& daemon_id);
+ int get_bounds(const string& oid, const string& pool,
+ string& marker, utime_t& oldest_time,
+ list<cls_replica_log_progress_marker>& markers);
+
+public:
+ static void get_bound_info(const cls_replica_log_progress_marker& progress,
+ string& entity, string& marker,
+ utime_t time,
+ list<pair<string, utime_t> >& entries);
+};
+
+class RGWReplicaObjectLogger : private RGWReplicaLogger {
+ string pool;
+ string prefix;
+
+ void get_shard_oid(int id, string& oid) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", id);
+ oid = prefix + buf;
+ }
+
+public:
+ RGWReplicaObjectLogger(RGWRados *_store,
+ const string& _pool,
+ const string& _prefix);
+
+ int create_log_objects(int shards);
+ int update_bound(int shard, const string& daemon_id, const string& marker,
+ const utime_t& time,
+ const list<pair<string, utime_t> > *entries) {
+ string oid;
+ get_shard_oid(shard, oid);
+ return RGWReplicaLogger::update_bound(oid, pool,
+ daemon_id, marker, time, entries);
+ }
+ int delete_bound(int shard, const string& daemon_id) {
+ string oid;
+ get_shard_oid(shard, oid);
+ return RGWReplicaLogger::delete_bound(oid, pool,
+ daemon_id);
+ }
+ int get_bounds(int shard, string& marker, utime_t& oldest_time,
+ list<cls_replica_log_progress_marker>& markers) {
+ string oid;
+ get_shard_oid(shard, oid);
+ return RGWReplicaLogger::get_bounds(oid, pool,
+ marker, oldest_time, markers);
+ }
+};
+
+class RGWReplicaBucketLogger : private RGWReplicaLogger {
+public:
+ RGWReplicaBucketLogger(RGWRados *_store) :
+ RGWReplicaLogger(_store) {}
+ int update_bound(const rgw_bucket& bucket, const string& daemon_id,
+ const string& marker, const utime_t& time,
+ const list<pair<string, utime_t> > *entries) {
+ return RGWReplicaLogger::update_bound(bucket.name, bucket.index_pool,
+ daemon_id, marker, time, entries);
+ }
+ int delete_bound(const rgw_bucket& bucket, const string& daemon_id) {
+ return RGWReplicaLogger::delete_bound(bucket.name, bucket.index_pool,
+ daemon_id);
+ }
+ int get_bounds(const rgw_bucket& bucket, string& marker, utime_t& oldest_time,
+ list<cls_replica_log_progress_marker>& markers) {
+ return RGWReplicaLogger::get_bounds(bucket.name, bucket.index_pool,
+ marker, oldest_time, markers);
+ }
+};
+
+#endif /* RGW_REPLICA_LOG_H_ */