diff options
author | Greg Farnum <greg@inktank.com> | 2013-06-20 11:01:01 -0700 |
---|---|---|
committer | Yehuda Sadeh <yehuda@inktank.com> | 2013-06-20 14:10:35 -0700 |
commit | ab79ba46a4d2c97050ae021fbab684f3a3e54c11 (patch) | |
tree | 4a04a663c0f4c0c333fc04311c27ad7ad0742a55 | |
parent | 36514731a524800d377075dc3e8032a023f88ce3 (diff) | |
download | ceph-ab79ba46a4d2c97050ae021fbab684f3a3e54c11.tar.gz |
cls_replica_log: integrate with RGWRados
We introduce an implementation class RGWReplicaLogger, and two user
classes RGWReplicaObjectLogger (for the data/metadata logs) and
RGWReplicaBucketLogger (for the bucket logs).
Signed-off-by: Greg Farnum <greg@inktank.com>
-rw-r--r-- | src/Makefile.am | 10 | ||||
-rw-r--r-- | src/rgw/rgw_rados.h | 2 | ||||
-rw-r--r-- | src/rgw/rgw_replica_log.cc | 107 | ||||
-rw-r--r-- | src/rgw/rgw_replica_log.h | 113 |
4 files changed, 229 insertions, 3 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 3155d764d97..245e5541a0b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -401,14 +401,16 @@ librgw_a_SOURCES = \ rgw/rgw_cors.cc \ rgw/rgw_cors_s3.cc \ rgw/rgw_auth_s3.cc \ - rgw/rgw_metadata.cc + rgw/rgw_metadata.cc \ + rgw/rgw_replica_log.cc librgw_a_CFLAGS = ${CRYPTO_CFLAGS} ${AM_CFLAGS} librgw_a_CXXFLAGS = -Woverloaded-virtual ${AM_CXXFLAGS} noinst_LIBRARIES += librgw.a my_radosgw_ldadd = \ - libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a libcls_statelog_client.a \ - libcls_lock_client.a libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \ + libglobal.la librgw.a librados.la libcls_rgw_client.a libcls_log_client.a \ + libcls_statelog_client.a libcls_replica_log_client.a libcls_lock_client.a \ + libcls_refcount_client.a libcls_version_client.a -lcurl -lexpat \ $(PTHREAD_LIBS) -lm $(CRYPTO_LIBS) $(EXTRALIBS) radosgw_SOURCES = \ @@ -420,6 +422,7 @@ radosgw_SOURCES = \ rgw/rgw_rest_user.cc \ rgw/rgw_rest_bucket.cc \ rgw/rgw_rest_metadata.cc \ + rgw/rgw_replica_log.cc \ rgw/rgw_rest_log.cc \ rgw/rgw_http_client.cc \ rgw/rgw_swift.cc \ @@ -2125,6 +2128,7 @@ noinst_HEADERS = \ rgw/rgw_swift.h\ rgw/rgw_swift_auth.h\ rgw/rgw_rados.h\ + rgw/rgw_replica_log.h \ rgw/rgw_resolve.h\ rgw/rgw_rest.h\ rgw/rgw_rest_swift.h\ diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index f292f47cb0e..70537ba5bef 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -563,6 +563,7 @@ struct RGWRegionMap { WRITE_CLASS_ENCODER(RGWRegionMap); class RGWDataChangesLog; +class RGWReplicaLogger; class RGWStateLog { RGWRados *store; @@ -653,6 +654,7 @@ class RGWRados { friend class RGWGC; friend class RGWStateLog; + friend class RGWReplicaLogger; /** Open the pool used as root for this gateway */ int open_root_pool_ctx(); diff --git a/src/rgw/rgw_replica_log.cc b/src/rgw/rgw_replica_log.cc new file mode 100644 index 00000000000..9934902aef9 --- /dev/null +++ b/src/rgw/rgw_replica_log.cc @@ -0,0 +1,107 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * Copyright 2013 Inktank + */ + +#include "rgw_replica_log.h" +#include "cls/replica_log/cls_replica_log_client.h" +#include "rgw_rados.h" + +RGWReplicaLogger::RGWReplicaLogger(RGWRados *_store) : + cct(_store->cct), store(_store) {} + +int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool) +{ + int r = store->rados->ioctx_create(pool.c_str(), ctx); + if (r < 0) { + lderr(cct) << "ERROR: could not open rados pool " + << pool << dendl; + } + return r; +} + +int RGWReplicaLogger::update_bound(const string& oid, const string& pool, + const string& daemon_id, + const string& marker, const utime_t& time, + const list<pair<string, utime_t> > *entries) +{ + cls_replica_log_progress_marker progress; + cls_replica_log_prepare_marker(progress, daemon_id, marker, time, + entries); + + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + librados::ObjectWriteOperation opw; + cls_replica_log_update_bound(opw, progress); + return ioctx.operate(oid, &opw); +} + +int RGWReplicaLogger::delete_bound(const string& oid, const string& pool, + const string& daemon_id) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + librados::ObjectWriteOperation opw; + cls_replica_log_delete_bound(opw, daemon_id); + return ioctx.operate(oid, &opw); +} + +int RGWReplicaLogger::get_bounds(const string& oid, const string& pool, + string& marker, utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + + return cls_replica_log_get_bounds(ioctx, oid, marker, oldest_time, markers); +} + +void RGWReplicaLogger::get_bound_info( + const cls_replica_log_progress_marker& progress, + string& entity, string& marker, + utime_t time, + list<pair<string, utime_t> >& entries) { + cls_replica_log_extract_marker(progress, entity, marker, time, entries); +} + +RGWReplicaObjectLogger:: +RGWReplicaObjectLogger(RGWRados *_store, + const string& _pool, + const string& _prefix) : RGWReplicaLogger(_store), + pool(_pool), prefix(_prefix) { + if (pool.empty()) + store->get_log_pool_name(pool); +} + +int RGWReplicaObjectLogger::create_log_objects(int shards) +{ + librados::IoCtx ioctx; + int r = open_ioctx(ioctx, pool); + if (r < 0) { + return r; + } + for (int i = 0; i < shards; ++i) { + string oid; + get_shard_oid(i, oid); + r = ioctx.create(oid, false); + if (r < 0) + return r; + } + return r; +} diff --git a/src/rgw/rgw_replica_log.h b/src/rgw/rgw_replica_log.h new file mode 100644 index 00000000000..fd461c2340f --- /dev/null +++ b/src/rgw/rgw_replica_log.h @@ -0,0 +1,113 @@ +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#ifndef RGW_REPLICA_LOG_H_ +#define RGW_REPLICA_LOG_H_ + +#include <string> +#include "cls/replica_log/cls_replica_log_types.h" +#include "include/types.h" +#include "include/utime.h" +#include "include/rados/librados.hpp" +#include "rgw_common.h" + +class RGWRados; +class CephContext; + +using namespace std; + +#define META_REPLICA_LOG_OBJ_PREFIX "meta.replicalog." +#define DATA_REPLICA_LOG_OBJ_PREFIX "data.replicalog." + +class RGWReplicaLogger { +protected: + CephContext *cct; + RGWRados *store; + int open_ioctx(librados::IoCtx& ctx, const string& pool); + + RGWReplicaLogger(RGWRados *_store); + + int update_bound(const string& oid, const string& pool, + const string& daemon_id, const string& marker, + const utime_t& time, + const list<pair<string, utime_t> > *entries); + int delete_bound(const string& oid, const string& pool, + const string& daemon_id); + int get_bounds(const string& oid, const string& pool, + string& marker, utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers); + +public: + static void get_bound_info(const cls_replica_log_progress_marker& progress, + string& entity, string& marker, + utime_t time, + list<pair<string, utime_t> >& entries); +}; + +class RGWReplicaObjectLogger : private RGWReplicaLogger { + string pool; + string prefix; + + void get_shard_oid(int id, string& oid) { + char buf[16]; + snprintf(buf, sizeof(buf), "%d", id); + oid = prefix + buf; + } + +public: + RGWReplicaObjectLogger(RGWRados *_store, + const string& _pool, + const string& _prefix); + + int create_log_objects(int shards); + int update_bound(int shard, const string& daemon_id, const string& marker, + const utime_t& time, + const list<pair<string, utime_t> > *entries) { + string oid; + get_shard_oid(shard, oid); + return RGWReplicaLogger::update_bound(oid, pool, + daemon_id, marker, time, entries); + } + int delete_bound(int shard, const string& daemon_id) { + string oid; + get_shard_oid(shard, oid); + return RGWReplicaLogger::delete_bound(oid, pool, + daemon_id); + } + int get_bounds(int shard, string& marker, utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers) { + string oid; + get_shard_oid(shard, oid); + return RGWReplicaLogger::get_bounds(oid, pool, + marker, oldest_time, markers); + } +}; + +class RGWReplicaBucketLogger : private RGWReplicaLogger { +public: + RGWReplicaBucketLogger(RGWRados *_store) : + RGWReplicaLogger(_store) {} + int update_bound(const rgw_bucket& bucket, const string& daemon_id, + const string& marker, const utime_t& time, + const list<pair<string, utime_t> > *entries) { + return RGWReplicaLogger::update_bound(bucket.name, bucket.index_pool, + daemon_id, marker, time, entries); + } + int delete_bound(const rgw_bucket& bucket, const string& daemon_id) { + return RGWReplicaLogger::delete_bound(bucket.name, bucket.index_pool, + daemon_id); + } + int get_bounds(const rgw_bucket& bucket, string& marker, utime_t& oldest_time, + list<cls_replica_log_progress_marker>& markers) { + return RGWReplicaLogger::get_bounds(bucket.name, bucket.index_pool, + marker, oldest_time, markers); + } +}; + +#endif /* RGW_REPLICA_LOG_H_ */ |