summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README28
-rw-r--r--ceph.spec.in1
-rw-r--r--doc/dev/versions.rst41
-rw-r--r--doc/rados/operations/operating.rst31
-rw-r--r--src/auth/cephx/CephxKeyServer.cc6
-rw-r--r--src/ceph_osd.cc2
-rw-r--r--src/common/config_opts.h2
-rw-r--r--src/common/sharedptr_registry.hpp23
-rw-r--r--src/librados/AioCompletionImpl.h7
-rw-r--r--src/librados/IoCtxImpl.cc15
-rw-r--r--src/librados/IoCtxImpl.h6
-rw-r--r--src/librados/librados.cc6
-rw-r--r--src/librbd/internal.cc2
-rw-r--r--src/messages/MOSDOpReply.h58
-rw-r--r--src/mon/DataHealthService.cc42
-rw-r--r--src/mon/DataHealthService.h1
-rw-r--r--src/mon/Monitor.cc15
-rw-r--r--src/mon/MonitorDBStore.h8
-rw-r--r--src/mon/mon_types.h68
-rw-r--r--src/objclass/class_api.cc2
-rw-r--r--src/os/FileStore.cc3
-rw-r--r--src/os/KeyValueDB.h2
-rw-r--r--src/os/LevelDBStore.h64
-rw-r--r--src/os/WBThrottle.cc2
-rw-r--r--src/osd/OSD.cc79
-rw-r--r--src/osd/OSD.h3
-rw-r--r--src/osd/PG.cc3
-rw-r--r--src/osd/PGLog.cc1
-rw-r--r--src/osd/PGLog.h6
-rw-r--r--src/osd/ReplicatedPG.cc168
-rw-r--r--src/osd/ReplicatedPG.h4
-rw-r--r--src/osd/osd_types.cc33
-rw-r--r--src/osd/osd_types.h21
-rw-r--r--src/osdc/Objecter.cc15
-rw-r--r--src/osdc/Objecter.h56
-rw-r--r--src/test/ObjectMap/KeyValueDBMemory.h18
-rw-r--r--src/test/ObjectMap/test_store_tool/test_store_tool.cc14
-rw-r--r--src/test/common/test_sharedptr_registry.cc24
-rw-r--r--src/test/test_osd_types.cc3
39 files changed, 630 insertions, 253 deletions
diff --git a/README b/README
index 1dcf94512ac..3662d0ea2cb 100644
--- a/README
+++ b/README
@@ -97,7 +97,11 @@ To build the documentation, ensure that you are in the top-level `/ceph director
Build Prerequisites
--------------------
+===================
+
+
+debian-based
+------------
To build the source code, you must install the following:
- automake
@@ -132,3 +136,25 @@ To build the source code, you must install the following:
For example:
$ apt-get install automake autoconf pkg-config gcc g++ make libboost-dev libedit-dev libssl-dev libtool libfcgi libfcgi-dev libfuse-dev linux-kernel-headers libcrypto++-dev libaio-dev libgoogle-perftools-dev libkeyutils-dev uuid-dev libatomic-ops-dev libboost-program-options-dev libboost-thread-dev libexpat1-dev libleveldb-dev libsnappy-dev libcurl4-gnutls-dev python-argparse python-flask
+
+rpm-based
+---------
+These are the rpm packages needed to install in an rpm-based OS:
+
+ autoconf
+ automake
+ gcc
+ make
+ libtool
+ python-argparse
+ python-flask
+ libuuid-devel
+ nss-devel
+ fuse-devel
+ gperftools-devel
+ libedit-devel
+ libatomic_ops-devel
+ snappy-devel
+ leveldb-devel
+ libaio-devel
+ boost-devel
diff --git a/ceph.spec.in b/ceph.spec.in
index 11a962d8bdc..8091018c1dc 100644
--- a/ceph.spec.in
+++ b/ceph.spec.in
@@ -126,7 +126,6 @@ Requires: apache2-mod_fcgid
%else
BuildRequires: expat-devel
BuildRequires: fcgi-devel
-Requires: mod_fcgid
%endif
%description radosgw
radosgw is an S3 HTTP REST gateway for the RADOS object store. It is
diff --git a/doc/dev/versions.rst b/doc/dev/versions.rst
new file mode 100644
index 00000000000..c6a1ee69023
--- /dev/null
+++ b/doc/dev/versions.rst
@@ -0,0 +1,41 @@
+==================
+Public OSD Version
+==================
+We maintain two versions on disk: an eversion_t pg_log.head and a
+version_t info.user_version. Each object is tagged with both the pg
+version and user_version it was last modified with. The PG version is
+modified by manipulating OpContext::at_version and then persisting it
+to the pg log as transactions, and is incremented in all the places it
+used to be. The user_version is modified by manipulating the new
+OpContext::user_at_version and is also persisted via the pg log
+transactions.
+user_at_version is modified only in ReplicatedPG::prepare_transaction
+when the op was a "user modify" (a non-watch write), and the durable
+user_version is updated according to the following rules:
+1) increment user_at_version
+2) set user_at_version to the maximum of itself and
+ctx->at_version.version.
+3) ctx->new_obs.oi.user_version = ctx->user_at_version (to change the
+object's user_version)
+
+This set of update semantics means that for traditional pools the
+user_version will be equal to the past reassert_version, while for
+caching pools the object and PG user-version will be able to cross
+pools without making a total mess of things.
+In order to support old clients, we keep the old reassert_version but
+rename it to "bad_replay_version"; we fill it in as before: for writes
+it is set to the at_version (and is the proper replay version); for
+watches it is set to our user version; for ENOENT replies it is set to
+the replay version's epoch but the user_version's version. We also now
+fill in the version_t portion of the bad_replay_version on read ops as
+well as write ops, which should be fine for all old clients.
+
+For new clients, we prevent them from reading bad_replay_version and
+add two proper members: user_version and replay_version; user_version
+is filled in on every operation (reads included) while replay_version
+is filled in for writes.
+
+The objclass function get_current_version() now always returns the
+user_at_version, which means it is guaranteed to contain the version
+of the last user update in the PG (including on reads!).
+
diff --git a/doc/rados/operations/operating.rst b/doc/rados/operations/operating.rst
index 591704217d0..9942ea3cabf 100644
--- a/doc/rados/operations/operating.rst
+++ b/doc/rados/operations/operating.rst
@@ -8,28 +8,31 @@ Running Ceph with Upstart
=========================
When deploying Ceph Cuttlefish and beyond with ``ceph-deploy``, you may start
-and stop Ceph daemons or the entire cluster using the event-based `Upstart`_.
+and stop Ceph daemons on a :term:`Ceph Node` using the event-based `Upstart`_.
Upstart does not require you to define daemon instances in the Ceph configuration
file (although, they are still required for ``sysvinit`` should you choose to
use it).
-To list the Ceph Upstart jobs and instances, execute::
+To list the Ceph Upstart jobs and instances on a node, execute::
sudo initctl list | grep ceph
See `initctl`_ for additional details.
-Starting a Cluster
-------------------
+Starting all Daemons
+--------------------
-To start the cluster, execute the following::
+To start all daemons on a Ceph Node (irrespective of type), execute the
+following::
sudo start ceph-all
-Stopping a Cluster
-------------------
-To stop the cluster, execute the following::
+Stopping all Daemons
+--------------------
+
+To stop all daemons on a Ceph Node (irrespective of type), execute the
+following::
sudo stop ceph-all
@@ -37,7 +40,8 @@ To stop the cluster, execute the following::
Starting all Daemons by Type
----------------------------
-To start all daemons of a particular type, execute one of the following::
+To start all daemons of a particular type on a Ceph Node, execute one of the
+following::
sudo start ceph-osd-all
sudo start ceph-mon-all
@@ -47,7 +51,8 @@ To start all daemons of a particular type, execute one of the following::
Stopping all Daemons by Type
----------------------------
-To stop all daemons of a particular type, execute one of the following::
+To stop all daemons of a particular type on a Ceph Node, execute one of the
+following::
sudo stop ceph-osd-all
sudo stop ceph-mon-all
@@ -57,7 +62,8 @@ To stop all daemons of a particular type, execute one of the following::
Starting a Daemon
-----------------
-To start a specific daemon instance, execute one of the following::
+To start a specific daemon instance on a Ceph Node, execute one of the
+following::
sudo start ceph-osd id={id}
sudo start ceph-mon id={hostname}
@@ -73,7 +79,8 @@ For example::
Stopping a Daemon
-----------------
-To stop a specific daemon instance, execute one of the following::
+To stop a specific daemon instance on a Ceph Node, execute one of the
+following::
sudo stop ceph-osd id={id}
sudo stop ceph-mon id={hostname}
diff --git a/src/auth/cephx/CephxKeyServer.cc b/src/auth/cephx/CephxKeyServer.cc
index e0c8174a2a1..e57b5575142 100644
--- a/src/auth/cephx/CephxKeyServer.cc
+++ b/src/auth/cephx/CephxKeyServer.cc
@@ -163,7 +163,7 @@ bool KeyServer::_check_rotating_secrets()
ldout(cct, 10) << __func__ << " added " << added << dendl;
data.rotating_ver++;
//data.next_rotating_time = ceph_clock_now(cct);
- //data.next_rotating_time += MIN(g_conf->auth_mon_ticket_ttl, g_conf->auth_service_ticket_ttl);
+ //data.next_rotating_time += MIN(cct->_conf->auth_mon_ticket_ttl, cct->_conf->auth_service_ticket_ttl);
_dump_rotating_secrets();
return true;
}
@@ -191,7 +191,7 @@ int KeyServer::_rotate_secret(uint32_t service_id)
RotatingSecrets& r = data.rotating_secrets[service_id];
int added = 0;
utime_t now = ceph_clock_now(cct);
- double ttl = service_id == CEPH_ENTITY_TYPE_AUTH ? g_conf->auth_mon_ticket_ttl : g_conf->auth_service_ticket_ttl;
+ double ttl = service_id == CEPH_ENTITY_TYPE_AUTH ? cct->_conf->auth_mon_ticket_ttl : cct->_conf->auth_service_ticket_ttl;
while (r.need_new_secrets(now)) {
ExpiringCryptoKey ek;
@@ -424,7 +424,7 @@ int KeyServer::_build_session_auth_info(uint32_t service_id, CephXServiceTicketI
{
info.service_id = service_id;
info.ticket = auth_ticket_info.ticket;
- info.ticket.init_timestamps(ceph_clock_now(cct), g_conf->auth_service_ticket_ttl);
+ info.ticket.init_timestamps(ceph_clock_now(cct), cct->_conf->auth_service_ticket_ttl);
generate_secret(info.session_key);
diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc
index d8590bff817..dc6f435bdcf 100644
--- a/src/ceph_osd.cc
+++ b/src/ceph_osd.cc
@@ -465,6 +465,8 @@ int main(int argc, const char **argv)
register_async_signal_handler_oneshot(SIGINT, handle_osd_signal);
register_async_signal_handler_oneshot(SIGTERM, handle_osd_signal);
+ osd->final_init();
+
if (g_conf->inject_early_sigterm)
kill(getpid(), SIGTERM);
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 89405121698..f526f80c929 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -203,6 +203,7 @@ OPTION(mon_leveldb_max_open_files, OPT_INT, 0) // monitor's leveldb max open fil
OPTION(mon_leveldb_compression, OPT_BOOL, false) // monitor's leveldb uses compression
OPTION(mon_leveldb_paranoid, OPT_BOOL, false) // monitor's leveldb paranoid flag
OPTION(mon_leveldb_log, OPT_STR, "")
+OPTION(mon_leveldb_size_warn, OPT_U64, 40*1024*1024*1024) // issue a warning when the monitor's leveldb goes over 40GB (in bytes)
OPTION(paxos_stash_full_interval, OPT_INT, 25) // how often (in commits) to stash a full copy of the PaxosService state
OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first sync the monitor stores
OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update
@@ -524,6 +525,7 @@ OPTION(osd_max_attr_size, OPT_U64, 65536)
OPTION(filestore, OPT_BOOL, false)
/// filestore wb throttle limits
+OPTION(filestore_wbthrottle_enable, OPT_BOOL, true)
OPTION(filestore_wbthrottle_btrfs_bytes_start_flusher, OPT_U64, 41943040)
OPTION(filestore_wbthrottle_btrfs_bytes_hard_limit, OPT_U64, 419430400)
OPTION(filestore_wbthrottle_btrfs_ios_start_flusher, OPT_U64, 500)
diff --git a/src/common/sharedptr_registry.hpp b/src/common/sharedptr_registry.hpp
index 6579bd4ba71..90043001ee7 100644
--- a/src/common/sharedptr_registry.hpp
+++ b/src/common/sharedptr_registry.hpp
@@ -64,16 +64,21 @@ public:
}
bool get_next(const K &key, pair<K, VPtr> *next) {
- VPtr next_val;
- Mutex::Locker l(lock);
- typename map<K, WeakVPtr>::iterator i = contents.upper_bound(key);
- while (i != contents.end() &&
- !(next_val = i->second.lock()))
- ++i;
- if (i == contents.end())
- return false;
+ pair<K, VPtr> r;
+ {
+ Mutex::Locker l(lock);
+ VPtr next_val;
+ typename map<K, WeakVPtr>::iterator i = contents.upper_bound(key);
+ while (i != contents.end() &&
+ !(next_val = i->second.lock()))
+ ++i;
+ if (i == contents.end())
+ return false;
+ if (next)
+ r = make_pair(i->first, next_val);
+ }
if (next)
- *next = make_pair(i->first, next_val);
+ *next = r;
return true;
}
diff --git a/src/librados/AioCompletionImpl.h b/src/librados/AioCompletionImpl.h
index a40282a7c25..63a56db8aa8 100644
--- a/src/librados/AioCompletionImpl.h
+++ b/src/librados/AioCompletionImpl.h
@@ -32,7 +32,7 @@ struct librados::AioCompletionImpl {
int ref, rval;
bool released;
bool ack, safe;
- eversion_t objver;
+ version_t objver;
rados_callback_t callback_complete, callback_safe;
void *callback_complete_arg, *callback_safe_arg;
@@ -49,6 +49,7 @@ struct librados::AioCompletionImpl {
AioCompletionImpl() : lock("AioCompletionImpl lock", false, false),
ref(1), rval(0), released(false), ack(false), safe(false),
+ objver(0),
callback_complete(0),
callback_safe(0),
callback_complete_arg(0),
@@ -130,9 +131,9 @@ struct librados::AioCompletionImpl {
}
uint64_t get_version() {
lock.Lock();
- eversion_t v = objver;
+ version_t v = objver;
lock.Unlock();
- return v.version;
+ return v;
}
void get() {
diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc
index c7900585458..aaa987316a3 100644
--- a/src/librados/IoCtxImpl.cc
+++ b/src/librados/IoCtxImpl.cc
@@ -502,7 +502,7 @@ int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
Cond cond;
bool done;
int r;
- eversion_t ver;
+ version_t ver;
Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
@@ -536,7 +536,7 @@ int librados::IoCtxImpl::operate_read(const object_t& oid,
Cond cond;
bool done;
int r;
- eversion_t ver;
+ version_t ver;
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
@@ -609,7 +609,6 @@ int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
return -EDOM;
Context *onack = new C_aio_Ack(c);
- eversion_t ver;
c->is_read = true;
c->io = this;
@@ -1002,7 +1001,7 @@ int librados::IoCtxImpl::getxattrs(const object_t& oid,
return r;
}
-void librados::IoCtxImpl::set_sync_op_version(eversion_t& ver)
+void librados::IoCtxImpl::set_sync_op_version(version_t ver)
{
last_objver = ver;
}
@@ -1016,7 +1015,7 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver,
bool done;
int r;
Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t objver;
+ version_t objver;
lock->Lock();
@@ -1071,7 +1070,7 @@ int librados::IoCtxImpl::unwatch(const object_t& oid, uint64_t cookie)
bool done;
int r;
Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t ver;
+ version_t ver;
lock->Lock();
client->unregister_watcher(cookie);
@@ -1102,7 +1101,7 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
bool done, done_all;
int r;
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
- eversion_t objver;
+ version_t objver;
uint64_t cookie;
C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);
@@ -1144,7 +1143,7 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b
return r;
}
-eversion_t librados::IoCtxImpl::last_version()
+version_t librados::IoCtxImpl::last_version()
{
return last_objver;
}
diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h
index 74ca1d09880..ccecd4e8184 100644
--- a/src/librados/IoCtxImpl.h
+++ b/src/librados/IoCtxImpl.h
@@ -37,7 +37,7 @@ struct librados::IoCtxImpl {
::SnapContext snapc;
uint64_t assert_ver;
map<object_t, uint64_t> assert_src_version;
- eversion_t last_objver;
+ version_t last_objver;
uint32_t notify_timeout;
object_locator_t oloc;
@@ -183,7 +183,7 @@ struct librados::IoCtxImpl {
int pool_change_auid(unsigned long long auid);
int pool_change_auid_async(unsigned long long auid, PoolAsyncCompletionImpl *c);
- void set_sync_op_version(eversion_t& ver);
+ void set_sync_op_version(version_t ver);
int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
int unwatch(const object_t& oid, uint64_t cookie);
int notify(const object_t& oid, uint64_t ver, bufferlist& bl);
@@ -191,7 +191,7 @@ struct librados::IoCtxImpl {
const object_t& oid, uint64_t notify_id, uint64_t ver,
uint64_t cookie);
- eversion_t last_version();
+ version_t last_version();
void set_assert_version(uint64_t ver);
void set_assert_src_version(const object_t& oid, uint64_t ver);
void set_notify_timeout(uint32_t timeout);
diff --git a/src/librados/librados.cc b/src/librados/librados.cc
index 0a36092a3d9..f704412559f 100644
--- a/src/librados/librados.cc
+++ b/src/librados/librados.cc
@@ -1108,8 +1108,7 @@ const librados::ObjectIterator& librados::IoCtx::objects_end() const
uint64_t librados::IoCtx::get_last_version()
{
- eversion_t ver = io_ctx_impl->last_version();
- return ver.version;
+ return io_ctx_impl->last_version();
}
int librados::IoCtx::aio_read(const std::string& oid, librados::AioCompletion *c,
@@ -2142,8 +2141,7 @@ extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len
extern "C" uint64_t rados_get_last_version(rados_ioctx_t io)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
- eversion_t ver = ctx->last_version();
- return ver.version;
+ return ctx->last_version();
}
extern "C" int rados_pool_create(rados_t cluster, const char *name)
diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc
index 9c9ae16dfa4..abc6ff92a28 100644
--- a/src/librbd/internal.cc
+++ b/src/librbd/internal.cc
@@ -2845,7 +2845,7 @@ reprotect_and_return_err:
{
CephContext *cct = ictx->cct;
ldout(cct, 20) << "aio_write " << ictx << " off = " << off << " len = "
- << len << " buf = " << &buf << dendl;
+ << len << " buf = " << (void*)buf << dendl;
if (!len)
return 0;
diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h
index 65f7b23987c..2defeecc1e0 100644
--- a/src/messages/MOSDOpReply.h
+++ b/src/messages/MOSDOpReply.h
@@ -31,7 +31,7 @@
class MOSDOpReply : public Message {
- static const int HEAD_VERSION = 4;
+ static const int HEAD_VERSION = 5;
static const int COMPAT_VERSION = 2;
object_t oid;
@@ -39,7 +39,9 @@ class MOSDOpReply : public Message {
vector<OSDOp> ops;
int64_t flags;
int32_t result;
- eversion_t reassert_version;
+ eversion_t bad_replay_version;
+ eversion_t replay_version;
+ version_t user_version;
epoch_t osdmap_epoch;
int32_t retry_attempt;
@@ -52,10 +54,38 @@ public:
bool is_onnvram() const { return get_flags() & CEPH_OSD_FLAG_ONNVRAM; }
int get_result() const { return result; }
- eversion_t get_version() { return reassert_version; }
+ eversion_t get_replay_version() { return replay_version; }
+ version_t get_user_version() { return user_version; }
void set_result(int r) { result = r; }
- void set_version(eversion_t v) { reassert_version = v; }
+
+ void set_reply_versions(eversion_t v, version_t uv) {
+ replay_version = v;
+ user_version = uv;
+ /* Backwards compatibility: old clients never look at replay_version
+ * or user_version; they only see bad_replay_version, which is sent
+ * in the slot that used to carry reassert_version.
+ *
+ * On pools without caching, user_version is a slightly-laggy copy of
+ * the regular pg version (at_version): it is set equal to at_version
+ * on updates but, unlike at_version, it is not bumped by watch ops.
+ * So for non-watch write ops on classic pools replay_version,
+ * user_version and bad_replay_version are all identical. For watch
+ * ops the replay_version has advanced while user_at_version has not,
+ * and the semantics promised to old clients are that a watch does
+ * not look like an update.
+ *
+ * Hence bad_replay_version starts out as a copy of v and, whenever
+ * uv is non-zero, has its version field replaced with uv (the epoch
+ * still comes from v). */
+ bad_replay_version = v;
+ if (uv) {
+ bad_replay_version.version = uv;
+ }
+ }
+
+ /* Don't fill in replay_version for non-write ops */
+ void set_enoent_reply_versions(eversion_t v, version_t uv) {
+ user_version = uv;
+ bad_replay_version = v;
+ }
void add_flags(int f) { flags |= f; }
@@ -99,7 +129,7 @@ public:
oid = req->oid;
pgid = req->pgid;
osdmap_epoch = e;
- reassert_version = req->reassert_version;
+ user_version = 0;
retry_attempt = req->get_retry_attempt();
// zero out ops payload_len
@@ -121,6 +151,7 @@ public:
head.layout.ol_pgid = pgid.get_old_pg().v;
head.flags = flags;
head.osdmap_epoch = osdmap_epoch;
+ head.reassert_version = bad_replay_version;
head.result = result;
head.num_ops = ops.size();
head.object_len = oid.name.length();
@@ -134,7 +165,7 @@ public:
::encode(pgid, payload);
::encode(flags, payload);
::encode(result, payload);
- ::encode(reassert_version, payload);
+ ::encode(bad_replay_version, payload);
::encode(osdmap_epoch, payload);
__u32 num_ops = ops.size();
@@ -146,6 +177,9 @@ public:
for (unsigned i = 0; i < num_ops; i++)
::encode(ops[i].rval, payload);
+
+ ::encode(replay_version, payload);
+ ::encode(user_version, payload);
}
}
virtual void decode_payload() {
@@ -161,7 +195,8 @@ public:
pgid = pg_t(head.layout.ol_pgid);
result = head.result;
flags = head.flags;
- reassert_version = head.reassert_version;
+ replay_version = head.reassert_version;
+ user_version = replay_version.version;
osdmap_epoch = head.osdmap_epoch;
retry_attempt = -1;
} else {
@@ -169,7 +204,7 @@ public:
::decode(pgid, p);
::decode(flags, p);
::decode(result, p);
- ::decode(reassert_version, p);
+ ::decode(bad_replay_version, p);
::decode(osdmap_epoch, p);
__u32 num_ops = ops.size();
@@ -189,6 +224,13 @@ public:
OSDOp::split_osd_op_vector_out_data(ops, data);
}
+
+ if (header.version >= 5) {
+ // v5+ senders encode both fields explicitly after the op results.
+ ::decode(replay_version, p);
+ ::decode(user_version, p);
+ } else {
+ // Older senders only encode bad_replay_version; derive both new
+ // fields from it. Both assignments must stay inside this branch,
+ // otherwise the user_version decoded above would be overwritten.
+ replay_version = bad_replay_version;
+ user_version = replay_version.version;
+ }
}
}
diff --git a/src/mon/DataHealthService.cc b/src/mon/DataHealthService.cc
index 6e8aa313a36..5fc745ce11d 100644
--- a/src/mon/DataHealthService.cc
+++ b/src/mon/DataHealthService.cc
@@ -81,6 +81,18 @@ health_status_t DataHealthService::get_health(
health_detail = "low disk space!";
}
+ if (stats.store_stats.bytes_total >= g_conf->mon_leveldb_size_warn) {
+ if (health_status > HEALTH_WARN)
+ health_status = HEALTH_WARN;
+ if (!health_detail.empty())
+ health_detail.append("; ");
+ stringstream ss;
+ ss << "store is getting too big! "
+ << prettybyte_t(stats.store_stats.bytes_total)
+ << " >= " << prettybyte_t(g_conf->mon_leveldb_size_warn);
+ health_detail.append(ss.str());
+ }
+
if (overall_status > health_status)
overall_status = health_status;
@@ -95,18 +107,15 @@ health_status_t DataHealthService::get_health(
if (f) {
f->open_object_section("mon");
f->dump_string("name", mon_name.c_str());
- f->dump_int("kb_total", stats.kb_total);
- f->dump_int("kb_used", stats.kb_used);
- f->dump_int("kb_avail", stats.kb_avail);
- f->dump_int("avail_percent", stats.latest_avail_percent);
- f->dump_stream("last_updated") << stats.last_update;
+ // leave this unenclosed by an object section to avoid breaking backward-compatibility
+ stats.dump(f);
f->dump_stream("health") << health_status;
if (health_status != HEALTH_OK)
- f->dump_string("health_detail", health_detail);
+ f->dump_string("health_detail", health_detail);
f->close_section();
}
}
-
+
if (f) {
f->close_section(); // mons
f->close_section(); // data_health
@@ -115,6 +124,22 @@ health_status_t DataHealthService::get_health(
return overall_status;
}
+int DataHealthService::update_store_stats(DataStats &ours)
+{
+ map<string,uint64_t> extra;
+ uint64_t store_size = mon->store->get_estimated_size(extra);
+ assert(store_size > 0);
+
+ ours.store_stats.bytes_total = store_size;
+ ours.store_stats.bytes_sst = extra["sst"];
+ ours.store_stats.bytes_log = extra["log"];
+ ours.store_stats.bytes_misc = extra["misc"];
+ ours.last_update = ceph_clock_now(g_ceph_context);
+
+ return 0;
+}
+
+
int DataHealthService::update_stats()
{
struct statfs stbuf;
@@ -135,7 +160,8 @@ int DataHealthService::update_stats()
<< " total " << ours.kb_total << " used " << ours.kb_used << " avail " << ours.kb_avail
<< dendl;
ours.last_update = ceph_clock_now(g_ceph_context);
- return 0;
+
+ return update_store_stats(ours);
}
void DataHealthService::share_stats()
diff --git a/src/mon/DataHealthService.h b/src/mon/DataHealthService.h
index 337e7a450f7..750c58e5f80 100644
--- a/src/mon/DataHealthService.h
+++ b/src/mon/DataHealthService.h
@@ -34,6 +34,7 @@ class DataHealthService :
int last_warned_percent;
void handle_tell(MMonHealth *m);
+ int update_store_stats(DataStats &ours);
int update_stats();
void share_stats();
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index 45ca02027fc..10f5bfb149c 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -1292,7 +1292,8 @@ void Monitor::handle_probe_reply(MMonProbe *m)
dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
dout(10) << " monmap is " << *monmap << dendl;
- if (!is_probing()) {
+ // discover name and addrs during probing or electing states.
+ if (!is_probing() && !is_electing()) {
m->put();
return;
}
@@ -1326,6 +1327,12 @@ void Monitor::handle_probe_reply(MMonProbe *m)
<< peer_name << " -> " << m->name << " in my monmap"
<< dendl;
monmap->rename(peer_name, m->name);
+
+ if (is_electing()) {
+ m->put();
+ bootstrap();
+ return;
+ }
} else {
dout(10) << " peer name is " << peer_name << dendl;
}
@@ -1342,6 +1349,12 @@ void Monitor::handle_probe_reply(MMonProbe *m)
}
}
+ // end discover phase
+ if (!is_probing()) {
+ m->put();
+ return;
+ }
+
assert(paxos != NULL);
if (is_synchronizing()) {
diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h
index 276620f7516..85f6c895145 100644
--- a/src/mon/MonitorDBStore.h
+++ b/src/mon/MonitorDBStore.h
@@ -509,6 +509,10 @@ class MonitorDBStore
db->compact_prefix(prefix);
}
+ uint64_t get_estimated_size(map<string, uint64_t> &extras) {
+ return db->get_estimated_size(extras);
+ }
+
MonitorDBStore(const string& path) :
db(0), do_dump(false), dump_fd(-1) {
string::const_reverse_iterator rit;
@@ -523,8 +527,8 @@ class MonitorDBStore
LevelDBStore *db_ptr = new LevelDBStore(g_ceph_context, full_path);
if (!db_ptr) {
- std::cout << __func__ << " error initializing level db back storage in "
- << full_path << std::endl;
+ derr << __func__ << " error initializing level db back storage in "
+ << full_path << dendl;
assert(0 != "MonitorDBStore: error initializing level db back storage");
}
db.reset(db_ptr);
diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h
index 0eae3b172bf..0ae1aaf8d5e 100644
--- a/src/mon/mon_types.h
+++ b/src/mon/mon_types.h
@@ -40,6 +40,52 @@ inline const char *get_paxos_name(int p) {
#define CEPH_MON_ONDISK_MAGIC "ceph mon volume v012"
+/**
+ * leveldb store stats
+ *
+ * If we ever decide to support multiple backends for the monitor store,
+ * we should then create an abstract class 'MonitorStoreStats' of sorts
+ * and inherit it on LevelDBStoreStats. I'm sure you'll figure something
+ * out.
+ */
+struct LevelDBStoreStats {
+ uint64_t bytes_total;
+ uint64_t bytes_sst;
+ uint64_t bytes_log;
+ uint64_t bytes_misc;
+ utime_t last_update;
+
+ // Start with all byte counters at zero. An instance that is never
+ // filled in (e.g. a DataStats decoded from an encoding that predates
+ // store_stats) must not expose indeterminate sizes to readers that
+ // compare bytes_total against a warning threshold.
+ LevelDBStoreStats() :
+ bytes_total(0),
+ bytes_sst(0),
+ bytes_log(0),
+ bytes_misc(0)
+ {}
+
+ // Emit every field into f; f must not be NULL. No enclosing section is
+ // opened here, so the caller decides where the fields are nested.
+ void dump(Formatter *f) const {
+ assert(f != NULL);
+ f->dump_int("bytes_total", bytes_total);
+ f->dump_int("bytes_sst", bytes_sst);
+ f->dump_int("bytes_log", bytes_log);
+ f->dump_int("bytes_misc", bytes_misc);
+ f->dump_stream("last_updated") << last_update;
+ }
+
+ // Versioned encoding (v1, compat 1). Field order must match decode().
+ void encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(bytes_total, bl);
+ ::encode(bytes_sst, bl);
+ ::encode(bytes_log, bl);
+ ::encode(bytes_misc, bl);
+ ::encode(last_update, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ // Inverse of encode(): reads the fields back in the same order.
+ void decode(bufferlist::iterator &p) {
+ DECODE_START(1, p);
+ ::decode(bytes_total, p);
+ ::decode(bytes_sst, p);
+ ::decode(bytes_log, p);
+ ::decode(bytes_misc, p);
+ ::decode(last_update, p);
+ DECODE_FINISH(p);
+ }
+};
+WRITE_CLASS_ENCODER(LevelDBStoreStats);
+
// data stats
struct DataStats {
@@ -50,13 +96,29 @@ struct DataStats {
int latest_avail_percent;
utime_t last_update;
+ LevelDBStoreStats store_stats;
+
+ void dump(Formatter *f) const {
+ assert(f != NULL);
+ f->dump_int("kb_total", kb_total);
+ f->dump_int("kb_used", kb_used);
+ f->dump_int("kb_avail", kb_avail);
+ f->dump_int("avail_percent", latest_avail_percent);
+ f->dump_stream("last_updated") << last_update;
+
+ f->open_object_section("store_stats");
+ store_stats.dump(f);
+ f->close_section();
+ }
+
void encode(bufferlist &bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
::encode(kb_total, bl);
::encode(kb_used, bl);
::encode(kb_avail, bl);
::encode(latest_avail_percent, bl);
::encode(last_update, bl);
+ ::encode(store_stats, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &p) {
@@ -66,10 +128,12 @@ struct DataStats {
::decode(kb_avail, p);
::decode(latest_avail_percent, p);
::decode(last_update, p);
+ if (struct_v > 1)
+ ::decode(store_stats, p);
+
DECODE_FINISH(p);
}
};
-
WRITE_CLASS_ENCODER(DataStats);
struct ScrubResult {
diff --git a/src/objclass/class_api.cc b/src/objclass/class_api.cc
index 6e8de53467f..b95260b7e16 100644
--- a/src/objclass/class_api.cc
+++ b/src/objclass/class_api.cc
@@ -582,7 +582,7 @@ uint64_t cls_current_version(cls_method_context_t hctx)
{
ReplicatedPG::OpContext *ctx = *(ReplicatedPG::OpContext **)hctx;
- return ctx->at_version.version;
+ return ctx->user_at_version;
}
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 81407373685..d4d540df876 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -2600,7 +2600,8 @@ int FileStore::_write(coll_t cid, const hobject_t& oid,
r = bl.length();
// flush?
- if (!replaying)
+ if (!replaying &&
+ g_conf->filestore_wbthrottle_enable)
wbthrottle.queue_wb(fd, oid, offset, len, replica);
lfn_close(fd);
diff --git a/src/os/KeyValueDB.h b/src/os/KeyValueDB.h
index f62bca996a5..e98463aa763 100644
--- a/src/os/KeyValueDB.h
+++ b/src/os/KeyValueDB.h
@@ -165,6 +165,8 @@ public:
);
}
+ virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) = 0;
+
virtual ~KeyValueDB() {}
protected:
diff --git a/src/os/LevelDBStore.h b/src/os/LevelDBStore.h
index f3809cf3496..356ee59aa27 100644
--- a/src/os/LevelDBStore.h
+++ b/src/os/LevelDBStore.h
@@ -20,6 +20,12 @@
#include "leveldb/filter_policy.h"
#endif
+#include <errno.h>
+#include "common/errno.h"
+#include "common/dout.h"
+#include "include/assert.h"
+#include "common/Formatter.h"
+
#include "common/ceph_context.h"
class PerfCounters;
@@ -300,6 +306,64 @@ public:
return limit;
}
+ /**
+ * Estimate the on-disk size of the store by summing st_size for every
+ * entry found directly under the store directory ("." and ".." are
+ * skipped).
+ *
+ * @param extra on success, filled with the per-category byte counts
+ * under the keys "sst", "log", "misc" and "total"; entries
+ * are classified by the text after the last '.' in their
+ * name, and names without a '.' count as "misc".
+ * @return the total size in bytes, or 0 if the directory could not be
+ * opened or an entry could not be stat'ed (extra is left
+ * untouched in that case).
+ */
+ virtual uint64_t get_estimated_size(map<string,uint64_t> &extra) {
+ DIR *store_dir = opendir(path.c_str());
+ if (!store_dir) {
+ lderr(cct) << __func__ << " something happened opening the store: "
+ << cpp_strerror(errno) << dendl;
+ return 0;
+ }
+
+ uint64_t total_size = 0;
+ uint64_t sst_size = 0;
+ uint64_t log_size = 0;
+ uint64_t misc_size = 0;
+
+ struct dirent *entry = NULL;
+ while ((entry = readdir(store_dir)) != NULL) {
+ string n(entry->d_name);
+
+ if (n == "." || n == "..")
+ continue;
+
+ string fpath = path + '/' + n;
+ struct stat s;
+ if (stat(fpath.c_str(), &s) < 0) {
+ // Capture errno right away, before any other call can clobber it.
+ int stat_errno = errno;
+ // An entry can disappear between readdir() and stat() if files in
+ // the directory are removed while we scan it. That is not an
+ // error for an estimate: skip the entry instead of discarding
+ // everything counted so far.
+ if (stat_errno == ENOENT)
+ continue;
+ lderr(cct) << __func__ << " error obtaining stats for " << fpath
+ << ": " << cpp_strerror(stat_errno) << dendl;
+ goto err;
+ }
+
+ size_t pos = n.find_last_of('.');
+ if (pos == string::npos) {
+ misc_size += s.st_size;
+ continue;
+ }
+
+ string ext = n.substr(pos+1);
+ if (ext == "sst") {
+ sst_size += s.st_size;
+ } else if (ext == "log") {
+ log_size += s.st_size;
+ } else {
+ misc_size += s.st_size;
+ }
+ }
+
+ total_size = sst_size + log_size + misc_size;
+
+ extra["sst"] = sst_size;
+ extra["log"] = log_size;
+ extra["misc"] = misc_size;
+ extra["total"] = total_size;
+
+err:
+ // Reached on both success and failure; total_size is still 0 on failure.
+ closedir(store_dir);
+ return total_size;
+ }
+
+
protected:
WholeSpaceIterator _get_iterator() {
return std::tr1::shared_ptr<KeyValueDB::WholeSpaceIteratorImpl>(
diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc
index 23e24765cc2..8479b3c878d 100644
--- a/src/os/WBThrottle.cc
+++ b/src/os/WBThrottle.cc
@@ -145,7 +145,7 @@ void *WBThrottle::entry()
while (get_next_should_flush(&wb)) {
clearing = wb.get<0>();
lock.Unlock();
- ::fsync(**wb.get<1>());
+ ::fdatasync(**wb.get<1>());
if (wb.get<2>().nocache)
posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
lock.Lock();
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 3e19f5634c1..66022a3898a 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -1231,6 +1231,44 @@ int OSD::init()
// tick
tick_timer.add_event_after(g_conf->osd_heartbeat_interval, new C_Tick(this));
+ service.init();
+ service.publish_map(osdmap);
+ service.publish_superblock(superblock);
+
+ osd_lock.Unlock();
+
+ r = monc->authenticate();
+ if (r < 0) {
+ monc->shutdown();
+ store->umount();
+ osd_lock.Lock(); // locker is going to unlock this on function exit
+ if (is_stopping())
+ return 0;
+ return r;
+ }
+
+ while (monc->wait_auth_rotating(30.0) < 0) {
+ derr << "unable to obtain rotating service keys; retrying" << dendl;
+ }
+
+ osd_lock.Lock();
+ if (is_stopping())
+ return 0;
+
+ dout(10) << "ensuring pgs have consumed prior maps" << dendl;
+ consume_map();
+ peering_wq.drain();
+
+ dout(10) << "done with init, starting boot process" << dendl;
+ state = STATE_BOOTING;
+ start_boot();
+
+ return 0;
+}
+
+void OSD::final_init()
+{
+ int r;
AdminSocket *admin_socket = cct->get_admin_socket();
asok_hook = new OSDSocketHook(this);
r = admin_socket->register_command("dump_ops_in_flight",
@@ -1323,40 +1361,6 @@ int OSD::init()
test_ops_hook,
"inject metadata error");
assert(r == 0);
-
- service.init();
- service.publish_map(osdmap);
- service.publish_superblock(superblock);
-
- osd_lock.Unlock();
-
- r = monc->authenticate();
- if (r < 0) {
- monc->shutdown();
- store->umount();
- osd_lock.Lock(); // locker is going to unlock this on function exit
- if (is_stopping())
- return 0;
- return r;
- }
-
- while (monc->wait_auth_rotating(30.0) < 0) {
- derr << "unable to obtain rotating service keys; retrying" << dendl;
- }
-
- osd_lock.Lock();
- if (is_stopping())
- return 0;
-
- dout(10) << "ensuring pgs have consumed prior maps" << dendl;
- consume_map();
- peering_wq.drain();
-
- dout(10) << "done with init, starting boot process" << dendl;
- state = STATE_BOOTING;
- start_boot();
-
- return 0;
}
void OSD::create_logger()
@@ -6797,10 +6801,11 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
void OSDService::reply_op_error(OpRequestRef op, int err)
{
- reply_op_error(op, err, eversion_t());
+ reply_op_error(op, err, eversion_t(), 0);
}
-void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v)
+void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
+ version_t uv)
{
MOSDOp *m = static_cast<MOSDOp*>(op->request);
assert(m->get_header().type == CEPH_MSG_OSD_OP);
@@ -6809,7 +6814,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v)
MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags);
Messenger *msgr = client_messenger;
- reply->set_version(v);
+ reply->set_reply_versions(v, uv);
if (m->get_source().is_osd())
msgr = cluster_messenger;
msgr->send_message(reply, m->get_connection());
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 4d8c31e3046..e23c19b8f93 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -414,7 +414,7 @@ public:
void dec_scrubs_active();
void reply_op_error(OpRequestRef op, int err);
- void reply_op_error(OpRequestRef op, int err, eversion_t v);
+ void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
void handle_misdirected_op(PG *pg, OpRequestRef op);
// -- Watch --
@@ -1719,6 +1719,7 @@ public:
// startup/shutdown
int pre_init();
int init();
+ void final_init();
void suicide(int exitcode);
int shutdown();
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index cd5621cddf2..ef64fe37919 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1751,6 +1751,8 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
info.last_update = pg_log.get_head();
child->info.last_update = child->pg_log.get_head();
+ child->info.last_user_version = info.last_user_version;
+
info.log_tail = pg_log.get_tail();
child->info.log_tail = child->pg_log.get_tail();
@@ -2304,6 +2306,7 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl)
// raise last_update.
assert(e.version > info.last_update);
info.last_update = e.version;
+ info.last_user_version = e.user_version;
// log mutation
pg_log.add(e);
diff --git a/src/osd/PGLog.cc b/src/osd/PGLog.cc
index dac1f33fd91..6fcca1b41bd 100644
--- a/src/osd/PGLog.cc
+++ b/src/osd/PGLog.cc
@@ -514,6 +514,7 @@ void PGLog::merge_log(ObjectStore::Transaction& t,
log.index();
info.last_update = log.head = olog.head;
+ info.last_user_version = oinfo.last_user_version;
info.purged_snaps = oinfo.purged_snaps;
// process divergent items
diff --git a/src/osd/PGLog.h b/src/osd/PGLog.h
index 552f9b0cee9..8f192a8eac0 100644
--- a/src/osd/PGLog.h
+++ b/src/osd/PGLog.h
@@ -84,11 +84,11 @@ struct PGLog {
bool logged_req(const osd_reqid_t &r) const {
return caller_ops.count(r);
}
- eversion_t get_request_version(const osd_reqid_t &r) const {
+ const pg_log_entry_t *get_request(const osd_reqid_t &r) const {
hash_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p = caller_ops.find(r);
if (p == caller_ops.end())
- return eversion_t();
- return p->second->version;
+ return NULL;
+ return p->second;
}
void index() {
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 0f904c73a2d..a04ab485e7e 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -905,13 +905,14 @@ void ReplicatedPG::do_op(OpRequestRef op)
return;
}
- eversion_t oldv = pg_log.get_log().get_request_version(ctx->reqid);
- if (oldv != eversion_t()) {
+ const pg_log_entry_t *entry = pg_log.get_log().get_request(ctx->reqid);
+ if (entry) {
+ const eversion_t& oldv = entry->version;
dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
delete ctx;
src_obc.clear();
if (already_complete(oldv)) {
- osd->reply_op_error(op, 0, oldv);
+ osd->reply_op_error(op, 0, oldv, entry->user_version);
} else {
if (m->wants_ack()) {
if (already_ack(oldv)) {
@@ -953,6 +954,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
<< dendl;
}
+ ctx->user_at_version = info.last_user_version;
+
// note my stats
utime_t now = ceph_clock_now(g_ceph_context);
@@ -1021,10 +1024,11 @@ void ReplicatedPG::do_op(OpRequestRef op)
}
ctx->reply->set_result(result);
- if (result >= 0)
- ctx->reply->set_version(ctx->reply_version);
- else if (result == -ENOENT)
- ctx->reply->set_version(info.last_update);
+ if (result >= 0) {
+ ctx->reply->set_reply_versions(ctx->at_version, ctx->user_at_version);
+ } else if (result == -ENOENT) {
+ ctx->reply->set_enoent_reply_versions(info.last_update, ctx->user_at_version);
+ }
// read or error?
if (ctx->op_t.empty() || result < 0) {
@@ -1567,6 +1571,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
coid,
ctx->at_version,
ctx->obs->oi.version,
+ info.last_user_version,
osd_reqid_t(),
ctx->mtime)
);
@@ -1589,6 +1594,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
coid,
coi.version,
coi.prior_version,
+ info.last_user_version,
osd_reqid_t(),
ctx->mtime)
);
@@ -1613,6 +1619,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
snapoid,
ctx->at_version,
ctx->snapset_obc->obs.oi.version,
+ info.last_user_version,
osd_reqid_t(),
ctx->mtime)
);
@@ -1627,6 +1634,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
snapoid,
ctx->at_version,
ctx->snapset_obc->obs.oi.version,
+ info.last_user_version,
osd_reqid_t(),
ctx->mtime)
);
@@ -2417,9 +2425,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
uint64_t ver = op.watch.ver;
if (!ver)
result = -EINVAL;
- else if (ver < oi.user_version.version)
+ else if (ver < oi.user_version)
result = -ERANGE;
- else if (ver > oi.user_version.version)
+ else if (ver > oi.user_version)
result = -EOVERFLOW;
break;
}
@@ -2534,9 +2542,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
uint64_t ver = op.watch.ver;
if (!ver)
result = -EINVAL;
- else if (ver < src_obc->obs.oi.user_version.version)
+ else if (ver < src_obc->obs.oi.user_version)
result = -ERANGE;
- else if (ver > src_obc->obs.oi.user_version.version)
+ else if (ver > src_obc->obs.oi.user_version)
result = -EOVERFLOW;
break;
}
@@ -2821,7 +2829,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
dout(10) << "watch: ctx->obc=" << (void *)obc.get() << " cookie=" << cookie
<< " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl;
- dout(10) << "watch: oi.user_version=" << oi.user_version.version << dendl;
+ dout(10) << "watch: oi.user_version=" << oi.user_version<< dendl;
dout(10) << "watch: peer_addr="
<< ctx->op->request->get_connection()->get_peer_addr() << dendl;
@@ -3590,7 +3598,8 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
<< " to " << coid << " v " << ctx->at_version
<< " snaps=" << snaps << dendl;
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::CLONE, coid, ctx->at_version,
- ctx->obs->oi.version, ctx->reqid, ctx->new_obs.oi.mtime));
+ ctx->obs->oi.version, info.last_user_version,
+ ctx->reqid, ctx->new_obs.oi.mtime));
::encode(snaps, ctx->log.back().snaps);
ctx->at_version.version++;
@@ -3707,7 +3716,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
p->timeout,
p->cookie,
osd->get_next_id(get_osdmap()->get_epoch()),
- ctx->obc->obs.oi.user_version.version,
+ ctx->obc->obs.oi.user_version,
osd));
for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i =
ctx->obc->watchers.begin();
@@ -3779,7 +3788,6 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
// read-op? done?
if (ctx->op_t.empty() && !ctx->modify) {
- ctx->reply_version = ctx->obs->oi.user_version;
unstable_stats.add(ctx->delta_stats, ctx->obc->obs.oi.category);
return result;
}
@@ -3805,7 +3813,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
dout(10) << " removing old " << snapoid << dendl;
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::DELETE, snapoid, ctx->at_version, old_version,
- osd_reqid_t(), ctx->mtime));
+ info.last_user_version, osd_reqid_t(), ctx->mtime));
ctx->at_version.version++;
ctx->snapset_obc->obs.exists = false;
@@ -3818,7 +3826,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
dout(10) << " final snapset " << ctx->new_snapset
<< " in " << snapoid << dendl;
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, snapoid, ctx->at_version, old_version,
- osd_reqid_t(), ctx->mtime));
+ info.last_user_version, osd_reqid_t(), ctx->mtime));
ctx->snapset_obc = get_object_context(snapoid, true);
ctx->snapset_obc->obs.exists = true;
@@ -3837,11 +3845,16 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
// finish and log the op.
if (ctx->user_modify) {
/* update the user_version for any modify ops, except for the watch op */
- ctx->new_obs.oi.user_version = ctx->at_version;
+ ++ctx->user_at_version;
+ assert(ctx->user_at_version > ctx->new_obs.oi.user_version);
+ /* In order for new clients and old clients to interoperate properly
+ * when exchanging versions, we need to lower bound the user_version
+ * (which our new clients pay proper attention to)
+ * by the at_version (which is all the old clients can ever see). */
+ ctx->user_at_version = MAX(ctx->at_version.version, ctx->user_at_version);
+ ctx->new_obs.oi.user_version = ctx->user_at_version;
}
- ctx->reply_version = ctx->new_obs.oi.user_version;
ctx->bytes_written = ctx->op_t.get_encoded_bytes();
- ctx->new_obs.oi.version = ctx->at_version;
if (ctx->new_obs.exists) {
// on the head object
@@ -3869,7 +3882,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
if (!ctx->new_obs.exists)
logopcode = pg_log_entry_t::DELETE;
ctx->log.push_back(pg_log_entry_t(logopcode, soid, ctx->at_version, old_version,
- ctx->reqid, ctx->mtime));
+ ctx->user_at_version, ctx->reqid, ctx->mtime));
// apply new object state.
ctx->obc->obs = ctx->new_obs;
@@ -4081,7 +4094,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
i != waiting_for_ondisk[repop->v].end();
++i) {
- osd->reply_op_error(*i, 0, repop->v);
+ osd->reply_op_error(*i, 0, repop->v, 0);
}
waiting_for_ondisk.erase(repop->v);
}
@@ -4097,8 +4110,11 @@ void ReplicatedPG::eval_repop(RepGather *repop)
MOSDOpReply *reply = repop->ctx->reply;
if (reply)
repop->ctx->reply = NULL;
- else
+ else {
reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+ reply->set_reply_versions(repop->ctx->at_version,
+ repop->ctx->user_at_version);
+ }
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending commit on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
@@ -4119,6 +4135,8 @@ void ReplicatedPG::eval_repop(RepGather *repop)
++i) {
MOSDOp *m = (MOSDOp*)(*i)->request;
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+ reply->set_reply_versions(repop->ctx->at_version,
+ repop->ctx->user_at_version);
reply->add_flags(CEPH_OSD_FLAG_ACK);
osd->send_message_osd_client(reply, m->get_connection());
}
@@ -4130,8 +4148,11 @@ void ReplicatedPG::eval_repop(RepGather *repop)
MOSDOpReply *reply = repop->ctx->reply;
if (reply)
repop->ctx->reply = NULL;
- else
+ else {
reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
+ reply->set_reply_versions(repop->ctx->at_version,
+ repop->ctx->user_at_version);
+ }
reply->add_flags(CEPH_OSD_FLAG_ACK);
dout(10) << " sending ack on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
@@ -4472,6 +4493,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, obc->obs.oi.soid,
ctx->at_version,
obc->obs.oi.version,
+ info.last_user_version,
osd_reqid_t(), ctx->mtime));
eversion_t old_last_update = pg_log.get_head();
@@ -4844,71 +4866,41 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
rm->epoch_started = get_osdmap()->get_epoch();
if (!m->noop) {
- if (m->logbl.length()) {
- // shipped transaction and log entries
- vector<pg_log_entry_t> log;
-
- bufferlist::iterator p = m->get_data().begin();
-
- ::decode(rm->opt, p);
- if (!(m->get_connection()->get_features() & CEPH_FEATURE_OSD_SNAPMAPPER))
- rm->opt.set_tolerate_collection_add_enoent();
- p = m->logbl.begin();
- ::decode(log, p);
- if (m->hobject_incorrect_pool) {
- for (vector<pg_log_entry_t>::iterator i = log.begin();
- i != log.end();
- ++i) {
- if (i->soid.pool == -1)
- i->soid.pool = info.pgid.pool();
- }
- rm->opt.set_pool_override(info.pgid.pool());
- }
- rm->opt.set_replica();
-
- info.stats = m->pg_stats;
- if (!rm->opt.empty()) {
- // If the opt is non-empty, we infer we are before
- // last_backfill (according to the primary, not our
- // not-quite-accurate value), and should update the
- // collections now. Otherwise, we do it later on push.
- update_snap_map(log, rm->localt);
+ assert(m->logbl.length());
+ // shipped transaction and log entries
+ vector<pg_log_entry_t> log;
+
+ bufferlist::iterator p = m->get_data().begin();
+
+ ::decode(rm->opt, p);
+ if (!(m->get_connection()->get_features() & CEPH_FEATURE_OSD_SNAPMAPPER))
+ rm->opt.set_tolerate_collection_add_enoent();
+ p = m->logbl.begin();
+ ::decode(log, p);
+ if (m->hobject_incorrect_pool) {
+ for (vector<pg_log_entry_t>::iterator i = log.begin();
+ i != log.end();
+ ++i) {
+ if (i->soid.pool == -1)
+ i->soid.pool = info.pgid.pool();
}
- append_log(log, m->pg_trim_to, rm->localt);
-
- rm->tls.push_back(&rm->localt);
- rm->tls.push_back(&rm->opt);
-
- } else {
- // do op
- assert(0);
-
- // TODO: this is severely broken because we don't know whether this object is really lost or
- // not. We just always assume that it's not right now.
- // Also, we're taking the address of a variable on the stack.
- object_info_t oi(soid);
- oi.lost = false; // I guess?
- oi.version = m->old_version;
- oi.size = m->old_size;
- ObjectState obs(oi, m->old_exists);
- SnapSetContext ssc(m->poid.oid);
-
- rm->ctx = new OpContext(op, m->reqid, m->ops, &obs, &ssc, this);
-
- rm->ctx->mtime = m->mtime;
- rm->ctx->at_version = m->version;
- rm->ctx->snapc = m->snapc;
+ rm->opt.set_pool_override(info.pgid.pool());
+ }
+ rm->opt.set_replica();
- ssc.snapset = m->snapset;
- rm->ctx->obc->ssc = &ssc;
-
- prepare_transaction(rm->ctx);
- append_log(rm->ctx->log, m->pg_trim_to, rm->ctx->local_t);
-
- rm->tls.push_back(&rm->ctx->op_t);
- rm->tls.push_back(&rm->ctx->local_t);
+ info.stats = m->pg_stats;
+ if (!rm->opt.empty()) {
+ // If the opt is non-empty, we infer we are before
+ // last_backfill (according to the primary, not our
+ // not-quite-accurate value), and should update the
+ // collections now. Otherwise, we do it later on push.
+ update_snap_map(log, rm->localt);
}
+ append_log(log, m->pg_trim_to, rm->localt);
+ rm->tls.push_back(&rm->localt);
+ rm->tls.push_back(&rm->opt);
+
rm->bytes_written = rm->opt.get_encoded_bytes();
} else {
@@ -6420,7 +6412,7 @@ ObjectContextRef ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
// Add log entry
++info.last_update.version;
- pg_log_entry_t e(what, oid, info.last_update, version, osd_reqid_t(), mtime);
+ pg_log_entry_t e(what, oid, info.last_update, version, info.last_user_version, osd_reqid_t(), mtime);
pg_log.add(e);
ObjectContextRef obc = get_object_context(oid, true);
@@ -6491,7 +6483,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
++info.last_update.version;
pg_log_entry_t e(
pg_log_entry_t::LOST_REVERT, oid, info.last_update,
- m->second.need, osd_reqid_t(), mtime);
+ m->second.need, info.last_user_version, osd_reqid_t(), mtime);
e.reverting_to = prev;
pg_log.add(e);
dout(10) << e << dendl;
@@ -6508,7 +6500,7 @@ void ReplicatedPG::mark_all_unfound_lost(int what)
// log it
++info.last_update.version;
pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, info.last_update, m->second.need,
- osd_reqid_t(), mtime);
+ info.last_user_version, osd_reqid_t(), mtime);
pg_log.add(e);
dout(10) << e << dendl;
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 0fbe5afd9ca..bce141834ca 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -122,7 +122,7 @@ public:
utime_t mtime;
SnapContext snapc; // writer snap context
eversion_t at_version; // pg's current version pointer
- eversion_t reply_version; // the version that we report the client (depends on the op)
+ version_t user_at_version; // pg's current user version pointer
int current_osd_subop_num;
@@ -154,7 +154,7 @@ public:
op(_op), reqid(_reqid), ops(_ops), obs(_obs), snapset(0),
new_obs(_obs->oi, _obs->exists),
modify(false), user_modify(false),
- bytes_written(0), bytes_read(0),
+ bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
data_off(0), reply(NULL), pg(_pg),
num_read(0),
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index ea3e5d5c3eb..390c6a16baf 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -1553,7 +1553,7 @@ void pg_history_t::generate_test_instances(list<pg_history_t*>& o)
void pg_info_t::encode(bufferlist &bl) const
{
- ENCODE_START(27, 26, bl);
+ ENCODE_START(28, 26, bl);
::encode(pgid, bl);
::encode(last_update, bl);
::encode(last_complete, bl);
@@ -1563,12 +1563,13 @@ void pg_info_t::encode(bufferlist &bl) const
history.encode(bl);
::encode(purged_snaps, bl);
::encode(last_epoch_started, bl);
+ ::encode(last_user_version, bl);
ENCODE_FINISH(bl);
}
void pg_info_t::decode(bufferlist::iterator &bl)
{
- DECODE_START_LEGACY_COMPAT_LEN(27, 26, 26, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(28, 26, 26, bl);
if (struct_v < 23) {
old_pg_t opgid;
::decode(opgid, bl);
@@ -1598,6 +1599,10 @@ void pg_info_t::decode(bufferlist::iterator &bl)
} else {
::decode(last_epoch_started, bl);
}
+ if (struct_v >= 28)
+ ::decode(last_user_version, bl);
+ else
+ last_user_version = last_update.version;
DECODE_FINISH(bl);
}
@@ -1609,6 +1614,7 @@ void pg_info_t::dump(Formatter *f) const
f->dump_stream("last_update") << last_update;
f->dump_stream("last_complete") << last_complete;
f->dump_stream("log_tail") << log_tail;
+ f->dump_int("last_user_version", last_user_version);
f->dump_stream("last_backfill") << last_backfill;
f->dump_stream("purged_snaps") << purged_snaps;
f->open_object_section("history");
@@ -1634,6 +1640,7 @@ void pg_info_t::generate_test_instances(list<pg_info_t*>& o)
o.back()->pgid = pg_t(1, 2, -1);
o.back()->last_update = eversion_t(3, 4);
o.back()->last_complete = eversion_t(5, 6);
+ o.back()->last_user_version = 2;
o.back()->log_tail = eversion_t(7, 8);
o.back()->last_backfill = hobject_t(object_t("objname"), "key", 123, 456, -1, "");
list<pg_stat_t*> s;
@@ -1912,7 +1919,7 @@ void pg_log_entry_t::decode_with_checksum(bufferlist::iterator& p)
void pg_log_entry_t::encode(bufferlist &bl) const
{
- ENCODE_START(7, 4, bl);
+ ENCODE_START(8, 4, bl);
::encode(op, bl);
::encode(soid, bl);
::encode(version, bl);
@@ -1934,12 +1941,13 @@ void pg_log_entry_t::encode(bufferlist &bl) const
if (op == LOST_REVERT)
::encode(prior_version, bl);
::encode(snaps, bl);
+ ::encode(user_version, bl);
ENCODE_FINISH(bl);
}
void pg_log_entry_t::decode(bufferlist::iterator &bl)
{
- DECODE_START_LEGACY_COMPAT_LEN(7, 4, 4, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(8, 4, 4, bl);
::decode(op, bl);
if (struct_v < 2) {
sobject_t old_soid;
@@ -1976,6 +1984,11 @@ void pg_log_entry_t::decode(bufferlist::iterator &bl)
::decode(snaps, bl);
}
+ if (struct_v >= 8)
+ ::decode(user_version, bl);
+ else
+ user_version = version.version;
+
DECODE_FINISH(bl);
}
@@ -2008,7 +2021,8 @@ void pg_log_entry_t::generate_test_instances(list<pg_log_entry_t*>& o)
o.push_back(new pg_log_entry_t());
hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
o.push_back(new pg_log_entry_t(MODIFY, oid, eversion_t(1,2), eversion_t(3,4),
- osd_reqid_t(entity_name_t::CLIENT(777), 8, 999), utime_t(8,9)));
+ 1, osd_reqid_t(entity_name_t::CLIENT(777), 8, 999),
+ utime_t(8,9)));
}
ostream& operator<<(ostream& out, const pg_log_entry_t& e)
@@ -2688,7 +2702,10 @@ void object_info_t::encode(bufferlist& bl) const
::encode(truncate_size, bl);
::encode(lost, bl);
::encode(old_watchers, bl);
- ::encode(user_version, bl);
+ /* shenanigans to avoid breaking backwards compatibility in the disk format.
+ * When we can, switch this out for simply putting the version_t on disk. */
+ eversion_t user_eversion(0, user_version);
+ ::encode(user_eversion, bl);
::encode(uses_tmap, bl);
::encode(watchers, bl);
ENCODE_FINISH(bl);
@@ -2733,7 +2750,9 @@ void object_info_t::decode(bufferlist::iterator& bl)
lost = false;
if (struct_v >= 4) {
::decode(old_watchers, bl);
- ::decode(user_version, bl);
+ eversion_t user_eversion;
+ ::decode(user_eversion, bl);
+ user_version = user_eversion.version;
}
if (struct_v >= 9)
::decode(uses_tmap, bl);
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 6cdacc9902c..9b2beb7e8a5 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -1267,6 +1267,8 @@ struct pg_info_t {
eversion_t last_complete; // last version pg was complete through.
epoch_t last_epoch_started;// last epoch at which this pg started on this osd
+ version_t last_user_version; // last user object version applied to store
+
eversion_t log_tail; // oldest log entry.
hobject_t last_backfill; // objects >= this and < last_complete may be missing
@@ -1278,11 +1280,13 @@ struct pg_info_t {
pg_history_t history;
pg_info_t()
- : last_epoch_started(0), last_backfill(hobject_t::get_max())
+ : last_epoch_started(0), last_user_version(0),
+ last_backfill(hobject_t::get_max())
{ }
pg_info_t(pg_t p)
: pgid(p),
- last_epoch_started(0), last_backfill(hobject_t::get_max())
+ last_epoch_started(0), last_user_version(0),
+ last_backfill(hobject_t::get_max())
{ }
bool is_empty() const { return last_update.version == 0; }
@@ -1481,6 +1485,7 @@ struct pg_log_entry_t {
__s32 op;
hobject_t soid;
eversion_t version, prior_version, reverting_to;
+ version_t user_version; // the user version for this entry
osd_reqid_t reqid; // caller+tid to uniquely identify request
utime_t mtime; // this is the _user_ mtime, mind you
bufferlist snaps; // only for clone entries
@@ -1490,12 +1495,14 @@ struct pg_log_entry_t {
uint64_t offset; // [soft state] my offset on disk
pg_log_entry_t()
- : op(0), invalid_hash(false), invalid_pool(false), offset(0) {}
+ : op(0), user_version(0),
+ invalid_hash(false), invalid_pool(false), offset(0) {}
pg_log_entry_t(int _op, const hobject_t& _soid,
const eversion_t& v, const eversion_t& pv,
+ version_t uv,
const osd_reqid_t& rid, const utime_t& mt)
: op(_op), soid(_soid), version(v),
- prior_version(pv),
+ prior_version(pv), user_version(uv),
reqid(rid), mtime(mt), invalid_hash(false), invalid_pool(false),
offset(0) {}
@@ -1952,7 +1959,7 @@ struct object_info_t {
string category;
eversion_t version, prior_version;
- eversion_t user_version;
+ version_t user_version;
osd_reqid_t last_reqid;
uint64_t size;
@@ -1983,12 +1990,12 @@ struct object_info_t {
static void generate_test_instances(list<object_info_t*>& o);
explicit object_info_t()
- : size(0), lost(false),
+ : user_version(0), size(0), lost(false),
truncate_seq(0), truncate_size(0), uses_tmap(false)
{}
object_info_t(const hobject_t& s)
- : soid(s), size(0),
+ : soid(s), user_version(0), size(0),
lost(false), truncate_seq(0), truncate_size(0), uses_tmap(false) {}
object_info_t(bufferlist& bl) {
diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc
index f94dc7baf6c..9fb0bfa446d 100644
--- a/src/osdc/Objecter.cc
+++ b/src/osdc/Objecter.cc
@@ -357,7 +357,7 @@ tid_t Objecter::linger_mutate(const object_t& oid, const object_locator_t& oloc,
const SnapContext& snapc, utime_t mtime,
bufferlist& inbl, int flags,
Context *onack, Context *oncommit,
- eversion_t *objver)
+ version_t *objver)
{
LingerOp *info = new LingerOp;
info->oid = oid;
@@ -388,7 +388,7 @@ tid_t Objecter::linger_read(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
Context *onfinish,
- eversion_t *objver)
+ version_t *objver)
{
LingerOp *info = new LingerOp;
info->oid = oid;
@@ -1455,8 +1455,8 @@ void Objecter::send_op(Op *op)
m->set_mtime(op->mtime);
m->set_retry_attempt(op->attempts++);
- if (op->version != eversion_t())
- m->set_version(op->version); // we're replaying this op!
+ if (op->replay_version != eversion_t())
+ m->set_version(op->replay_version); // we're replaying this op!
if (op->priority)
m->set_priority(op->priority);
@@ -1525,7 +1525,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
ldout(cct, 7) << "handle_osd_op_reply " << tid
<< (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack"))
- << " v " << m->get_version() << " in " << m->get_pg()
+ << " v " << m->get_replay_version() << " uv " << m->get_user_version()
+ << " in " << m->get_pg()
<< " attempt " << m->get_retry_attempt()
<< dendl;
Op *op = ops[tid];
@@ -1562,7 +1563,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
}
if (op->objver)
- *op->objver = m->get_version();
+ *op->objver = m->get_user_version();
if (op->reply_epoch)
*op->reply_epoch = m->get_map_epoch();
@@ -1602,7 +1603,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
// ack|commit -> ack
if (op->onack) {
ldout(cct, 15) << "handle_osd_op_reply ack" << dendl;
- op->version = m->get_version();
+ op->replay_version = m->get_replay_version();
onack = op->onack;
op->onack = 0; // only do callback once
num_unacked--;
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index 090fb331611..7041ab984f7 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -768,12 +768,12 @@ public:
Context *onack, *oncommit;
tid_t tid;
- eversion_t version; // for op replay
+ eversion_t replay_version; // for op replay
int attempts;
bool paused;
- eversion_t *objver;
+ version_t *objver;
epoch_t *reply_epoch;
utime_t stamp;
@@ -787,7 +787,7 @@ public:
bool should_resend;
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
- int f, Context *ac, Context *co, eversion_t *ov) :
+ int f, Context *ac, Context *co, version_t *ov) :
session(NULL), session_item(this), incarnation(0),
oid(o), oloc(ol),
used_replica(false), con(NULL),
@@ -1005,7 +1005,7 @@ public:
vector<OSDOp> ops;
bufferlist inbl;
bufferlist *poutbl;
- eversion_t *pobjver;
+ version_t *pobjver;
bool registered;
Context *on_reg_ack, *on_reg_commit;
@@ -1283,7 +1283,7 @@ private:
tid_t mutate(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
const SnapContext& snapc, utime_t mtime, int flags,
- Context *onack, Context *oncommit, eversion_t *objver = NULL) {
+ Context *onack, Context *oncommit, version_t *objver = NULL) {
Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->priority = op.priority;
o->mtime = mtime;
@@ -1293,7 +1293,7 @@ private:
tid_t read(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snapid, bufferlist *pbl, int flags,
- Context *onack, eversion_t *objver = NULL) {
+ Context *onack, version_t *objver = NULL) {
Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onack, NULL, objver);
o->priority = op.priority;
o->snapid = snapid;
@@ -1308,12 +1308,12 @@ private:
const SnapContext& snapc, utime_t mtime,
bufferlist& inbl, int flags,
Context *onack, Context *onfinish,
- eversion_t *objver);
+ version_t *objver);
tid_t linger_read(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
Context *onack,
- eversion_t *objver);
+ version_t *objver);
void unregister_linger(uint64_t linger_id);
/**
@@ -1347,7 +1347,7 @@ private:
tid_t stat(const object_t& oid, const object_locator_t& oloc, snapid_t snap,
uint64_t *psize, utime_t *pmtime, int flags,
Context *onfinish,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_STAT;
@@ -1361,7 +1361,7 @@ private:
tid_t read(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_READ;
@@ -1379,7 +1379,7 @@ private:
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
uint64_t trunc_size, __u32 trunc_seq,
Context *onfinish,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_READ;
@@ -1395,7 +1395,7 @@ private:
tid_t mapext(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_MAPEXT;
@@ -1411,7 +1411,7 @@ private:
tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
const char *name, snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_GETXATTR;
@@ -1428,7 +1428,7 @@ private:
tid_t getxattrs(const object_t& oid, const object_locator_t& oloc, snapid_t snap,
map<string,bufferlist>& attrset,
int flags, Context *onfinish,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
@@ -1442,7 +1442,7 @@ private:
tid_t read_full(const object_t& oid, const object_locator_t& oloc,
snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, objver);
}
@@ -1451,7 +1451,7 @@ private:
vector<OSDOp>& ops, utime_t mtime,
const SnapContext& snapc, int flags,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL) {
+ version_t *objver = NULL) {
Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
@@ -1461,7 +1461,7 @@ private:
uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl,
utime_t mtime, int flags,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_WRITE;
@@ -1479,7 +1479,7 @@ private:
uint64_t len, const SnapContext& snapc, const bufferlist &bl,
utime_t mtime, int flags,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_APPEND;
@@ -1498,7 +1498,7 @@ private:
utime_t mtime, int flags,
uint64_t trunc_size, __u32 trunc_seq,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_WRITE;
@@ -1515,7 +1515,7 @@ private:
tid_t write_full(const object_t& oid, const object_locator_t& oloc,
const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
@@ -1532,7 +1532,7 @@ private:
utime_t mtime, int flags,
uint64_t trunc_size, __u32 trunc_seq,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
@@ -1547,7 +1547,7 @@ private:
tid_t zero(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, const SnapContext& snapc, utime_t mtime, int flags,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_ZERO;
@@ -1561,7 +1561,7 @@ private:
tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
const SnapContext& snapc, snapid_t snapid,
utime_t mtime, Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
@@ -1575,7 +1575,7 @@ private:
const SnapContext& snapc, utime_t mtime,
int global_flags, int create_flags,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_CREATE;
@@ -1588,7 +1588,7 @@ private:
tid_t remove(const object_t& oid, const object_locator_t& oloc,
const SnapContext& snapc, utime_t mtime, int flags,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_DELETE;
@@ -1599,7 +1599,7 @@ private:
}
tid_t lock(const object_t& oid, const object_locator_t& oloc, int op, int flags,
- Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ Context *onack, Context *oncommit, version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
SnapContext snapc; // no snapc for lock ops
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
@@ -1612,7 +1612,7 @@ private:
const char *name, const SnapContext& snapc, const bufferlist &bl,
utime_t mtime, int flags,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_SETXATTR;
@@ -1630,7 +1630,7 @@ private:
const char *name, const SnapContext& snapc,
utime_t mtime, int flags,
Context *onack, Context *oncommit,
- eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
+ version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_RMXATTR;
diff --git a/src/test/ObjectMap/KeyValueDBMemory.h b/src/test/ObjectMap/KeyValueDBMemory.h
index 93d0809d491..5cffce3ef04 100644
--- a/src/test/ObjectMap/KeyValueDBMemory.h
+++ b/src/test/ObjectMap/KeyValueDBMemory.h
@@ -126,6 +126,24 @@ public:
return static_cast<TransactionImpl_*>(trans.get())->complete();
}
+  /**
+   * Estimate the size of the in-memory store.
+   *
+   * Walks every entry of db, sums the length of each value buffer (the
+   * key strings themselves are not counted) and adds each value's length
+   * to extras[prefix], where prefix is the first component of the
+   * entry's key pair.  Entries already present in extras are added to,
+   * not reset.
+   *
+   * @param extras per-prefix totals, updated in place
+   * @return sum of the lengths of all values in db
+   */
+  uint64_t get_estimated_size(map<string,uint64_t> &extras) {
+    uint64_t total_size = 0;
+
+    for (map<pair<string,string>,bufferlist>::iterator p = db.begin();
+         p != db.end(); ++p) {
+      const uint64_t sz = p->second.length();
+      total_size += sz;
+      // operator[] value-initializes a missing entry to 0, so no explicit
+      // existence check is needed; the prefix is neither copied nor looked
+      // up more than once.
+      extras[p->first.first] += sz;
+    }
+
+    return total_size;
+  }
+
private:
bool exists_prefix(const string &prefix) {
std::map<std::pair<string,string>,bufferlist>::iterator it;
diff --git a/src/test/ObjectMap/test_store_tool/test_store_tool.cc b/src/test/ObjectMap/test_store_tool/test_store_tool.cc
index ace91220df6..f81598ccfb8 100644
--- a/src/test/ObjectMap/test_store_tool/test_store_tool.cc
+++ b/src/test/ObjectMap/test_store_tool/test_store_tool.cc
@@ -90,6 +90,17 @@ class StoreTool
exists = false;
return bufferlist();
}
+
+  // Query db->get_estimated_size(), print one "<prefix> - <size>" line
+  // per entry it reported followed by a "total: <size>" line, and return
+  // the total.
+  uint64_t get_size() {
+    map<string,uint64_t> per_prefix;
+    uint64_t total = db->get_estimated_size(per_prefix);
+
+    map<string,uint64_t>::iterator it = per_prefix.begin();
+    while (it != per_prefix.end()) {
+      std::cout << it->first << " - " << it->second << std::endl;
+      ++it;
+    }
+
+    std::cout << "total: " << total << std::endl;
+    return total;
+  }
};
void usage(const char *pname)
@@ -101,6 +112,7 @@ void usage(const char *pname)
<< " exists <prefix> [key]\n"
<< " get <prefix> <key>\n"
<< " verify <store path>\n"
+ << " get-size\n"
<< std::endl;
}
@@ -173,6 +185,8 @@ int main(int argc, const char *argv[])
} else if (cmd == "verify") {
assert(0);
+ } else if (cmd == "get-size") {
+ std::cout << "estimated store size: " << st.get_size() << std::endl;
} else {
std::cerr << "Unrecognized command: " << cmd << std::endl;
return 1;
diff --git a/src/test/common/test_sharedptr_registry.cc b/src/test/common/test_sharedptr_registry.cc
index aec2107c9e5..b1713a9bd9f 100644
--- a/src/test/common/test_sharedptr_registry.cc
+++ b/src/test/common/test_sharedptr_registry.cc
@@ -137,8 +137,8 @@ TEST_F(SharedPtrRegistry_all, wait_lookup_or_create) {
EXPECT_TRUE(registry.lookup_or_create(key + 12345));
registry.remove(key);
ASSERT_TRUE(wait_for(registry, 0));
- EXPECT_TRUE(t.ptr);
t.join();
+ EXPECT_TRUE(t.ptr);
}
{
unsigned int key = 2;
@@ -163,9 +163,9 @@ TEST_F(SharedPtrRegistry_all, wait_lookup_or_create) {
}
registry.remove(key);
ASSERT_TRUE(wait_for(registry, 0));
+ t.join();
EXPECT_TRUE(t.ptr);
EXPECT_EQ(value, *t.ptr);
- t.join();
}
}
@@ -200,8 +200,8 @@ TEST_F(SharedPtrRegistry_all, wait_lookup) {
EXPECT_FALSE(registry.lookup(key + 12345));
registry.remove(key);
ASSERT_TRUE(wait_for(registry, 0));
- EXPECT_FALSE(t.ptr);
t.join();
+ EXPECT_FALSE(t.ptr);
}
TEST_F(SharedPtrRegistry_all, get_next) {
@@ -238,6 +238,24 @@ TEST_F(SharedPtrRegistry_all, get_next) {
EXPECT_FALSE(registry.get_next(i.first, &i));
}
+  {
+    //
+    // http://tracker.ceph.com/issues/6117
+    // reproduce the issue: ptr1's reference to key1 is dropped between
+    // two get_next() calls, and the second call (seeded with key1) is
+    // expected to succeed and return key2.
+    //
+    SharedPtrRegistryTest registry;
+    const unsigned int key1 = 111;
+    shared_ptr<int> ptr1 = registry.lookup_or_create(key1);
+    const unsigned int key2 = 222;
+    shared_ptr<int> ptr2 = registry.lookup_or_create(key2);
+
+    pair<unsigned int, shared_ptr<int> > i;
+    EXPECT_TRUE(registry.get_next(i.first, &i));
+    EXPECT_EQ(key1, i.first);
+    // release ptr1's reference explicitly; reset() replaces the former
+    // heap-allocated shared_ptr and its raw new/delete pair
+    ptr1.reset();
+    EXPECT_TRUE(registry.get_next(i.first, &i));
+    EXPECT_EQ(key2, i.first);
+  }
}
class SharedPtrRegistry_destructor : public ::testing::Test {
diff --git a/src/test/test_osd_types.cc b/src/test/test_osd_types.cc
index e07c9e06592..34674358285 100644
--- a/src/test/test_osd_types.cc
+++ b/src/test/test_osd_types.cc
@@ -712,7 +712,8 @@ TEST(pg_missing_t, add_next_event)
eversion_t version(10,5);
eversion_t prior_version(3,4);
pg_log_entry_t sample_e(pg_log_entry_t::DELETE, oid, version, prior_version,
- osd_reqid_t(entity_name_t::CLIENT(777), 8, 999), utime_t(8,9));
+ 0, osd_reqid_t(entity_name_t::CLIENT(777), 8, 999),
+ utime_t(8,9));
// new object (MODIFY)
{