summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorathanatos <rexludorum@gmail.com>2013-09-26 12:01:19 -0700
committerathanatos <rexludorum@gmail.com>2013-09-26 12:01:19 -0700
commit97c56ef3ec3f3641cc6ee536842e21c1a70d1dfe (patch)
tree3bb43147b301812dab522198bfe12fada1a19355
parent62626b49c4286355fab038d6aab80f2ef7675d69 (diff)
parenta1f6b14e7fcd680f74a33e92845e54cbde69a5a3 (diff)
downloadceph-97c56ef3ec3f3641cc6ee536842e21c1a70d1dfe.tar.gz
Merge pull request #631 from ceph/wip-5857-8
Wip 5857 8 Reviewed-by: Sage Weil <sage@inktank.com>
-rw-r--r--doc/dev/osd_internals/erasure_coding/recovery.rst4
-rw-r--r--src/common/WorkQueue.h37
-rw-r--r--src/common/hobject.h24
-rw-r--r--src/include/Context.h20
-rw-r--r--src/os/FileStore.cc12
-rw-r--r--src/os/FileStore.h4
-rw-r--r--src/osd/Makefile.am3
-rw-r--r--src/osd/OSD.cc39
-rw-r--r--src/osd/OSD.h15
-rw-r--r--src/osd/PG.cc70
-rw-r--r--src/osd/PG.h15
-rw-r--r--src/osd/PGBackend.h210
-rw-r--r--src/osd/ReplicatedBackend.cc196
-rw-r--r--src/osd/ReplicatedBackend.h309
-rw-r--r--src/osd/ReplicatedPG.cc1219
-rw-r--r--src/osd/ReplicatedPG.h391
-rw-r--r--src/osd/osd_types.h1
17 files changed, 1722 insertions, 847 deletions
diff --git a/doc/dev/osd_internals/erasure_coding/recovery.rst b/doc/dev/osd_internals/erasure_coding/recovery.rst
new file mode 100644
index 00000000000..793a5b003dc
--- /dev/null
+++ b/doc/dev/osd_internals/erasure_coding/recovery.rst
@@ -0,0 +1,4 @@
+===================
+PGBackend Recovery
+===================
+
diff --git a/src/common/WorkQueue.h b/src/common/WorkQueue.h
index b2742accdce..794b577a71d 100644
--- a/src/common/WorkQueue.h
+++ b/src/common/WorkQueue.h
@@ -390,6 +390,43 @@ public:
void drain(WorkQueue_* wq = 0);
};
+class GenContextWQ :
+ public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
+ list<GenContext<ThreadPool::TPHandle&>*> _queue;
+public:
+ GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
+ : ThreadPool::WorkQueueVal<
+ GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
+
+ void _enqueue(GenContext<ThreadPool::TPHandle&> *c) {
+ _queue.push_back(c);
+ };
+ void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) {
+ _queue.push_front(c);
+ }
+ bool _empty() {
+ return _queue.empty();
+ }
+ GenContext<ThreadPool::TPHandle&> *_dequeue() {
+ assert(!_queue.empty());
+ GenContext<ThreadPool::TPHandle&> *c = _queue.front();
+ _queue.pop_front();
+ return c;
+ }
+ void _process(GenContext<ThreadPool::TPHandle&> *c, ThreadPool::TPHandle &tp) {
+ c->complete(tp);
+ }
+};
+class C_QueueInWQ : public Context {
+ GenContextWQ *wq;
+ GenContext<ThreadPool::TPHandle&> *c;
+public:
+ C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle &> *c)
+ : wq(wq), c(c) {}
+ void finish(int) {
+ wq->queue(c);
+ }
+};
#endif
diff --git a/src/common/hobject.h b/src/common/hobject.h
index f8f58b4a245..e483b664347 100644
--- a/src/common/hobject.h
+++ b/src/common/hobject.h
@@ -79,6 +79,30 @@ public:
return ret;
}
+ /// @return head version of this hobject_t
+ hobject_t get_head() const {
+ hobject_t ret(*this);
+ ret.snap = CEPH_NOSNAP;
+ return ret;
+ }
+
+ /// @return snapdir version of this hobject_t
+ hobject_t get_snapdir() const {
+ hobject_t ret(*this);
+ ret.snap = CEPH_SNAPDIR;
+ return ret;
+ }
+
+ /// @return true if object is neither head nor snapdir
+ bool is_snap() const {
+ return (snap != CEPH_NOSNAP) && (snap != CEPH_SNAPDIR);
+ }
+
+ /// @return true iff the object should have a snapset in its attrs
+ bool has_snapset() const {
+ return !is_snap();
+ }
+
/* Do not use when a particular hash function is needed */
explicit hobject_t(const sobject_t &o) :
oid(o.oid), snap(o.snap), max(false), pool(-1) {
diff --git a/src/include/Context.h b/src/include/Context.h
index 9ec4414a047..663313ceec1 100644
--- a/src/include/Context.h
+++ b/src/include/Context.h
@@ -28,6 +28,26 @@
#define mydout(cct, v) lgeneric_subdout(cct, context, v)
/*
+ * GenContext - abstract callback class whose completion takes an argument of type T
+ */
+template <typename T>
+class GenContext {
+ GenContext(const GenContext& other);
+ const GenContext& operator=(const GenContext& other);
+
+ protected:
+ virtual void finish(T t) = 0;
+
+ public:
+ GenContext() {}
+ virtual ~GenContext() {} // we want a virtual destructor!!!
+ virtual void complete(T t) {
+ finish(t);
+ delete this;
+ }
+};
+
+/*
* Context - abstract callback class
*/
class Context {
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 5cd22e2b348..343fb25c0e4 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -1812,7 +1812,7 @@ int FileStore::_do_transactions(
for (list<Transaction*>::iterator p = tls.begin();
p != tls.end();
++p, trans_num++) {
- r = _do_transaction(**p, op_seq, trans_num);
+ r = _do_transaction(**p, op_seq, trans_num, handle);
if (r < 0)
break;
if (handle)
@@ -2074,7 +2074,9 @@ int FileStore::_check_replay_guard(int fd, const SequencerPosition& spos)
}
}
-unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_num)
+unsigned FileStore::_do_transaction(
+ Transaction& t, uint64_t op_seq, int trans_num,
+ ThreadPool::TPHandle *handle)
{
dout(10) << "_do_transaction on " << &t << dendl;
@@ -2082,6 +2084,9 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
SequencerPosition spos(op_seq, trans_num, 0);
while (i.have_op()) {
+ if (handle)
+ handle->reset_tp_timeout();
+
int op = i.get_op();
int r = 0;
@@ -4019,6 +4024,7 @@ int FileStore::collection_list_partial(coll_t c, ghobject_t start,
int min, int max, snapid_t seq,
vector<ghobject_t> *ls, ghobject_t *next)
{
+ dout(10) << "collection_list_partial: " << c << dendl;
Index index;
int r = get_index(c, &index);
if (r < 0)
@@ -4030,6 +4036,8 @@ int FileStore::collection_list_partial(coll_t c, ghobject_t start,
assert(!m_filestore_fail_eio || r != -EIO);
return r;
}
+ if (ls)
+ dout(20) << "objects: " << *ls << dendl;
return 0;
}
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index 8fd726767a1..b9017985a34 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -365,7 +365,9 @@ public:
int do_transactions(list<Transaction*> &tls, uint64_t op_seq) {
return _do_transactions(tls, op_seq, 0);
}
- unsigned _do_transaction(Transaction& t, uint64_t op_seq, int trans_num);
+ unsigned _do_transaction(
+ Transaction& t, uint64_t op_seq, int trans_num,
+ ThreadPool::TPHandle *handle);
int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
TrackedOpRef op = TrackedOpRef());
diff --git a/src/osd/Makefile.am b/src/osd/Makefile.am
index ea7c036f858..9d3bc1d5e47 100644
--- a/src/osd/Makefile.am
+++ b/src/osd/Makefile.am
@@ -9,6 +9,7 @@ libosd_la_SOURCES = \
osd/PG.cc \
osd/PGLog.cc \
osd/ReplicatedPG.cc \
+ osd/ReplicatedBackend.cc \
osd/Ager.cc \
osd/OSD.cc \
osd/OSDCap.cc \
@@ -35,6 +36,8 @@ noinst_HEADERS += \
osd/PG.h \
osd/PGLog.h \
osd/ReplicatedPG.h \
+ osd/PGBackend.h \
+ osd/ReplicatedBackend.h \
osd/Watch.h \
osd/osd_types.h
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 529fb6ffb1b..9a2fbb5c576 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -180,6 +180,7 @@ OSDService::OSDService(OSD *osd) :
scrub_wq(osd->scrub_wq),
scrub_finalize_wq(osd->scrub_finalize_wq),
rep_scrub_wq(osd->rep_scrub_wq),
+ push_wq("push_wq", cct->_conf->osd_recovery_thread_timeout, &osd->recovery_tp),
class_handler(osd->class_handler),
publish_lock("OSDService::publish_lock"),
pre_publish_lock("OSDService::pre_publish_lock"),
@@ -3418,16 +3419,16 @@ void OSD::RemoveWQ::_process(pair<PGRef, DeletingStateRef> item)
if (!item.second->start_clearing())
return;
- if (pg->have_temp_coll()) {
+ list<coll_t> colls_to_remove;
+ pg->get_colls(&colls_to_remove);
+ for (list<coll_t>::iterator i = colls_to_remove.begin();
+ i != colls_to_remove.end();
+ ++i) {
bool cont = remove_dir(
- pg->cct, store, &mapper, &driver, pg->osr.get(), pg->get_temp_coll(), item.second);
+ pg->cct, store, &mapper, &driver, pg->osr.get(), *i, item.second);
if (!cont)
return;
}
- bool cont = remove_dir(
- pg->cct, store, &mapper, &driver, pg->osr.get(), coll, item.second);
- if (!cont)
- return;
if (!item.second->start_deleting())
return;
@@ -3438,9 +3439,12 @@ void OSD::RemoveWQ::_process(pair<PGRef, DeletingStateRef> item)
OSD::make_infos_oid(),
pg->log_oid,
t);
- if (pg->have_temp_coll())
- t->remove_collection(pg->get_temp_coll());
- t->remove_collection(coll);
+
+ for (list<coll_t>::iterator i = colls_to_remove.begin();
+ i != colls_to_remove.end();
+ ++i) {
+ t->remove_collection(*i);
+ }
// We need the sequencer to stick around until the op is complete
store->queue_transaction(
@@ -5895,22 +5899,11 @@ void OSD::split_pgs(
dout(10) << "m_seed " << i->ps() << dendl;
dout(10) << "split_bits is " << split_bits << dendl;
- rctx->transaction->create_collection(
- coll_t(*i));
- rctx->transaction->split_collection(
- coll_t(parent->info.pgid),
+ parent->split_colls(
+ *i,
split_bits,
i->m_seed,
- coll_t(*i));
- if (parent->have_temp_coll()) {
- rctx->transaction->create_collection(
- coll_t::make_temp_coll(*i));
- rctx->transaction->split_collection(
- coll_t::make_temp_coll(parent->info.pgid),
- split_bits,
- i->m_seed,
- coll_t::make_temp_coll(*i));
- }
+ rctx->transaction);
parent->split_into(
*i,
child,
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 15dc0440352..5fe667344a9 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -307,6 +307,7 @@ public:
ThreadPool::WorkQueue<PG> &scrub_wq;
ThreadPool::WorkQueue<PG> &scrub_finalize_wq;
ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
+ GenContextWQ push_wq;
ClassHandler *&class_handler;
void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued);
@@ -635,6 +636,20 @@ public:
OSDService(OSD *osd);
~OSDService();
};
+
+struct C_OSD_SendMessageOnConn: public Context {
+ OSDService *osd;
+ Message *reply;
+ ConnectionRef conn;
+ C_OSD_SendMessageOnConn(
+ OSDService *osd,
+ Message *reply,
+ ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {}
+ void finish(int) {
+ osd->send_message_osd_cluster(reply, conn.get());
+ }
+};
+
class OSD : public Dispatcher,
public md_config_obs_t {
/** OSD **/
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 6afa5599376..f1985bf961b 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1399,76 +1399,6 @@ void PG::queue_op(OpRequestRef op)
osd->op_wq.queue(make_pair(PGRef(this), op));
}
-void PG::do_request(
- OpRequestRef op,
- ThreadPool::TPHandle &handle)
-{
- // do any pending flush
- do_pending_flush();
-
- if (!op_has_sufficient_caps(op)) {
- osd->reply_op_error(op, -EPERM);
- return;
- }
- assert(!op_must_wait_for_map(get_osdmap(), op));
- if (can_discard_request(op)) {
- return;
- }
- if (!flushed) {
- dout(20) << " !flushed, waiting for active on " << op << dendl;
- waiting_for_active.push_back(op);
- return;
- }
-
- switch (op->request->get_type()) {
- case CEPH_MSG_OSD_OP:
- if (is_replay() || !is_active()) {
- dout(20) << " replay, waiting for active on " << op << dendl;
- waiting_for_active.push_back(op);
- return;
- }
- do_op(op); // do it now
- break;
-
- case MSG_OSD_SUBOP:
- do_sub_op(op);
- break;
-
- case MSG_OSD_SUBOPREPLY:
- do_sub_op_reply(op);
- break;
-
- case MSG_OSD_PG_SCAN:
- do_scan(op, handle);
- break;
-
- case MSG_OSD_PG_BACKFILL:
- do_backfill(op);
- break;
-
- case MSG_OSD_PG_PUSH:
- if (!is_active()) {
- waiting_for_active.push_back(op);
- op->mark_delayed("waiting for active");
- return;
- }
- do_push(op);
- break;
-
- case MSG_OSD_PG_PULL:
- do_pull(op);
- break;
-
- case MSG_OSD_PG_PUSH_REPLY:
- do_push_reply(op);
- break;
-
- default:
- assert(0 == "bad message type in do_request");
- }
-}
-
-
void PG::replay_queued_ops()
{
assert(is_replay() && is_active());
diff --git a/src/osd/PG.h b/src/osd/PG.h
index cdbe827a4a9..74809eea268 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -870,8 +870,12 @@ public:
virtual void _scrub(ScrubMap &map) { }
virtual void _scrub_clear_state() { }
virtual void _scrub_finish() { }
- virtual coll_t get_temp_coll() = 0;
- virtual bool have_temp_coll() = 0;
+ virtual void get_colls(list<coll_t> *out) = 0;
+ virtual void split_colls(
+ pg_t child,
+ int split_bits,
+ int seed,
+ ObjectStore::Transaction *t) = 0;
virtual bool _report_snap_collection_errors(
const hobject_t &hoid,
const map<string, bufferptr> &attrs,
@@ -1789,10 +1793,10 @@ public:
// abstract bits
- void do_request(
+ virtual void do_request(
OpRequestRef op,
ThreadPool::TPHandle &handle
- );
+ ) = 0;
virtual void do_op(OpRequestRef op) = 0;
virtual void do_sub_op(OpRequestRef op) = 0;
@@ -1802,9 +1806,6 @@ public:
ThreadPool::TPHandle &handle
) = 0;
virtual void do_backfill(OpRequestRef op) = 0;
- virtual void do_push(OpRequestRef op) = 0;
- virtual void do_pull(OpRequestRef op) = 0;
- virtual void do_push_reply(OpRequestRef op) = 0;
virtual void snap_trimmer() = 0;
virtual int do_command(cmdmap_t cmdmap, ostream& ss,
diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h
new file mode 100644
index 00000000000..e3cc05bf345
--- /dev/null
+++ b/src/osd/PGBackend.h
@@ -0,0 +1,210 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef PGBACKEND_H
+#define PGBACKEND_H
+
+#include "osd_types.h"
+#include "include/Context.h"
+#include <string>
+
+ /**
+ * PGBackend
+ *
+ * PGBackend defines an interface for logic handling IO and
+ * replication on RADOS objects. The PGBackend implementation
+ * is responsible for:
+ *
+ * 1) Handling client operations
+ * 2) Handling object recovery
+ * 3) Handling object access
+ */
+ class PGBackend {
+ public:
+ /**
+ * Provides interfaces for PGBackend callbacks
+ *
+ * The intention is that the parent calls into the PGBackend
+ * implementation holding a lock and that the callbacks are
+ * called under the same locks.
+ */
+ class Listener {
+ public:
+ /// Recovery
+
+ virtual void on_local_recover_start(
+ const hobject_t &oid,
+ ObjectStore::Transaction *t) = 0;
+ /**
+ * Called with the transaction recovering oid
+ */
+ virtual void on_local_recover(
+ const hobject_t &oid,
+ const object_stat_sum_t &stat_diff,
+ const ObjectRecoveryInfo &recovery_info,
+ ObjectContextRef obc,
+ ObjectStore::Transaction *t
+ ) = 0;
+
+ /**
+ * Called when transaction recovering oid is durable and
+ * applied on all replicas
+ */
+ virtual void on_global_recover(const hobject_t &oid) = 0;
+
+ /**
+ * Called when peer is recovered
+ */
+ virtual void on_peer_recover(
+ int peer,
+ const hobject_t &oid,
+ const ObjectRecoveryInfo &recovery_info,
+ const object_stat_sum_t &stat
+ ) = 0;
+
+ virtual void begin_peer_recover(
+ int peer,
+ const hobject_t oid) = 0;
+
+ virtual void failed_push(int from, const hobject_t &soid) = 0;
+
+
+ virtual void cancel_pull(const hobject_t &soid) = 0;
+
+ /**
+ * Bless a context
+ *
+ * Wraps a context in whatever outer layers the parent usually
+ * uses to call into the PGBackend
+ */
+ virtual Context *bless_context(Context *c) = 0;
+ virtual GenContext<ThreadPool::TPHandle&> *bless_gencontext(
+ GenContext<ThreadPool::TPHandle&> *c) = 0;
+
+ virtual void send_message(int to_osd, Message *m) = 0;
+ virtual void queue_transaction(ObjectStore::Transaction *t) = 0;
+ virtual epoch_t get_epoch() = 0;
+ virtual const vector<int> &get_acting() = 0;
+ virtual std::string gen_dbg_prefix() const = 0;
+
+ virtual const map<hobject_t, set<int> > &get_missing_loc() = 0;
+ virtual const map<int, pg_missing_t> &get_peer_missing() = 0;
+ virtual const map<int, pg_info_t> &get_peer_info() = 0;
+ virtual const pg_missing_t &get_local_missing() = 0;
+ virtual const PGLog &get_log() = 0;
+ virtual bool pgb_is_primary() const = 0;
+ virtual OSDMapRef pgb_get_osdmap() const = 0;
+ virtual const pg_info_t &get_info() const = 0;
+
+ virtual ObjectContextRef get_obc(
+ const hobject_t &hoid,
+ map<string, bufferptr> &attrs) = 0;
+
+ virtual ~Listener() {}
+ };
+ Listener *parent;
+ Listener *get_parent() const { return parent; }
+ PGBackend(Listener *l) : parent(l) {}
+ bool is_primary() const { return get_parent()->pgb_is_primary(); }
+ OSDMapRef get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+ const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+ std::string gen_prefix() const {
+ return parent->gen_dbg_prefix();
+ }
+
+ /**
+ * RecoveryHandle
+ *
+ * We may want to recover multiple objects in the same set of
+ * messages. RecoveryHandle is an interface for the opaque
+ * object used by the implementation to store the details of
+ * the pending recovery operations.
+ */
+ struct RecoveryHandle {
+ virtual ~RecoveryHandle() {}
+ };
+
+ /// Get a fresh recovery operation
+ virtual RecoveryHandle *open_recovery_op() = 0;
+
+ /// run_recovery_op: finish the operation represented by h
+ virtual void run_recovery_op(
+ RecoveryHandle *h, ///< [in] op to finish
+ int priority ///< [in] msg priority
+ ) = 0;
+
+ /**
+ * recover_object
+ *
+ * Triggers a recovery operation on the specified hobject_t
+ * onreadable must be called before onwriteable
+ *
+ * On each replica (primary included), get_parent()->on_not_missing()
+ * must be called when the transaction finalizing the recovery
+ * is queued. Similarly, get_parent()->on_readable() must be called
+ * when the transaction is applied in the backing store.
+ *
+ * get_parent()->on_not_degraded() should be called on the primary
+ * when writes can resume on the object.
+ *
+ * obc may be NULL if the primary lacks the object.
+ *
+ * head may be NULL only if the head/snapdir is missing
+ *
+ * @param missing [in] set of info, missing pairs for queried nodes
+ * @param overlaps [in] mapping of object to file offset overlaps
+ */
+ virtual void recover_object(
+ const hobject_t &hoid, ///< [in] object to recover
+ ObjectContextRef head, ///< [in] context of the head/snapdir object
+ ObjectContextRef obc, ///< [in] context of the object
+ RecoveryHandle *h ///< [in,out] handle to attach recovery op to
+ ) = 0;
+
+ /// gives PGBackend a crack at an incoming message
+ virtual bool handle_message(
+ OpRequestRef op ///< [in] message received
+ ) = 0; ///< @return true if the message was handled
+
+ virtual void check_recovery_sources(const OSDMapRef osdmap) = 0;
+
+ /**
+ * The implementation should clear itself; contexts blessed prior to
+ * on_change() won't be called after on_change()
+ */
+ virtual void on_change(ObjectStore::Transaction *t) = 0;
+ virtual void clear_state() = 0;
+
+ virtual void on_flushed() = 0;
+
+
+ virtual void split_colls(
+ pg_t child,
+ int split_bits,
+ int seed,
+ ObjectStore::Transaction *t) = 0;
+
+ virtual void temp_colls(list<coll_t> *out) = 0;
+
+ virtual void dump_recovery_info(Formatter *f) const = 0;
+
+ virtual coll_t get_temp_coll(ObjectStore::Transaction *t) = 0;
+ virtual void add_temp_obj(const hobject_t &oid) = 0;
+ virtual void clear_temp_obj(const hobject_t &oid) = 0;
+
+ virtual ~PGBackend() {}
+ };
+
+#endif
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
new file mode 100644
index 00000000000..9868e7af2c8
--- /dev/null
+++ b/src/osd/ReplicatedBackend.cc
@@ -0,0 +1,196 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "ReplicatedBackend.h"
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPushReply.h"
+
+#define dout_subsys ceph_subsys_osd
+#define DOUT_PREFIX_ARGS this
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
+ return *_dout << pgb->get_parent()->gen_dbg_prefix();
+}
+
+ReplicatedBackend::ReplicatedBackend(
+ PGBackend::Listener *pg, coll_t coll, OSDService *osd) :
+ PGBackend(pg), temp_created(false),
+ temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)),
+ coll(coll), osd(osd), cct(osd->cct) {}
+
+void ReplicatedBackend::run_recovery_op(
+ PGBackend::RecoveryHandle *_h,
+ int priority)
+{
+ RPGHandle *h = static_cast<RPGHandle *>(_h);
+ send_pushes(priority, h->pushes);
+ send_pulls(priority, h->pulls);
+ delete h;
+}
+
+void ReplicatedBackend::recover_object(
+ const hobject_t &hoid,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *_h
+ )
+{
+ dout(10) << __func__ << ": " << hoid << dendl;
+ RPGHandle *h = static_cast<RPGHandle *>(_h);
+ if (get_parent()->get_local_missing().is_missing(hoid)) {
+ assert(!obc);
+ // pull
+ prepare_pull(
+ hoid,
+ head,
+ h);
+ return;
+ } else {
+ assert(obc);
+ int started = start_pushes(
+ hoid,
+ obc,
+ h);
+ assert(started > 0);
+ }
+}
+
+void ReplicatedBackend::check_recovery_sources(const OSDMapRef osdmap)
+{
+ for(map<int, set<hobject_t> >::iterator i = pull_from_peer.begin();
+ i != pull_from_peer.end();
+ ) {
+ if (osdmap->is_down(i->first)) {
+ dout(10) << "check_recovery_sources resetting pulls from osd." << i->first
+ << ", osdmap has it marked down" << dendl;
+ for (set<hobject_t>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ assert(pulling.count(*j) == 1);
+ get_parent()->cancel_pull(*j);
+ pulling.erase(*j);
+ }
+ pull_from_peer.erase(i++);
+ } else {
+ ++i;
+ }
+ }
+}
+
+bool ReplicatedBackend::handle_message(
+ OpRequestRef op
+ )
+{
+ dout(10) << __func__ << ": " << op << dendl;
+ switch (op->request->get_type()) {
+ case MSG_OSD_PG_PUSH:
+ // TODOXXX: needs to be active possibly
+ do_push(op);
+ return true;
+
+ case MSG_OSD_PG_PULL:
+ do_pull(op);
+ return true;
+
+ case MSG_OSD_PG_PUSH_REPLY:
+ do_push_reply(op);
+ return true;
+
+ case MSG_OSD_SUBOP: {
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ if (m->ops.size() >= 1) {
+ OSDOp *first = &m->ops[0];
+ switch (first->op.op) {
+ case CEPH_OSD_OP_PULL:
+ sub_op_pull(op);
+ return true;
+ case CEPH_OSD_OP_PUSH:
+ // TODOXXX: needs to be active possibly
+ sub_op_push(op);
+ return true;
+ default:
+ break;
+ }
+ }
+ break;
+ }
+
+ case MSG_OSD_SUBOPREPLY: {
+ MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
+ if (r->ops.size() >= 1) {
+ OSDOp &first = r->ops[0];
+ switch (first.op.op) {
+ case CEPH_OSD_OP_PUSH:
+ // continue peer recovery
+ sub_op_push_reply(op);
+ return true;
+ }
+ }
+ break;
+ }
+
+ default:
+ break;
+ }
+ return false;
+}
+
+void ReplicatedBackend::clear_state()
+{
+ // clear pushing/pulling maps
+ pushing.clear();
+ pulling.clear();
+ pull_from_peer.clear();
+}
+
+void ReplicatedBackend::on_change(ObjectStore::Transaction *t)
+{
+ dout(10) << __func__ << dendl;
+ // clear temp
+ for (set<hobject_t>::iterator i = temp_contents.begin();
+ i != temp_contents.end();
+ ++i) {
+ dout(10) << __func__ << ": Removing oid "
+ << *i << " from the temp collection" << dendl;
+ t->remove(get_temp_coll(t), *i);
+ }
+ temp_contents.clear();
+ clear_state();
+}
+
+coll_t ReplicatedBackend::get_temp_coll(ObjectStore::Transaction *t)
+{
+ if (temp_created)
+ return temp_coll;
+ if (!osd->store->collection_exists(temp_coll))
+ t->create_collection(temp_coll);
+ temp_created = true;
+ return temp_coll;
+}
+
+void ReplicatedBackend::on_flushed()
+{
+ if (have_temp_coll() &&
+ !osd->store->collection_empty(get_temp_coll())) {
+ vector<hobject_t> objects;
+ osd->store->collection_list(get_temp_coll(), objects);
+ derr << __func__ << ": found objects in the temp collection: "
+ << objects << ", crashing now"
+ << dendl;
+ assert(0 == "found garbage in the temp collection");
+ }
+}
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
new file mode 100644
index 00000000000..e34e55a618e
--- /dev/null
+++ b/src/osd/ReplicatedBackend.h
@@ -0,0 +1,309 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef REPBACKEND_H
+#define REPBACKEND_H
+
+#include "OSD.h"
+#include "PGBackend.h"
+#include "osd_types.h"
+
+struct C_ReplicatedBackend_OnPullComplete;
+class ReplicatedBackend : public PGBackend {
+ struct RPGHandle : public PGBackend::RecoveryHandle {
+ map<int, vector<PushOp> > pushes;
+ map<int, vector<PullOp> > pulls;
+ };
+ friend struct C_ReplicatedBackend_OnPullComplete;
+private:
+ bool temp_created;
+ const coll_t temp_coll;
+ coll_t get_temp_coll() const {
+ return temp_coll;
+ }
+ bool have_temp_coll() const { return temp_created; }
+
+ // Track contents of temp collection, clear on reset
+ set<hobject_t> temp_contents;
+public:
+ coll_t coll;
+ OSDService *osd;
+ CephContext *cct;
+
+ ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd);
+
+ /// @see PGBackend::open_recovery_op
+ RPGHandle *_open_recovery_op() {
+ return new RPGHandle();
+ }
+ PGBackend::RecoveryHandle *open_recovery_op() {
+ return _open_recovery_op();
+ }
+
+ /// @see PGBackend::run_recovery_op
+ void run_recovery_op(
+ PGBackend::RecoveryHandle *h,
+ int priority);
+
+ /// @see PGBackend::recover_object
+ void recover_object(
+ const hobject_t &hoid,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *h
+ );
+
+ void check_recovery_sources(const OSDMapRef osdmap);
+
+ /// @see PGBackend::handle_message
+ bool handle_message(
+ OpRequestRef op
+ );
+
+ void on_change(ObjectStore::Transaction *t);
+ void clear_state();
+ void on_flushed();
+
+ void temp_colls(list<coll_t> *out) {
+ if (temp_created)
+ out->push_back(temp_coll);
+ }
+ void split_colls(
+ pg_t child,
+ int split_bits,
+ int seed,
+ ObjectStore::Transaction *t) {
+ coll_t target = coll_t::make_temp_coll(child);
+ if (!temp_created)
+ return;
+ t->create_collection(target);
+ t->split_collection(
+ temp_coll,
+ split_bits,
+ seed,
+ target);
+ }
+
+ virtual void dump_recovery_info(Formatter *f) const {
+ {
+ f->open_array_section("pull_from_peer");
+ for (map<int, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
+ i != pull_from_peer.end();
+ ++i) {
+ f->open_object_section("pulling_from");
+ f->dump_int("pull_from", i->first);
+ {
+ f->open_array_section("pulls");
+ for (set<hobject_t>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ f->open_object_section("pull_info");
+ assert(pulling.count(*j));
+ pulling.find(*j)->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ {
+ f->open_array_section("pushing");
+ for (map<hobject_t, map<int, PushInfo> >::const_iterator i =
+ pushing.begin();
+ i != pushing.end();
+ ++i) {
+ f->open_object_section("object");
+ f->dump_stream("pushing") << i->first;
+ {
+ f->open_array_section("pushing_to");
+ for (map<int, PushInfo>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ f->open_object_section("push_progress");
+ f->dump_stream("object_pushing") << j->first;
+ {
+ f->open_object_section("push_info");
+ j->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ }
+private:
+ // push
+ struct PushInfo {
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ ObjectContextRef obc;
+ object_stat_sum_t stat;
+
+ void dump(Formatter *f) const {
+ {
+ f->open_object_section("recovery_progress");
+ recovery_progress.dump(f);
+ f->close_section();
+ }
+ {
+ f->open_object_section("recovery_info");
+ recovery_info.dump(f);
+ f->close_section();
+ }
+ }
+ };
+ map<hobject_t, map<int, PushInfo> > pushing;
+
+ // pull
+ struct PullInfo {
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ ObjectContextRef head_ctx;
+ ObjectContextRef obc;
+ object_stat_sum_t stat;
+
+ void dump(Formatter *f) const {
+ {
+ f->open_object_section("recovery_progress");
+ recovery_progress.dump(f);
+ f->close_section();
+ }
+ {
+ f->open_object_section("recovery_info");
+ recovery_info.dump(f);
+ f->close_section();
+ }
+ }
+
+ bool is_complete() const {
+ return recovery_progress.is_complete(recovery_info);
+ }
+ };
+
+ coll_t get_temp_coll(ObjectStore::Transaction *t);
+ void add_temp_obj(const hobject_t &oid) {
+ temp_contents.insert(oid);
+ }
+ void clear_temp_obj(const hobject_t &oid) {
+ temp_contents.erase(oid);
+ }
+
+ map<hobject_t, PullInfo> pulling;
+
+ // Reverse mapping from osd peer to objects being pulled from that peer
+ map<int, set<hobject_t> > pull_from_peer;
+
+ void sub_op_push(OpRequestRef op);
+ void sub_op_push_reply(OpRequestRef op);
+ void sub_op_pull(OpRequestRef op);
+
+ void _do_push(OpRequestRef op);
+ void _do_pull_response(OpRequestRef op);
+ void do_push(OpRequestRef op) {
+ if (is_primary()) {
+ _do_pull_response(op);
+ } else {
+ _do_push(op);
+ }
+ }
+ void do_pull(OpRequestRef op);
+ void do_push_reply(OpRequestRef op);
+
+ bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply);
+ void handle_pull(int peer, PullOp &op, PushOp *reply);
+ bool handle_pull_response(
+ int from, PushOp &op, PullOp *response,
+ list<ObjectContextRef> *to_continue,
+ ObjectStore::Transaction *t);
+ void handle_push(int from, PushOp &op, PushReplyOp *response,
+ ObjectStore::Transaction *t);
+
+ static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
+ const interval_set<uint64_t> &intervals_received,
+ bufferlist data_received,
+ interval_set<uint64_t> *intervals_usable,
+ bufferlist *data_usable);
+ void _failed_push(int from, const hobject_t &soid);
+
+ void send_pushes(int prio, map<int, vector<PushOp> > &pushes);
+ void prep_push_op_blank(const hobject_t& soid, PushOp *op);
+ int send_push_op_legacy(int priority, int peer,
+ PushOp &pop);
+ int send_pull_legacy(int priority, int peer,
+ const ObjectRecoveryInfo& recovery_info,
+ ObjectRecoveryProgress progress);
+ void send_pulls(
+ int priority,
+ map<int, vector<PullOp> > &pulls);
+
+ int build_push_op(const ObjectRecoveryInfo &recovery_info,
+ const ObjectRecoveryProgress &progress,
+ ObjectRecoveryProgress *out_progress,
+ PushOp *out_op,
+ object_stat_sum_t *stat = 0);
+ void submit_push_data(ObjectRecoveryInfo &recovery_info,
+ bool first,
+ bool complete,
+ const interval_set<uint64_t> &intervals_included,
+ bufferlist data_included,
+ bufferlist omap_header,
+ map<string, bufferptr> &attrs,
+ map<string, bufferlist> &omap_entries,
+ ObjectStore::Transaction *t);
+ void submit_push_complete(ObjectRecoveryInfo &recovery_info,
+ ObjectStore::Transaction *t);
+
+ void calc_clone_subsets(
+ SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets);
+ void prepare_pull(
+ const hobject_t& soid,
+ ObjectContextRef headctx,
+ RPGHandle *h);
+ int start_pushes(
+ const hobject_t &soid,
+ ObjectContextRef obj,
+ RPGHandle *h);
+ void prep_push_to_replica(
+ ObjectContextRef obc, const hobject_t& soid, int peer,
+ PushOp *pop);
+ void prep_push(ObjectContextRef obc,
+ const hobject_t& oid, int dest,
+ PushOp *op);
+ void prep_push(ObjectContextRef obc,
+ const hobject_t& soid, int peer,
+ eversion_t version,
+ interval_set<uint64_t> &data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets,
+ PushOp *op);
+ void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+ const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets);
+ ObjectRecoveryInfo recalc_subsets(
+ const ObjectRecoveryInfo& recovery_info,
+ SnapSetContext *ssc
+ );
+};
+
+#endif
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index d3201c91046..7831f95818d 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -60,8 +60,9 @@
#define dout_subsys ceph_subsys_osd
#define DOUT_PREFIX_ARGS this, osd->whoami, get_osdmap()
#undef dout_prefix
-#define dout_prefix _prefix(_dout, this, osd->whoami, get_osdmap())
-static ostream& _prefix(std::ostream *_dout, PG *pg, int whoami, OSDMapRef osdmap) {
+#define dout_prefix _prefix(_dout, this)
+template <typename T>
+static ostream& _prefix(std::ostream *_dout, T *pg) {
return *_dout << pg->gen_prefix();
}
@@ -79,6 +80,159 @@ PGLSFilter::~PGLSFilter()
{
}
+// Record perf counters for a replica sub-operation.
+//
+// The aggregate sub-op counters (l_osd_sop, l_osd_sop_inb, l_osd_sop_lat)
+// are always updated.  tag_lat always receives the same latency sample;
+// tag_inb is only bumped when it is non-zero.
+static void log_subop_stats(
+  OSDService *osd,
+  OpRequestRef op, int tag_inb, int tag_lat)
+{
+  // Latency runs from the message's receive stamp until now.
+  utime_t elapsed = ceph_clock_now(g_ceph_context);
+  elapsed -= op->request->get_recv_stamp();
+
+  uint64_t bytes_in = op->request->get_data().length();
+
+  // Counters shared by every sub-op type.
+  osd->logger->inc(l_osd_sop);
+  osd->logger->inc(l_osd_sop_inb, bytes_in);
+  osd->logger->tinc(l_osd_sop_lat, elapsed);
+
+  // Per-type counters chosen by the caller.
+  if (tag_inb) {
+    osd->logger->inc(tag_inb, bytes_in);
+  }
+  osd->logger->tinc(tag_lat, elapsed);
+}
+
+// ======================
+// PGBackend::Listener
+
+
+// Listener hook: prepare the local copy of `oid` for recovery.
+//
+// Resets the log's "have" version for oid to a default eversion_t and
+// queues, on transaction t, removal of the object from this PG's collection.
+// remove_snap_mapped_object() presumably drops the object's snap-mapper
+// entries as part of t -- confirm against its definition.
+void ReplicatedPG::on_local_recover_start(
+ const hobject_t &oid,
+ ObjectStore::Transaction *t)
+{
+ pg_log.revise_have(oid, eversion_t());
+ remove_snap_mapped_object(*t, oid);
+ t->remove(coll, oid);
+}
+
+// Listener hook: account for an object that has been recovered locally.
+//
+// hoid           object whose waiters (if any) are requeued on the primary
+// stat_diff      stats delta added to the PG stats (primary only)
+// _recovery_info recovery description; copied because the version/oi may be
+//                rewritten below for the LOST_REVERT case
+// obc            object context; must be non-null on the primary
+// t              transaction that receives attr updates and the
+//                applied/commit callbacks registered here
+void ReplicatedPG::on_local_recover(
+ const hobject_t &hoid,
+ const object_stat_sum_t &stat_diff,
+ const ObjectRecoveryInfo &_recovery_info,
+ ObjectContextRef obc,
+ ObjectStore::Transaction *t
+ )
+{
+ ObjectRecoveryInfo recovery_info(_recovery_info);
+ // Objects with a snap id below CEPH_NOSNAP get their snaps (taken from the
+ // object info) registered with the snap mapper through transaction t.
+ if (recovery_info.soid.snap < CEPH_NOSNAP) {
+ assert(recovery_info.oi.snaps.size());
+ OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+ set<snapid_t> snaps(
+ recovery_info.oi.snaps.begin(),
+ recovery_info.oi.snaps.end());
+ snap_mapper.add_oid(
+ recovery_info.soid,
+ snaps,
+ &_t);
+ }
+
+ // We recovered a version older than the one the missing set needs.  If the
+ // latest log entry is a LOST_REVERT back to exactly this version, relabel
+ // the object with the revert entry's version and rewrite OI_ATTR.
+ if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+ pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
+ assert(is_primary());
+ // NOTE(review): the find() result is dereferenced without an end() check;
+ // this relies on the log having an entry for every such missing object.
+ const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
+ if (latest->op == pg_log_entry_t::LOST_REVERT &&
+ latest->reverting_to == recovery_info.version) {
+ dout(10) << " got old revert version " << recovery_info.version
+ << " for " << *latest << dendl;
+ recovery_info.version = latest->version;
+ // update the attr to the revert event version
+ recovery_info.oi.prior_version = recovery_info.oi.version;
+ recovery_info.oi.version = latest->version;
+ bufferlist bl;
+ ::encode(recovery_info.oi, bl);
+ t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
+ }
+ }
+
+ // keep track of active pushes for scrub
+ ++active_pushes;
+
+ recover_got(recovery_info.soid, recovery_info.version);
+
+ if (is_primary()) {
+ // Primary: fold in stats, refresh the cached object state, and hold the
+ // obc's ondisk write lock until the transaction has been applied.
+ info.stats.stats.sum.add(stat_diff);
+
+ assert(obc);
+ obc->obs.exists = true;
+ obc->ondisk_write_lock();
+ obc->obs.oi = recovery_info.oi; // may have been updated above
+
+
+ t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
+ t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+
+ publish_stats_to_osd();
+ // Requeue ops that were blocked on this object; if nothing is missing
+ // any more, also requeue ops that were waiting for all missing objects.
+ if (waiting_for_missing_object.count(hoid)) {
+ dout(20) << " kicking waiters on " << hoid << dendl;
+ requeue_ops(waiting_for_missing_object[hoid]);
+ waiting_for_missing_object.erase(hoid);
+ if (pg_log.get_missing().missing.size() == 0) {
+ requeue_ops(waiting_for_all_missing);
+ waiting_for_all_missing.clear();
+ }
+ }
+ } else {
+ // Non-primary: only the applied callback is needed.
+ t->register_on_applied(
+ new C_OSD_AppliedRecoveredObjectReplica(this));
+
+ }
+
+ // The commit callback captures the current map epoch and last_complete.
+ t->register_on_commit(
+ new C_OSD_CommittedPushedObject(
+ this,
+ get_osdmap()->get_epoch(),
+ info.last_complete));
+
+ // update pg
+ dirty_info = true;
+ write_if_dirty(*t);
+
+}
+
+// Listener hook: recovery of soid has finished (the log message below
+// describes this as "pushed ... to all replicas").
+//
+// soid must currently be in `recovering`; it is removed from that set, the
+// recovery op is finished, and any ops queued in
+// waiting_for_degraded_object for soid are requeued.
+void ReplicatedPG::on_global_recover(
+ const hobject_t &soid)
+{
+ publish_stats_to_osd();
+ dout(10) << "pushed " << soid << " to all replicas" << dendl;
+ assert(recovering.count(soid));
+ recovering.erase(soid);
+ finish_recovery_op(soid);
+ // Release ops that were blocked while the object was degraded.
+ if (waiting_for_degraded_object.count(soid)) {
+ requeue_ops(waiting_for_degraded_object[soid]);
+ waiting_for_degraded_object.erase(soid);
+ }
+ finish_degraded_object(soid);
+}
+
+// Listener hook: `peer` now has soid at recovery_info.version.
+//
+// Adds `stat` to the PG stats, publishes them, marks the object as obtained
+// in the peer's missing set and, when the peer is the backfill target,
+// drops soid from backfills_in_flight.
+void ReplicatedPG::on_peer_recover(
+  int peer,
+  const hobject_t &soid,
+  const ObjectRecoveryInfo &recovery_info,
+  const object_stat_sum_t &stat)
+{
+  info.stats.stats.sum.add(stat);
+  publish_stats_to_osd();
+
+  // done!
+  peer_missing[peer].got(soid, recovery_info.version);
+
+  // Only the backfill target has in-flight backfill bookkeeping to clear.
+  if (peer != backfill_target)
+    return;
+  if (backfills_in_flight.count(soid))
+    backfills_in_flight.erase(soid);
+}
+
+// Listener hook: recovery of soid towards `peer` is starting.
+//
+// Resets the "have" version recorded for soid in the peer's missing set to
+// a default eversion_t.  Note that operator[] creates a missing-set entry
+// for `peer` if none exists yet.
+// NOTE(review): soid is taken by const value (copied per call); presumably
+// this mirrors the declaration in the header -- confirm before changing.
+void ReplicatedPG::begin_peer_recover(
+ int peer,
+ const hobject_t soid)
+{
+ peer_missing[peer].revise_have(soid, eversion_t());
+}
+
// =======================
// pg changes
@@ -117,18 +271,18 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef o
assert(g != missing.missing.end());
const eversion_t &v(g->second.need);
- map<hobject_t, PullInfo>::const_iterator p = pulling.find(soid);
- if (p != pulling.end()) {
- dout(7) << "missing " << soid << " v " << v << ", already pulling." << dendl;
+ set<hobject_t>::const_iterator p = recovering.find(soid);
+ if (p != recovering.end()) {
+ dout(7) << "missing " << soid << " v " << v << ", already recovering." << dendl;
}
else if (missing_loc.find(soid) == missing_loc.end()) {
dout(7) << "missing " << soid << " v " << v << ", is unfound." << dendl;
}
else {
- dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl;
- map<int, vector<PullOp> > pulls;
- prepare_pull(soid, v, cct->_conf->osd_client_op_priority, &pulls);
- send_pulls(cct->_conf->osd_client_op_priority, pulls);
+ dout(7) << "missing " << soid << " v " << v << ", recovering." << dendl;
+ PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
+ recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
+ pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority);
}
waiting_for_missing_object[soid].push_back(op);
op->mark_delayed("waiting for missing object");
@@ -165,15 +319,15 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef
assert(is_degraded_object(soid));
// we don't have it (yet).
- if (pushing.count(soid)) {
+ if (recovering.count(soid)) {
dout(7) << "degraded "
<< soid
- << ", already pushing"
+ << ", already recovering"
<< dendl;
} else {
dout(7) << "degraded "
<< soid
- << ", pushing"
+ << ", recovering"
<< dendl;
eversion_t v;
for (unsigned i = 1; i < acting.size(); i++) {
@@ -184,9 +338,9 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef
break;
}
}
- map<int, vector<PushOp> > pushes;
- prep_object_replica_pushes(soid, v, cct->_conf->osd_client_op_priority, &pushes);
- send_pushes(cct->_conf->osd_client_op_priority, pushes);
+ PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
+ prep_object_replica_pushes(soid, v, h);
+ pgbackend->run_recovery_op(h, cct->_conf->osd_client_op_priority);
}
waiting_for_degraded_object[soid].push_back(op);
op->mark_delayed("waiting for degraded object");
@@ -628,9 +782,8 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
const PGPool &_pool, pg_t p, const hobject_t& oid,
const hobject_t& ioid) :
PG(o, curmap, _pool, p, oid, ioid),
+ pgbackend(new ReplicatedBackend(this, coll_t(p), o)),
snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
- temp_created(false),
- temp_coll(coll_t::make_temp_coll(p)),
temp_seq(0),
snap_trimmer_machine(this)
{
@@ -644,6 +797,62 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
src_oloc.key = oid.name;
}
+// Dispatch one request for this PG.
+//
+// Order of checks: pending flush, capability check (-EPERM reply on
+// failure), discard check, wait-for-flush, then the PG backend gets first
+// refusal on the message; anything it does not handle is dispatched by
+// message type below.  An unknown message type is a fatal assertion.
+//
+// op      request to process
+// handle  thread-pool handle; only forwarded to do_scan() here
+void ReplicatedPG::do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle)
+{
+ // do any pending flush
+ do_pending_flush();
+
+ if (!op_has_sufficient_caps(op)) {
+ osd->reply_op_error(op, -EPERM);
+ return;
+ }
+ // The op must not still be waiting for a newer map at this point.
+ assert(!op_must_wait_for_map(get_osdmap(), op));
+ if (can_discard_request(op)) {
+ return;
+ }
+ if (!flushed) {
+ dout(20) << " !flushed, waiting for active on " << op << dendl;
+ waiting_for_active.push_back(op);
+ return;
+ }
+
+ // A true return means the backend consumed the message.
+ if (pgbackend->handle_message(op))
+ return;
+
+ switch (op->request->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ // Client ops are parked while in replay or not active (the log line
+ // says "replay" in both cases).
+ if (is_replay() || !is_active()) {
+ dout(20) << " replay, waiting for active on " << op << dendl;
+ waiting_for_active.push_back(op);
+ return;
+ }
+ do_op(op); // do it now
+ break;
+
+ case MSG_OSD_SUBOP:
+ do_sub_op(op);
+ break;
+
+ case MSG_OSD_SUBOPREPLY:
+ do_sub_op_reply(op);
+ break;
+
+ case MSG_OSD_PG_SCAN:
+ do_scan(op, handle);
+ break;
+
+ case MSG_OSD_PG_BACKFILL:
+ do_backfill(op);
+ break;
+
+ default:
+ assert(0 == "bad message type in do_request");
+ }
+}
+
+
/** do_op - do an op
* pg lock will be held (if multithreaded)
* osd_lock NOT held.
@@ -1237,26 +1446,6 @@ void ReplicatedPG::log_op_stats(OpContext *ctx)
<< " lat " << latency << dendl;
}
-void ReplicatedPG::log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat)
-{
- utime_t now = ceph_clock_now(cct);
- utime_t latency = now;
- latency -= op->request->get_recv_stamp();
-
- uint64_t inb = op->request->get_data().length();
-
- osd->logger->inc(l_osd_sop);
-
- osd->logger->inc(l_osd_sop_inb, inb);
- osd->logger->tinc(l_osd_sop_lat, latency);
-
- if (tag_inb)
- osd->logger->inc(tag_inb, inb);
- osd->logger->tinc(tag_lat, latency);
-}
-
-
-
void ReplicatedPG::do_sub_op(OpRequestRef op)
{
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
@@ -1267,11 +1456,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
OSDOp *first = NULL;
if (m->ops.size() >= 1) {
first = &m->ops[0];
- switch (first->op.op) {
- case CEPH_OSD_OP_PULL:
- sub_op_pull(op);
- return;
- }
}
if (!is_active()) {
@@ -1282,9 +1466,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
if (first) {
switch (first->op.op) {
- case CEPH_OSD_OP_PUSH:
- sub_op_push(op);
- return;
case CEPH_OSD_OP_DELETE:
sub_op_remove(op);
return;
@@ -1313,11 +1494,6 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
if (r->ops.size() >= 1) {
OSDOp& first = r->ops[0];
switch (first.op.op) {
- case CEPH_OSD_OP_PUSH:
- // continue peer recovery
- sub_op_push_reply(op);
- return;
-
case CEPH_OSD_OP_SCRUB_RESERVE:
sub_op_scrub_reserve_reply(op);
return;
@@ -1403,7 +1579,7 @@ void ReplicatedPG::do_scan(
}
}
-void ReplicatedPG::_do_push(OpRequestRef op)
+void ReplicatedBackend::_do_push(OpRequestRef op)
{
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
assert(m->get_header().type == MSG_OSD_PG_PUSH);
@@ -1420,18 +1596,43 @@ void ReplicatedPG::_do_push(OpRequestRef op)
MOSDPGPushReply *reply = new MOSDPGPushReply;
reply->set_priority(m->get_priority());
- reply->pgid = info.pgid;
+ reply->pgid = get_info().pgid;
reply->map_epoch = m->map_epoch;
reply->replies.swap(replies);
reply->compute_cost(cct);
- t->register_on_complete(new C_OSD_SendMessageOnConn(
- osd, reply, m->get_connection()));
+ t->register_on_complete(
+ get_parent()->bless_context(
+ new C_OSD_SendMessageOnConn(
+ osd, reply, m->get_connection())));
- osd->store->queue_transaction(osr.get(), t);
+ get_parent()->queue_transaction(t);
}
-void ReplicatedPG::_do_pull_response(OpRequestRef op)
+// Work-queue continuation run after pulled objects have been written.
+//
+// For every object context in to_continue it starts pushes via the backend;
+// when start_pushes() returns 0 the object is reported to the parent through
+// on_global_recover().  All pushes are collected in one recovery handle
+// that is run at `priority` once the list has been processed.
+struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
+  ReplicatedBackend *bc;
+  list<ObjectContextRef> to_continue;
+  int priority;
+  C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
+    : bc(bc), priority(priority) {}
+
+  void finish(ThreadPool::TPHandle &handle) {
+    ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
+    list<ObjectContextRef>::iterator it = to_continue.begin();
+    while (it != to_continue.end()) {
+      const hobject_t &soid = (*it)->obs.oi.soid;
+      int started = bc->start_pushes(soid, *it, h);
+      if (started == 0) {
+        bc->get_parent()->on_global_recover(soid);
+      }
+      // Reset the thread-pool timeout after each object.
+      handle.reset_tp_timeout();
+      ++it;
+    }
+    bc->run_recovery_op(h, priority);
+  }
+};
+
+void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
assert(m->get_header().type == MSG_OSD_PG_PUSH);
@@ -1439,31 +1640,45 @@ void ReplicatedPG::_do_pull_response(OpRequestRef op)
vector<PullOp> replies(1);
ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ list<ObjectContextRef> to_continue;
for (vector<PushOp>::iterator i = m->pushes.begin();
i != m->pushes.end();
++i) {
- bool more = handle_pull_response(from, *i, &(replies.back()), t);
+ bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t);
if (more)
replies.push_back(PullOp());
}
+ if (!to_continue.empty()) {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
+ m->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ new C_QueueInWQ(
+ &osd->push_wq,
+ get_parent()->bless_gencontext(c)));
+ }
replies.erase(replies.end() - 1);
if (replies.size()) {
MOSDPGPull *reply = new MOSDPGPull;
reply->set_priority(m->get_priority());
- reply->pgid = info.pgid;
+ reply->pgid = get_info().pgid;
reply->map_epoch = m->map_epoch;
reply->pulls.swap(replies);
reply->compute_cost(cct);
- t->register_on_complete(new C_OSD_SendMessageOnConn(
- osd, reply, m->get_connection()));
+ t->register_on_complete(
+ get_parent()->bless_context(
+ new C_OSD_SendMessageOnConn(
+ osd, reply, m->get_connection())));
}
- osd->store->queue_transaction(osr.get(), t);
+ get_parent()->queue_transaction(t);
}
-void ReplicatedPG::do_pull(OpRequestRef op)
+void ReplicatedBackend::do_pull(OpRequestRef op)
{
MOSDPGPull *m = static_cast<MOSDPGPull *>(op->request);
assert(m->get_header().type == MSG_OSD_PG_PULL);
@@ -1479,7 +1694,7 @@ void ReplicatedPG::do_pull(OpRequestRef op)
send_pushes(m->get_priority(), replies);
}
-void ReplicatedPG::do_push_reply(OpRequestRef op)
+void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->request);
assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY);
@@ -3976,19 +4191,9 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
}
}
-bool ReplicatedPG::have_temp_coll()
-{
- return temp_created || osd->store->collection_exists(temp_coll);
-}
-
coll_t ReplicatedPG::get_temp_coll(ObjectStore::Transaction *t)
{
- if (temp_created)
- return temp_coll;
- if (!osd->store->collection_exists(temp_coll))
- t->create_collection(temp_coll);
- temp_created = true;
- return temp_coll;
+ return pgbackend->get_temp_coll(t);
}
hobject_t ReplicatedPG::generate_temp_object()
@@ -3996,6 +4201,7 @@ hobject_t ReplicatedPG::generate_temp_object()
ostringstream ss;
ss << "temp_" << info.pgid << "_" << get_role() << "_" << osd->monc->get_global_id() << "_" << (++temp_seq);
hobject_t hoid(object_t(ss.str()), "", CEPH_NOSNAP, 0, -1, "");
+ pgbackend->add_temp_obj(hoid);
dout(20) << __func__ << " " << hoid << dendl;
return hoid;
}
@@ -4261,7 +4467,6 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
if (cop->temp_cursor.is_initial()) {
cop->temp_coll = get_temp_coll(&tctx->local_t);
cop->temp_oid = generate_temp_object();
- temp_contents.insert(cop->temp_oid);
repop->ctx->new_temp_oid = cop->temp_oid;
}
@@ -4331,7 +4536,7 @@ int ReplicatedPG::finish_copy(OpContext *ctx)
// finish writing to temp object, then move into place
_write_copy_chunk(cop, &t);
t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, obs.oi.soid);
- temp_contents.erase(cop->temp_oid);
+ pgbackend->clear_temp_obj(cop->temp_oid);
ctx->discard_temp_oid = cop->temp_oid;
}
@@ -4896,7 +5101,8 @@ void ReplicatedPG::check_blacklisted_obc_watchers(ObjectContextRef obc)
void ReplicatedPG::populate_obc_watchers(ObjectContextRef obc)
{
assert(is_active());
- assert(!is_missing_object(obc->obs.oi.soid) ||
+ assert((recovering.count(obc->obs.oi.soid) ||
+ !is_missing_object(obc->obs.oi.soid)) ||
(pg_log.get_log().objects.count(obc->obs.oi.soid) && // or this is a revert... see recover_primary()
pg_log.get_log().objects.find(obc->obs.oi.soid)->second->op ==
pg_log_entry_t::LOST_REVERT &&
@@ -5009,23 +5215,37 @@ ObjectContextRef ReplicatedPG::create_object_context(const object_info_t& oi,
}
ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
- bool can_create)
-{
+ bool can_create,
+ map<string, bufferptr> *attrs)
+{
+ assert(
+ attrs || !pg_log.get_missing().is_missing(soid) ||
+ // or this is a revert... see recover_primary()
+ (pg_log.get_log().objects.count(soid) &&
+ pg_log.get_log().objects.find(soid)->second->op ==
+ pg_log_entry_t::LOST_REVERT));
ObjectContextRef obc = object_contexts.lookup(soid);
if (obc) {
dout(10) << "get_object_context " << obc << " " << soid << dendl;
} else {
// check disk
bufferlist bv;
- int r = osd->store->getattr(coll, soid, OI_ATTR, bv);
- if (r < 0) {
- if (!can_create)
- return ObjectContextRef(); // -ENOENT!
-
- // new object.
- object_info_t oi(soid);
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace());
- return create_object_context(oi, ssc);
+ if (attrs) {
+ assert(attrs->count(OI_ATTR));
+ bv.push_back(attrs->find(OI_ATTR)->second);
+ } else {
+ int r = osd->store->getattr(coll, soid, OI_ATTR, bv);
+ if (r < 0) {
+ if (!can_create)
+ return ObjectContextRef(); // -ENOENT!
+
+ // new object.
+ object_info_t oi(soid);
+ SnapSetContext *ssc = get_snapset_context(
+ soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace(),
+ soid.has_snapset() ? attrs : 0);
+ return create_object_context(oi, ssc);
+ }
}
object_info_t oi(bv);
@@ -5037,10 +5257,11 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
obc->obs.oi = oi;
obc->obs.exists = true;
- if (can_create) {
- obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace());
- register_snapset_context(obc->ssc);
- }
+ obc->ssc = get_snapset_context(
+ soid.oid, soid.get_key(), soid.hash,
+ true, soid.get_namespace(),
+ soid.has_snapset() ? attrs : 0);
+ register_snapset_context(obc->ssc);
populate_obc_watchers(obc);
dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl;
@@ -5259,11 +5480,13 @@ SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
return ssc;
}
-SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
- const string& key,
- ps_t seed,
- bool can_create,
- const string& nspace)
+SnapSetContext *ReplicatedPG::get_snapset_context(
+ const object_t& oid,
+ const string& key,
+ ps_t seed,
+ bool can_create,
+ const string& nspace,
+ map<string, bufferptr> *attrs)
{
Mutex::Locker l(snapset_contexts_lock);
SnapSetContext *ssc;
@@ -5272,20 +5495,25 @@ SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
ssc = p->second;
} else {
bufferlist bv;
- hobject_t head(oid, key, CEPH_NOSNAP, seed,
- info.pgid.pool(), nspace);
- int r = osd->store->getattr(coll, head, SS_ATTR, bv);
- if (r < 0) {
- // try _snapset
- hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed,
- info.pgid.pool(), nspace);
- r = osd->store->getattr(coll, snapdir, SS_ATTR, bv);
- if (r < 0 && !can_create)
- return NULL;
+ if (!attrs) {
+ hobject_t head(oid, key, CEPH_NOSNAP, seed,
+ info.pgid.pool(), nspace);
+ int r = osd->store->getattr(coll, head, SS_ATTR, bv);
+ if (r < 0) {
+ // try _snapset
+ hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed,
+ info.pgid.pool(), nspace);
+ r = osd->store->getattr(coll, snapdir, SS_ATTR, bv);
+ if (r < 0 && !can_create)
+ return NULL;
+ }
+ } else {
+ assert(attrs->count(SS_ATTR));
+ bv.push_back(attrs->find(SS_ATTR)->second);
}
ssc = new SnapSetContext(oid);
_register_snapset_context(ssc);
- if (r >= 0) {
+ if (bv.length()) {
bufferlist::iterator bvp = bv.begin();
ssc->snapset.decode(bvp);
}
@@ -5361,12 +5589,12 @@ void ReplicatedPG::sub_op_modify(OpRequestRef op)
if (m->new_temp_oid != hobject_t()) {
dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
- temp_contents.insert(m->new_temp_oid);
+ pgbackend->add_temp_obj(m->new_temp_oid);
get_temp_coll(&rm->localt);
}
if (m->discard_temp_oid != hobject_t()) {
dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
- temp_contents.erase(m->discard_temp_oid);
+ pgbackend->clear_temp_obj(m->discard_temp_oid);
}
::decode(rm->opt, p);
@@ -5491,7 +5719,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
<< last_peering_reset << dendl;
}
- log_subop_stats(rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
+ log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
bool done = rm->applied && rm->committed;
unlock();
if (done) {
@@ -5532,11 +5760,12 @@ void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
// ===========================================================
-void ReplicatedPG::calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
- pg_missing_t& missing,
- const hobject_t &last_backfill,
- interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+void ReplicatedBackend::calc_head_subsets(
+ ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+ const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets)
{
dout(10) << "calc_head_subsets " << head
<< " clone_overlap " << snapset.clone_overlap << dendl;
@@ -5586,11 +5815,12 @@ void ReplicatedPG::calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, con
<< " clone_subsets " << clone_subsets << dendl;
}
-void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid,
- const pg_missing_t& missing,
- const hobject_t &last_backfill,
- interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets)
+void ReplicatedBackend::calc_clone_subsets(
+ SnapSet& snapset, const hobject_t& soid,
+ const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t> >& clone_subsets)
{
dout(10) << "calc_clone_subsets " << soid
<< " clone_overlap " << snapset.clone_overlap << dendl;
@@ -5675,95 +5905,69 @@ void ReplicatedPG::calc_clone_subsets(SnapSet& snapset, const hobject_t& soid,
*/
enum { PULL_NONE, PULL_OTHER, PULL_YES };
-int ReplicatedPG::prepare_pull(
- const hobject_t& soid, eversion_t v,
- int priority,
- map<int, vector<PullOp> > *pulls)
-{
+void ReplicatedBackend::prepare_pull(
+ const hobject_t& soid,
+ ObjectContextRef headctx,
+ RPGHandle *h)
+{
+ assert(get_parent()->get_local_missing().missing.count(soid));
+ eversion_t v = get_parent()->get_local_missing().missing.find(
+ soid)->second.need;
+ const map<hobject_t, set<int> > &missing_loc(
+ get_parent()->get_missing_loc());
+ const map<int, pg_missing_t > &peer_missing(
+ get_parent()->get_peer_missing());
int fromosd = -1;
- map<hobject_t,set<int> >::iterator q = missing_loc.find(soid);
- if (q != missing_loc.end()) {
- // randomize the list of possible sources
- // should we take weights into account?
- vector<int> shuffle(q->second.begin(), q->second.end());
- random_shuffle(shuffle.begin(), shuffle.end());
- for (vector<int>::iterator p = shuffle.begin();
- p != shuffle.end();
- ++p) {
- if (get_osdmap()->is_up(*p)) {
- fromosd = *p;
- break;
- }
- }
- }
- if (fromosd < 0) {
- dout(7) << "pull " << soid
- << " v " << v
- << " but it is unfound" << dendl;
- return PULL_NONE;
- }
+ map<hobject_t,set<int> >::const_iterator q = missing_loc.find(soid);
+ assert(q != missing_loc.end());
+ assert(!q->second.empty());
+
+ // pick a pullee
+ vector<int> shuffle(q->second.begin(), q->second.end());
+ random_shuffle(shuffle.begin(), shuffle.end());
+ vector<int>::iterator p = shuffle.begin();
+ assert(get_osdmap()->is_up(*p));
+ fromosd = *p;
+ assert(fromosd >= 0);
+
+ dout(7) << "pull " << soid
+ << "v " << v
+ << " on osds " << *p
+ << " from osd." << fromosd
+ << dendl;
assert(peer_missing.count(fromosd));
- if (peer_missing[fromosd].is_missing(soid, v)) {
- assert(peer_missing[fromosd].missing[soid].have != v);
+ const pg_missing_t &pmissing = peer_missing.find(fromosd)->second;
+ if (pmissing.is_missing(soid, v)) {
+ assert(pmissing.missing.find(soid)->second.have != v);
dout(10) << "pulling soid " << soid << " from osd " << fromosd
- << " at version " << peer_missing[fromosd].missing[soid].have
+ << " at version " << pmissing.missing.find(soid)->second.have
<< " rather than at version " << v << dendl;
- v = peer_missing[fromosd].missing[soid].have;
- assert(pg_log.get_log().objects.count(soid) &&
- pg_log.get_log().objects.find(soid)->second->op == pg_log_entry_t::LOST_REVERT &&
- pg_log.get_log().objects.find(soid)->second->reverting_to == v);
+ v = pmissing.missing.find(soid)->second.have;
+ assert(get_parent()->get_log().get_log().objects.count(soid) &&
+ (get_parent()->get_log().get_log().objects.find(soid)->second->op ==
+ pg_log_entry_t::LOST_REVERT) &&
+ (get_parent()->get_log().get_log().objects.find(
+ soid)->second->reverting_to ==
+ v));
}
- dout(7) << "pull " << soid
- << " v " << v
- << " on osds " << missing_loc[soid]
- << " from osd." << fromosd
- << dendl;
-
ObjectRecoveryInfo recovery_info;
- // is this a snapped object? if so, consult the snapset.. we may not need the entire object!
- if (soid.snap && soid.snap < CEPH_NOSNAP) {
- // do we have the head and/or snapdir?
- hobject_t head = soid;
- head.snap = CEPH_NOSNAP;
- if (pg_log.get_missing().is_missing(head)) {
- if (pulling.count(head)) {
- dout(10) << " missing but already pulling head " << head << dendl;
- return PULL_NONE;
- } else {
- int r = prepare_pull(
- head, pg_log.get_missing().missing.find(head)->second.need, priority,
- pulls);
- if (r != PULL_NONE)
- return PULL_OTHER;
- return PULL_NONE;
- }
- }
- head.snap = CEPH_SNAPDIR;
- if (pg_log.get_missing().is_missing(head)) {
- if (pulling.count(head)) {
- dout(10) << " missing but already pulling snapdir " << head << dendl;
- return PULL_NONE;
- } else {
- int r = prepare_pull(
- head, pg_log.get_missing().missing.find(head)->second.need, priority,
- pulls);
- if (r != PULL_NONE)
- return PULL_OTHER;
- return PULL_NONE;
- }
- }
-
+ if (soid.is_snap()) {
+ assert(!get_parent()->get_local_missing().is_missing(
+ soid.get_head()) ||
+ !get_parent()->get_local_missing().is_missing(
+ soid.get_snapdir()));
+ assert(headctx);
// check snapset
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
+ SnapSetContext *ssc = headctx->ssc;
assert(ssc);
dout(10) << " snapset " << ssc->snapset << dendl;
- calc_clone_subsets(ssc->snapset, soid, pg_log.get_missing(), info.last_backfill,
+ calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
+ get_info().last_backfill,
recovery_info.copy_subset,
recovery_info.clone_subset);
- put_snapset_context(ssc);
// FIXME: this may overestimate if we are pulling multiple clones in parallel...
dout(10) << " pulling " << recovery_info << dendl;
} else {
@@ -5773,8 +5977,8 @@ int ReplicatedPG::prepare_pull(
recovery_info.size = ((uint64_t)-1);
}
- (*pulls)[fromosd].push_back(PullOp());
- PullOp &op = (*pulls)[fromosd].back();
+ h->pulls[fromosd].push_back(PullOp());
+ PullOp &op = h->pulls[fromosd].back();
op.soid = soid;
op.recovery_info = recovery_info;
@@ -5788,11 +5992,78 @@ int ReplicatedPG::prepare_pull(
assert(!pulling.count(soid));
pull_from_peer[fromosd].insert(soid);
PullInfo &pi = pulling[soid];
+ pi.head_ctx = headctx;
pi.recovery_info = op.recovery_info;
pi.recovery_progress = op.recovery_progress;
- pi.priority = priority;
+}
+// Start recovery of a locally missing object through the PG backend.
+//
+// soid      object to recover
+// v         needed version; only used in the "unfound" log message here
+// priority  only forwarded to the recursive calls for head/snapdir
+// h         recovery handle that the backend adds the operation to
+//
+// Returns:
+//   PULL_NONE  - soid has no entry in missing_loc (unfound), or its
+//                head/snapdir is missing and is already recovering or could
+//                not be started
+//   PULL_OTHER - recovery of the missing head/snapdir was started instead
+//   PULL_YES   - recovery of soid itself was started
+int ReplicatedPG::recover_missing(
+ const hobject_t &soid, eversion_t v,
+ int priority,
+ PGBackend::RecoveryHandle *h)
+{
+ map<hobject_t,set<int> >::iterator q = missing_loc.find(soid);
+ if (q == missing_loc.end()) {
+ dout(7) << "pull " << soid
+ << " v " << v
+ << " but it is unfound" << dendl;
+ return PULL_NONE;
+ }
+
+ // is this a snapped object? if so, consult the snapset.. we may not need the entire object!
+ // obc is never assigned below, so it is passed to recover_object() in its
+ // default-constructed state.
+ ObjectContextRef obc;
+ ObjectContextRef head_obc;
+ if (soid.snap && soid.snap < CEPH_NOSNAP) {
+ // do we have the head and/or snapdir?
+ hobject_t head = soid.get_head();
+ if (pg_log.get_missing().is_missing(head)) {
+ if (recovering.count(head)) {
+ dout(10) << " missing but already recovering head " << head << dendl;
+ return PULL_NONE;
+ } else {
+ // Recover the head first; this object is not started in this call.
+ int r = recover_missing(
+ head, pg_log.get_missing().missing.find(head)->second.need, priority,
+ h);
+ if (r != PULL_NONE)
+ return PULL_OTHER;
+ return PULL_NONE;
+ }
+ }
+ head = soid.get_snapdir();
+ if (pg_log.get_missing().is_missing(head)) {
+ if (recovering.count(head)) {
+ dout(10) << " missing but already recovering snapdir " << head << dendl;
+ return PULL_NONE;
+ } else {
+ // Same as above, for the snapdir object.
+ int r = recover_missing(
+ head, pg_log.get_missing().missing.find(head)->second.need, priority,
+ h);
+ if (r != PULL_NONE)
+ return PULL_OTHER;
+ return PULL_NONE;
+ }
+ }
+
+ // we must have one or the other
+ head_obc = get_object_context(
+ soid.get_head(),
+ false,
+ 0);
+ if (!head_obc)
+ head_obc = get_object_context(
+ soid.get_snapdir(),
+ false,
+ 0);
+ assert(head_obc);
+ }
start_recovery_op(soid);
+ // Track the object as recovering before handing it to the backend.
+ assert(!recovering.count(soid));
+ recovering.insert(soid);
+ pgbackend->recover_object(
+ soid,
+ head_obc,
+ obc,
+ h);
return PULL_YES;
}
@@ -5816,15 +6087,14 @@ void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
* intelligently push an object to a replica. make use of existing
* clones/heads and dup data ranges where possible.
*/
-void ReplicatedPG::prep_push_to_replica(
+void ReplicatedBackend::prep_push_to_replica(
ObjectContextRef obc, const hobject_t& soid, int peer,
- int prio,
PushOp *pop)
{
const object_info_t& oi = obc->obs.oi;
uint64_t size = obc->obs.oi.size;
- dout(10) << __func__ << soid << " v" << oi.version
+ dout(10) << __func__ << ": " << soid << " v" << oi.version
<< " size " << size << " to osd." << peer << dendl;
map<hobject_t, interval_set<uint64_t> > clone_subsets;
@@ -5837,41 +6107,48 @@ void ReplicatedPG::prep_push_to_replica(
// try to base push off of clones that succeed/preceed poid
// we need the head (and current SnapSet) locally to do that.
- if (pg_log.get_missing().is_missing(head)) {
+ if (get_parent()->get_local_missing().is_missing(head)) {
dout(15) << "push_to_replica missing head " << head << ", pushing raw clone" << dendl;
- return prep_push(prio, obc, soid, peer, pop);
+ return prep_push(obc, soid, peer, pop);
}
hobject_t snapdir = head;
snapdir.snap = CEPH_SNAPDIR;
- if (pg_log.get_missing().is_missing(snapdir)) {
- dout(15) << "push_to_replica missing snapdir " << snapdir << ", pushing raw clone" << dendl;
- return prep_push(prio, obc, soid, peer, pop);
+ if (get_parent()->get_local_missing().is_missing(snapdir)) {
+ dout(15) << "push_to_replica missing snapdir " << snapdir
+ << ", pushing raw clone" << dendl;
+ return prep_push(obc, soid, peer, pop);
}
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
+ SnapSetContext *ssc = obc->ssc;
assert(ssc);
dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
- calc_clone_subsets(ssc->snapset, soid, peer_missing[peer],
- peer_info[peer].last_backfill,
+ map<int, pg_missing_t>::const_iterator pm =
+ get_parent()->get_peer_missing().find(peer);
+ assert(pm != get_parent()->get_peer_missing().end());
+ map<int, pg_info_t>::const_iterator pi =
+ get_parent()->get_peer_info().find(peer);
+ assert(pi != get_parent()->get_peer_info().end());
+ calc_clone_subsets(ssc->snapset, soid,
+ pm->second,
+ pi->second.last_backfill,
data_subset, clone_subsets);
- put_snapset_context(ssc);
} else if (soid.snap == CEPH_NOSNAP) {
// pushing head or unversioned object.
// base this on partially on replica's clones?
- SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
+ SnapSetContext *ssc = obc->ssc;
assert(ssc);
dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
- calc_head_subsets(obc, ssc->snapset, soid, peer_missing[peer],
- peer_info[peer].last_backfill,
- data_subset, clone_subsets);
- put_snapset_context(ssc);
+ calc_head_subsets(
+ obc,
+ ssc->snapset, soid, get_parent()->get_peer_missing().find(peer)->second,
+ get_parent()->get_peer_info().find(peer)->second.last_backfill,
+ data_subset, clone_subsets);
}
- prep_push(prio, obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
+ prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop);
}
-void ReplicatedPG::prep_push(int prio,
- ObjectContextRef obc,
+void ReplicatedBackend::prep_push(ObjectContextRef obc,
const hobject_t& soid, int peer,
PushOp *pop)
{
@@ -5880,13 +6157,12 @@ void ReplicatedPG::prep_push(int prio,
data_subset.insert(0, obc->obs.oi.size);
map<hobject_t, interval_set<uint64_t> > clone_subsets;
- prep_push(prio, obc, soid, peer,
+ prep_push(obc, soid, peer,
obc->obs.oi.version, data_subset, clone_subsets,
pop);
}
-void ReplicatedPG::prep_push(
- int prio,
+void ReplicatedBackend::prep_push(
ObjectContextRef obc,
const hobject_t& soid, int peer,
eversion_t version,
@@ -5894,9 +6170,10 @@ void ReplicatedPG::prep_push(
map<hobject_t, interval_set<uint64_t> >& clone_subsets,
PushOp *pop)
{
- peer_missing[peer].revise_have(soid, eversion_t());
+ get_parent()->begin_peer_recover(peer, soid);
// take note.
PushInfo &pi = pushing[soid][peer];
+ pi.obc = obc;
pi.recovery_info.size = obc->obs.oi.size;
pi.recovery_info.copy_subset = data_subset;
pi.recovery_info.clone_subset = clone_subsets;
@@ -5907,19 +6184,20 @@ void ReplicatedPG::prep_push(
pi.recovery_progress.data_recovered_to = 0;
pi.recovery_progress.data_complete = 0;
pi.recovery_progress.omap_complete = 0;
- pi.priority = prio;
ObjectRecoveryProgress new_progress;
- build_push_op(pi.recovery_info,
- pi.recovery_progress,
- &new_progress,
- pop);
+ int r = build_push_op(pi.recovery_info,
+ pi.recovery_progress,
+ &new_progress,
+ pop,
+ &(pi.stat));
+ assert(r == 0);
pi.recovery_progress = new_progress;
}
-int ReplicatedPG::send_pull_legacy(int prio, int peer,
- const ObjectRecoveryInfo &recovery_info,
- ObjectRecoveryProgress progress)
+int ReplicatedBackend::send_pull_legacy(int prio, int peer,
+ const ObjectRecoveryInfo &recovery_info,
+ ObjectRecoveryProgress progress)
{
// send op
tid_t tid = osd->get_tid();
@@ -5932,7 +6210,7 @@ int ReplicatedPG::send_pull_legacy(int prio, int peer,
<< " from osd." << peer
<< " tid " << tid << dendl;
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid,
+ MOSDSubOp *subop = new MOSDSubOp(rid, get_info().pgid, recovery_info.soid,
false, CEPH_OSD_FLAG_ACK,
get_osdmap()->get_epoch(), tid,
recovery_info.version);
@@ -5949,7 +6227,7 @@ int ReplicatedPG::send_pull_legacy(int prio, int peer,
return 0;
}
-void ReplicatedPG::submit_push_data(
+void ReplicatedBackend::submit_push_data(
ObjectRecoveryInfo &recovery_info,
bool first,
bool complete,
@@ -5971,9 +6249,7 @@ void ReplicatedPG::submit_push_data(
}
if (first) {
- pg_log.revise_have(recovery_info.soid, eversion_t());
- remove_snap_mapped_object(*t, recovery_info.soid);
- t->remove(coll, recovery_info.soid);
+ get_parent()->on_local_recover_start(recovery_info.soid, t);
t->remove(get_temp_coll(t), recovery_info.soid);
t->touch(target_coll, recovery_info.soid);
t->omap_setheader(target_coll, recovery_info.soid, omap_header);
@@ -6007,8 +6283,8 @@ void ReplicatedPG::submit_push_data(
}
}
-void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
- ObjectStore::Transaction *t)
+void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
+ ObjectStore::Transaction *t)
{
for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
recovery_info.clone_subset.begin();
@@ -6023,67 +6299,29 @@ void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
q.get_start(), q.get_len(), q.get_start());
}
}
-
- if (recovery_info.soid.snap < CEPH_NOSNAP) {
- assert(recovery_info.oi.snaps.size());
- OSDriver::OSTransaction _t(osdriver.get_transaction(t));
- set<snapid_t> snaps(
- recovery_info.oi.snaps.begin(),
- recovery_info.oi.snaps.end());
- snap_mapper.add_oid(
- recovery_info.soid,
- snaps,
- &_t);
- }
-
- if (pg_log.get_missing().is_missing(recovery_info.soid) &&
- pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
- assert(is_primary());
- const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
- if (latest->op == pg_log_entry_t::LOST_REVERT &&
- latest->reverting_to == recovery_info.version) {
- dout(10) << " got old revert version " << recovery_info.version
- << " for " << *latest << dendl;
- recovery_info.version = latest->version;
- // update the attr to the revert event version
- recovery_info.oi.prior_version = recovery_info.oi.version;
- recovery_info.oi.version = latest->version;
- bufferlist bl;
- ::encode(recovery_info.oi, bl);
- t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
- }
- }
- recover_got(recovery_info.soid, recovery_info.version);
-
- // update pg
- dirty_info = true;
- write_if_dirty(*t);
}
-ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recovery_info)
+ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
+ const ObjectRecoveryInfo& recovery_info,
+ SnapSetContext *ssc)
{
if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
return recovery_info;
-
- SnapSetContext *ssc = get_snapset_context(recovery_info.soid.oid,
- recovery_info.soid.get_key(),
- recovery_info.soid.hash,
- false,
- recovery_info.soid.get_namespace());
- assert(ssc);
ObjectRecoveryInfo new_info = recovery_info;
new_info.copy_subset.clear();
new_info.clone_subset.clear();
assert(ssc);
- calc_clone_subsets(ssc->snapset, new_info.soid, pg_log.get_missing(), info.last_backfill,
+ calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
+ get_info().last_backfill,
new_info.copy_subset, new_info.clone_subset);
- put_snapset_context(ssc);
return new_info;
}
-bool ReplicatedPG::handle_pull_response(
+bool ReplicatedBackend::handle_pull_response(
int from, PushOp &pop, PullOp *response,
- ObjectStore::Transaction *t)
+ list<ObjectContextRef> *to_continue,
+ ObjectStore::Transaction *t
+ )
{
interval_set<uint64_t> data_included = pop.data_included;
bufferlist data;
@@ -6115,7 +6353,13 @@ bool ReplicatedPG::handle_pull_response(
pop.recovery_info.copy_subset);
}
- pi.recovery_info = recalc_subsets(pi.recovery_info);
+ bool first = pi.recovery_progress.first;
+ if (first) {
+ pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
+ pi.recovery_info.oi = pi.obc->obs.oi;
+ pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
+ }
+
interval_set<uint64_t> usable_intervals;
bufferlist usable_data;
@@ -6127,33 +6371,15 @@ bool ReplicatedPG::handle_pull_response(
data_included = usable_intervals;
data.claim(usable_data);
- info.stats.stats.sum.num_bytes_recovered += data.length();
- bool first = pi.recovery_progress.first;
pi.recovery_progress = pop.after_progress;
+ pi.stat.num_bytes_recovered += data.length();
+
dout(10) << "new recovery_info " << pi.recovery_info
<< ", new progress " << pi.recovery_progress
<< dendl;
- if (first) {
- bufferlist oibl;
- if (pop.attrset.count(OI_ATTR)) {
- oibl.push_back(pop.attrset[OI_ATTR]);
- ::decode(pi.recovery_info.oi, oibl);
- } else {
- assert(0);
- }
- bufferlist ssbl;
- if (pop.attrset.count(SS_ATTR)) {
- ssbl.push_back(pop.attrset[SS_ATTR]);
- ::decode(pi.recovery_info.ss, ssbl);
- } else {
- assert(pi.recovery_info.soid.snap != CEPH_NOSNAP &&
- pi.recovery_info.soid.snap != CEPH_SNAPDIR);
- }
- }
-
bool complete = pi.is_complete();
submit_push_data(pi.recovery_info, first,
@@ -6164,53 +6390,17 @@ bool ReplicatedPG::handle_pull_response(
pop.omap_entries,
t);
- info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size();
-
- if (complete) {
- info.stats.stats.sum.num_objects_recovered++;
-
- SnapSetContext *ssc;
- if (hoid.snap == CEPH_NOSNAP || hoid.snap == CEPH_SNAPDIR) {
- ssc = create_snapset_context(hoid.oid);
- ssc->snapset = pi.recovery_info.ss;
- } else {
- ssc = get_snapset_context(hoid.oid, hoid.get_key(), hoid.hash, false,
- hoid.get_namespace());
- assert(ssc);
- }
- ObjectContextRef obc = create_object_context(pi.recovery_info.oi, ssc);
- obc->obs.exists = true;
-
- obc->ondisk_write_lock();
-
- // keep track of active pushes for scrub
- ++active_pushes;
-
- t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
- t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
- t->register_on_complete(
- new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
- }
-
- t->register_on_commit(
- new C_OSD_CommittedPushedObject(
- this,
- get_osdmap()->get_epoch(),
- info.last_complete));
+ pi.stat.num_keys_recovered += pop.omap_entries.size();
if (complete) {
+ to_continue->push_back(pi.obc);
+ pi.stat.num_objects_recovered++;
+ get_parent()->on_local_recover(
+ hoid, pi.stat, pi.recovery_info, pi.obc, t);
pulling.erase(hoid);
pull_from_peer[from].erase(hoid);
- publish_stats_to_osd();
- if (waiting_for_missing_object.count(hoid)) {
- dout(20) << " kicking waiters on " << hoid << dendl;
- requeue_ops(waiting_for_missing_object[hoid]);
- waiting_for_missing_object.erase(hoid);
- if (pg_log.get_missing().missing.size() == 0) {
- requeue_ops(waiting_for_all_missing);
- waiting_for_all_missing.clear();
- }
- }
+ if (pull_from_peer[from].empty())
+ pull_from_peer.erase(from);
return false;
} else {
response->soid = pop.soid;
@@ -6226,11 +6416,11 @@ struct C_OnPushCommit : public Context {
C_OnPushCommit(ReplicatedPG *pg, OpRequestRef op) : pg(pg), op(op) {}
void finish(int) {
op->mark_event("committed");
- pg->log_subop_stats(op, l_osd_push_inb, l_osd_sop_push_lat);
+ log_subop_stats(pg->osd, op, l_osd_push_inb, l_osd_sop_push_lat);
}
};
-void ReplicatedPG::handle_push(
+void ReplicatedBackend::handle_push(
int from, PushOp &pop, PushReplyOp *response,
ObjectStore::Transaction *t)
{
@@ -6244,12 +6434,7 @@ void ReplicatedPG::handle_push(
bool complete = pop.after_progress.data_complete &&
pop.after_progress.omap_complete;
- // keep track of active pushes for scrub
- ++active_pushes;
-
response->soid = pop.recovery_info.soid;
- t->register_on_applied(
- new C_OSD_AppliedRecoveredObjectReplica(this));
submit_push_data(pop.recovery_info,
first,
complete,
@@ -6260,14 +6445,16 @@ void ReplicatedPG::handle_push(
pop.omap_entries,
t);
- t->register_on_commit(
- new C_OSD_CommittedPushedObject(
- this,
- get_osdmap()->get_epoch(),
- info.last_complete));
+ if (complete)
+ get_parent()->on_local_recover(
+ pop.recovery_info.soid,
+ object_stat_sum_t(),
+ pop.recovery_info,
+ ObjectContextRef(), // ok, is replica
+ t);
}
-void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
+void ReplicatedBackend::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
{
for (map<int, vector<PushOp> >::iterator i = pushes.begin();
i != pushes.end();
@@ -6291,7 +6478,7 @@ void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
uint64_t cost = 0;
uint64_t pushes = 0;
MOSDPGPush *msg = new MOSDPGPush();
- msg->pgid = info.pgid;
+ msg->pgid = get_info().pgid;
msg->map_epoch = get_osdmap()->get_epoch();
msg->set_priority(prio);
for (;
@@ -6312,7 +6499,7 @@ void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
}
}
-void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
+void ReplicatedBackend::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
{
for (map<int, vector<PullOp> >::iterator i = pulls.begin();
i != pulls.end();
@@ -6339,7 +6526,7 @@ void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
<< " to osd." << i->first << dendl;
MOSDPGPull *msg = new MOSDPGPull();
msg->set_priority(prio);
- msg->pgid = info.pgid;
+ msg->pgid = get_info().pgid;
msg->map_epoch = get_osdmap()->get_epoch();
msg->pulls.swap(i->second);
msg->compute_cost(cct);
@@ -6348,22 +6535,11 @@ void ReplicatedPG::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
}
}
-int ReplicatedPG::send_push(int prio, int peer,
- const ObjectRecoveryInfo &recovery_info,
- const ObjectRecoveryProgress &progress,
- ObjectRecoveryProgress *out_progress)
-{
- PushOp op;
- int r = build_push_op(recovery_info, progress, out_progress, &op);
- if (r < 0)
- return r;
- return send_push_op_legacy(prio, peer, op);
-}
-
-int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info,
- const ObjectRecoveryProgress &progress,
- ObjectRecoveryProgress *out_progress,
- PushOp *out_op)
+int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
+ const ObjectRecoveryProgress &progress,
+ ObjectRecoveryProgress *out_progress,
+ PushOp *out_op,
+ object_stat_sum_t *stat)
{
ObjectRecoveryProgress _new_progress;
if (!out_progress)
@@ -6387,7 +6563,7 @@ int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info,
object_info_t oi(bv);
if (oi.version != recovery_info.version) {
- osd->clog.error() << info.pgid << " push "
+ osd->clog.error() << get_info().pgid << " push "
<< recovery_info.soid << " v "
<< " failed because local copy is "
<< oi.version << "\n";
@@ -6450,11 +6626,14 @@ int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info,
if (new_progress.is_complete(recovery_info)) {
new_progress.data_complete = true;
- info.stats.stats.sum.num_objects_recovered++;
+ if (stat)
+ stat->num_objects_recovered++;
}
- info.stats.stats.sum.num_keys_recovered += out_op->omap_entries.size();
- info.stats.stats.sum.num_bytes_recovered += out_op->data.length();
+ if (stat) {
+ stat->num_keys_recovered += out_op->omap_entries.size();
+ stat->num_bytes_recovered += out_op->data.length();
+ }
osd->logger->inc(l_osd_push);
osd->logger->inc(l_osd_push_outb, out_op->data.length());
@@ -6468,11 +6647,11 @@ int ReplicatedPG::build_push_op(const ObjectRecoveryInfo &recovery_info,
return 0;
}
-int ReplicatedPG::send_push_op_legacy(int prio, int peer, PushOp &pop)
+int ReplicatedBackend::send_push_op_legacy(int prio, int peer, PushOp &pop)
{
tid_t tid = osd->get_tid();
osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
- MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, pop.soid,
+ MOSDSubOp *subop = new MOSDSubOp(rid, get_info().pgid, pop.soid,
false, 0, get_osdmap()->get_epoch(),
tid, pop.recovery_info.version);
subop->ops = vector<OSDOp>(1);
@@ -6493,14 +6672,14 @@ int ReplicatedPG::send_push_op_legacy(int prio, int peer, PushOp &pop)
return 0;
}
-void ReplicatedPG::prep_push_op_blank(const hobject_t& soid, PushOp *op)
+void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
{
op->recovery_info.version = eversion_t();
op->version = eversion_t();
op->soid = soid;
}
-void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
+void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
{
MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request);
const hobject_t& soid = reply->get_poid();
@@ -6515,10 +6694,10 @@ void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
PushOp pop;
bool more = handle_push_reply(peer, rop, &pop);
if (more)
- send_push_op_legacy(pushing[soid][peer].priority, peer, pop);
+ send_push_op_legacy(op->request->get_priority(), peer, pop);
}
-bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
+bool ReplicatedBackend::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
{
const hobject_t &soid = op.soid;
if (pushing.count(soid) == 0) {
@@ -6538,32 +6717,25 @@ bool ReplicatedPG::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
<< pi->recovery_progress.data_recovered_to
<< " of " << pi->recovery_info.copy_subset << dendl;
ObjectRecoveryProgress new_progress;
- build_push_op(
+ int r = build_push_op(
pi->recovery_info,
- pi->recovery_progress, &new_progress, reply);
+ pi->recovery_progress, &new_progress, reply,
+ &(pi->stat));
+ assert(r == 0);
pi->recovery_progress = new_progress;
return true;
} else {
// done!
- if (peer == backfill_target && backfills_in_flight.count(soid))
- backfills_in_flight.erase(soid);
- else
- peer_missing[peer].got(soid, pi->recovery_info.version);
+ get_parent()->on_peer_recover(
+ peer, soid, pi->recovery_info,
+ pi->stat);
pushing[soid].erase(peer);
pi = NULL;
- publish_stats_to_osd();
if (pushing[soid].empty()) {
- pushing.erase(soid);
- dout(10) << "pushed " << soid << " to all replicas" << dendl;
- finish_recovery_op(soid);
- if (waiting_for_degraded_object.count(soid)) {
- requeue_ops(waiting_for_degraded_object[soid]);
- waiting_for_degraded_object.erase(soid);
- }
- finish_degraded_object(soid);
+ get_parent()->on_global_recover(soid);
} else {
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid].size() << " others" << dendl;
@@ -6601,7 +6773,7 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
* process request to pull an entire object.
* NOTE: called from opqueue.
*/
-void ReplicatedPG::sub_op_pull(OpRequestRef op)
+void ReplicatedBackend::sub_op_pull(OpRequestRef op)
{
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
assert(m->get_header().type == MSG_OSD_SUBOP);
@@ -6628,16 +6800,17 @@ void ReplicatedPG::sub_op_pull(OpRequestRef op)
m->get_source().num(),
reply);
- log_subop_stats(op, 0, l_osd_sop_pull_lat);
+ log_subop_stats(osd, op, 0, l_osd_sop_pull_lat);
}
-void ReplicatedPG::handle_pull(int peer, PullOp &op, PushOp *reply)
+void ReplicatedBackend::handle_pull(int peer, PullOp &op, PushOp *reply)
{
const hobject_t &soid = op.soid;
struct stat st;
int r = osd->store->stat(coll, soid, &st);
if (r != 0) {
- osd->clog.error() << info.pgid << " " << peer << " tried to pull " << soid
+ osd->clog.error() << get_info().pgid << " "
+ << peer << " tried to pull " << soid
<< " but got " << cpp_strerror(-r) << "\n";
prep_push_op_blank(soid, reply);
} else {
@@ -6754,7 +6927,7 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
* @param intervals_usable intervals we want to keep
* @param data_usable matching data we want to keep
*/
-void ReplicatedPG::trim_pushed_data(
+void ReplicatedBackend::trim_pushed_data(
const interval_set<uint64_t> &copy_subset,
const interval_set<uint64_t> &intervals_received,
bufferlist data_received,
@@ -6792,7 +6965,7 @@ void ReplicatedPG::trim_pushed_data(
/** op_push
* NOTE: called from opqueue.
*/
-void ReplicatedPG::sub_op_push(OpRequestRef op)
+void ReplicatedBackend::sub_op_push(OpRequestRef op)
{
op->mark_started();
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
@@ -6812,14 +6985,29 @@ void ReplicatedPG::sub_op_push(OpRequestRef op)
if (is_primary()) {
PullOp resp;
- bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
+ RPGHandle *h = _open_recovery_op();
+ list<ObjectContextRef> to_continue;
+ bool more = handle_pull_response(
+ m->get_source().num(), pop, &resp,
+ &to_continue, t);
if (more) {
send_pull_legacy(
m->get_priority(),
m->get_source().num(),
resp.recovery_info,
resp.recovery_progress);
- }
+ } else {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
+ op->request->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ new C_QueueInWQ(
+ &osd->push_wq,
+ get_parent()->bless_gencontext(c)));
+ }
+ run_recovery_op(h, op->request->get_priority());
} else {
PushReplyOp resp;
MOSDSubOpReply *reply = new MOSDSubOpReply(
@@ -6828,15 +7016,16 @@ void ReplicatedPG::sub_op_push(OpRequestRef op)
assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
handle_push(m->get_source().num(), pop, &resp, t);
t->register_on_complete(new C_OSD_SendMessageOnConn(
- osd, reply, m->get_connection()));
+ osd, reply, m->get_connection()));
}
- t->register_on_commit(new C_OnPushCommit(this, op));
- osd->store->queue_transaction(osr.get(), t);
+ get_parent()->queue_transaction(t);
return;
}
-void ReplicatedPG::_failed_push(int from, const hobject_t &soid)
+void ReplicatedPG::failed_push(int from, const hobject_t &soid)
{
+ assert(recovering.count(soid));
+ recovering.erase(soid);
map<hobject_t,set<int> >::iterator p = missing_loc.find(soid);
if (p != missing_loc.end()) {
dout(0) << "_failed_push " << soid << " from osd." << from
@@ -6849,9 +7038,15 @@ void ReplicatedPG::_failed_push(int from, const hobject_t &soid)
dout(0) << "_failed_push " << soid << " from osd." << from
<< " but not in missing_loc ???" << dendl;
}
-
finish_recovery_op(soid); // close out this attempt,
+}
+
+void ReplicatedBackend::_failed_push(int from, const hobject_t &soid)
+{
+ get_parent()->failed_push(from, soid);
pull_from_peer[from].erase(soid);
+ if (pull_from_peer[from].empty())
+ pull_from_peer.erase(from);
pulling.erase(soid);
}
@@ -7147,20 +7342,6 @@ void ReplicatedPG::on_shutdown()
cancel_recovery();
}
-void ReplicatedPG::on_flushed()
-{
- assert(object_contexts.empty());
- if (have_temp_coll() &&
- !osd->store->collection_empty(get_temp_coll())) {
- vector<hobject_t> objects;
- osd->store->collection_list(get_temp_coll(), objects);
- derr << __func__ << ": found objects in the temp collection: "
- << objects << ", crashing now"
- << dendl;
- assert(0 == "found garbage in the temp collection");
- }
-}
-
void ReplicatedPG::on_activate()
{
for (unsigned i = 1; i<acting.size(); i++) {
@@ -7223,20 +7404,7 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
// any dups
apply_and_flush_repops(is_primary());
- // clear pushing/pulling maps
- pushing.clear();
- pulling.clear();
- pull_from_peer.clear();
-
- // clear temp
- for (set<hobject_t>::iterator i = temp_contents.begin();
- i != temp_contents.end();
- ++i) {
- dout(10) << __func__ << ": Removing oid "
- << *i << " from the temp collection" << dendl;
- t->remove(get_temp_coll(t), *i);
- }
- temp_contents.clear();
+ pgbackend->on_change(t);
// clear snap_trimmer state
snap_trimmer_machine.process_event(Reset());
@@ -7262,9 +7430,16 @@ void ReplicatedPG::_clear_recovery_state()
backfill_pos = hobject_t();
backfills_in_flight.clear();
pending_backfill_updates.clear();
- pulling.clear();
- pushing.clear();
- pull_from_peer.clear();
+ recovering.clear();
+ pgbackend->clear_state();
+}
+
+void ReplicatedPG::cancel_pull(const hobject_t &soid)
+{
+ assert(recovering.count(soid));
+ recovering.erase(soid);
+ finish_recovery_op(soid);
+ pg_log.set_last_requested(0); // get recover_primary to start over
}
void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
@@ -7283,26 +7458,10 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
}
dout(10) << "check_recovery_sources source osd." << *p << " now down" << dendl;
now_down.insert(*p);
-
- // reset pulls?
- map<int, set<hobject_t> >::iterator j = pull_from_peer.find(*p);
- if (j != pull_from_peer.end()) {
- dout(10) << "check_recovery_sources resetting pulls from osd." << *p
- << ", osdmap has it marked down" << dendl;
- for (set<hobject_t>::iterator i = j->second.begin();
- i != j->second.end();
- ++i) {
- assert(pulling.count(*i) == 1);
- pulling.erase(*i);
- finish_recovery_op(*i);
- }
- pg_log.set_last_requested(0);
- pull_from_peer.erase(j++);
- }
-
- // remove from missing_loc_sources
missing_loc_sources.erase(p++);
}
+ pgbackend->check_recovery_sources(osdmap);
+
if (now_down.empty()) {
dout(10) << "check_recovery_sources no source osds (" << missing_loc_sources << ") went down" << dendl;
} else {
@@ -7388,7 +7547,8 @@ int ReplicatedPG::start_recovery_ops(
}
bool deferred_backfill = false;
- if (state_test(PG_STATE_BACKFILL) &&
+ if (recovering.empty() &&
+ state_test(PG_STATE_BACKFILL) &&
backfill_target >= 0 && started < max &&
missing.num_missing() == 0 &&
!waiting_on_backfill) {
@@ -7416,9 +7576,11 @@ int ReplicatedPG::start_recovery_ops(
dout(10) << " started " << started << dendl;
osd->logger->inc(l_osd_rop, started);
- if (started || recovery_ops_active > 0 || deferred_backfill)
+ if (!recovering.empty() ||
+ started || recovery_ops_active > 0 || deferred_backfill)
return started;
+ assert(recovering.empty());
assert(recovery_ops_active == 0);
int unfound = get_num_unfound();
@@ -7484,7 +7646,8 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
const pg_missing_t &missing = pg_log.get_missing();
- dout(10) << "recover_primary pulling " << pulling.size() << " in pg" << dendl;
+ dout(10) << "recover_primary recovering " << recovering.size()
+ << " in pg" << dendl;
dout(10) << "recover_primary " << missing << dendl;
dout(25) << "recover_primary " << missing.missing << dendl;
@@ -7493,7 +7656,7 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
int started = 0;
int skipped = 0;
- map<int, vector<PullOp> > pulls;
+ PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
map<version_t, hobject_t>::const_iterator p =
missing.rmissing.lower_bound(pg_log.get_log().last_requested);
while (p != missing.rmissing.end()) {
@@ -7524,8 +7687,8 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
<< (unfound ? " (unfound)":"")
<< (missing.is_missing(soid) ? " (missing)":"")
<< (missing.is_missing(head) ? " (missing head)":"")
- << (pulling.count(soid) ? " (pulling)":"")
- << (pulling.count(head) ? " (pulling head)":"")
+ << (recovering.count(soid) ? " (recovering)":"")
+ << (recovering.count(head) ? " (recovering head)":"")
<< dendl;
if (latest) {
@@ -7600,14 +7763,14 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
}
}
- if (!pulling.count(soid)) {
- if (pulling.count(head)) {
+ if (!recovering.count(soid)) {
+ if (recovering.count(head)) {
++skipped;
} else if (unfound) {
++skipped;
} else {
- int r = prepare_pull(
- soid, need, cct->_conf->osd_recovery_op_priority, &pulls);
+ int r = recover_missing(
+ soid, need, cct->_conf->osd_recovery_op_priority, h);
switch (r) {
case PULL_YES:
++started;
@@ -7629,14 +7792,14 @@ int ReplicatedPG::recover_primary(int max, ThreadPool::TPHandle &handle)
if (!skipped)
pg_log.set_last_requested(v);
}
-
- send_pulls(cct->_conf->osd_recovery_op_priority, pulls);
+
+ pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority);
return started;
}
int ReplicatedPG::prep_object_replica_pushes(
- const hobject_t& soid, eversion_t v, int prio,
- map<int, vector<PushOp> > *pushes)
+ const hobject_t& soid, eversion_t v,
+ PGBackend::RecoveryHandle *h)
{
dout(10) << __func__ << ": on " << soid << dendl;
@@ -7663,30 +7826,46 @@ int ReplicatedPG::prep_object_replica_pushes(
return 0;
}
- dout(10) << " ondisk_read_lock for " << soid << dendl;
+ start_recovery_op(soid);
+ assert(!recovering.count(soid));
+ recovering.insert(soid);
+
+ /* We need this in case there is an in progress write on the object. In fact,
+ * the only possible write is an update to the xattr due to a lost_revert --
+ * a client write would be blocked since the object is degraded.
+ * In almost all cases, therefore, this lock should be uncontended.
+ */
obc->ondisk_read_lock();
-
+ pgbackend->recover_object(
+ soid,
+ ObjectContextRef(),
+ obc, // has snapset context
+ h);
+ obc->ondisk_read_unlock();
+ return 1;
+}
+
+int ReplicatedBackend::start_pushes(
+ const hobject_t &soid,
+ ObjectContextRef obc,
+ RPGHandle *h)
+{
+ int pushes = 0;
// who needs it?
- bool started = false;
- for (unsigned i=1; i<acting.size(); i++) {
- int peer = acting[i];
- if (peer_missing.count(peer) &&
- peer_missing[peer].is_missing(soid)) {
- if (!started) {
- start_recovery_op(soid);
- started = true;
- }
- (*pushes)[peer].push_back(PushOp());
- prep_push_to_replica(obc, soid, peer, prio,
- &((*pushes)[peer].back())
+ for (unsigned i=1; i<get_parent()->get_acting().size(); i++) {
+ int peer = get_parent()->get_acting()[i];
+ map<int, pg_missing_t>::const_iterator j =
+ get_parent()->get_peer_missing().find(peer);
+ assert(j != get_parent()->get_peer_missing().end());
+ if (j->second.is_missing(soid)) {
+ ++pushes;
+ h->pushes[peer].push_back(PushOp());
+ prep_push_to_replica(obc, soid, peer,
+ &(h->pushes[peer].back())
);
}
}
-
- dout(10) << " ondisk_read_unlock on " << soid << dendl;
- obc->ondisk_read_unlock();
-
- return 1;
+ return pushes;
}
int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
@@ -7694,7 +7873,7 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
dout(10) << __func__ << "(" << max << ")" << dendl;
int started = 0;
- map<int, vector<PushOp> > pushes;
+ PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
// this is FAR from an optimal recovery order. pretty lame, really.
for (unsigned i=1; i<acting.size(); i++) {
@@ -7714,8 +7893,8 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
handle.reset_tp_timeout();
const hobject_t soid(p->second);
- if (pushing.count(soid)) {
- dout(10) << __func__ << ": already pushing " << soid << dendl;
+ if (recovering.count(soid)) {
+ dout(10) << __func__ << ": already recovering" << soid << dendl;
continue;
}
@@ -7730,13 +7909,11 @@ int ReplicatedPG::recover_replicas(int max, ThreadPool::TPHandle &handle)
dout(10) << __func__ << ": recover_object_replicas(" << soid << ")" << dendl;
map<hobject_t,pg_missing_t::item>::const_iterator r = m.missing.find(soid);
started += prep_object_replica_pushes(soid, r->second.need,
- cct->_conf->osd_recovery_op_priority,
- &pushes);
+ h);
}
}
- send_pushes(cct->_conf->osd_recovery_op_priority, pushes);
-
+ pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority);
return started;
}
@@ -7899,15 +8076,16 @@ int ReplicatedPG::recover_backfill(
send_remove_op(i->first, i->second, backfill_target);
}
+ PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
map<int, vector<PushOp> > pushes;
for (map<hobject_t, pair<eversion_t, eversion_t> >::iterator i = to_push.begin();
i != to_push.end();
++i) {
handle.reset_tp_timeout();
prep_backfill_object_push(
- i->first, i->second.first, i->second.second, backfill_target, &pushes);
+ i->first, i->second.first, i->second.second, backfill_target, h);
}
- send_pushes(cct->_conf->osd_recovery_op_priority, pushes);
+ pgbackend->run_recovery_op(h, cct->_conf->osd_recovery_op_priority);
release_waiting_for_backfill_pos();
dout(5) << "backfill_pos is " << backfill_pos << " and pinfo.last_backfill is "
@@ -7953,20 +8131,25 @@ int ReplicatedPG::recover_backfill(
void ReplicatedPG::prep_backfill_object_push(
hobject_t oid, eversion_t v, eversion_t have, int peer,
- map<int, vector<PushOp> > *pushes)
+ PGBackend::RecoveryHandle *h)
{
dout(10) << "push_backfill_object " << oid << " v " << v << " to osd." << peer << dendl;
backfills_in_flight.insert(oid);
+ map<int, pg_missing_t>::iterator bpm = peer_missing.find(backfill_target);
+ assert(bpm != peer_missing.end());
+ bpm->second.add(oid, eversion_t(), eversion_t());
- if (!pushing.count(oid))
- start_recovery_op(oid);
+ assert(!recovering.count(oid));
+
+ start_recovery_op(oid);
+ recovering.insert(oid);
ObjectContextRef obc = get_object_context(oid, false);
- obc->ondisk_read_lock();
- (*pushes)[peer].push_back(PushOp());
- prep_push_to_replica(obc, oid, peer, cct->_conf->osd_recovery_op_priority,
- &((*pushes)[peer].back()));
- obc->ondisk_read_unlock();
+ pgbackend->recover_object(
+ oid,
+ ObjectContextRef(),
+ obc,
+ h);
}
void ReplicatedPG::scan_range(
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index e880bdecade..e24592e932f 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -33,6 +33,9 @@
#include "common/sharedptr_registry.hpp"
+#include "PGBackend.h"
+#include "ReplicatedBackend.h"
+
class MOSDSubOpReply;
class ReplicatedPG;
@@ -80,7 +83,7 @@ public:
virtual bool filter(bufferlist& xattr_data, bufferlist& outdata);
};
-class ReplicatedPG : public PG {
+class ReplicatedPG : public PG, public PGBackend::Listener {
friend class OSD;
friend class Watch;
@@ -122,6 +125,119 @@ public:
};
typedef boost::shared_ptr<CopyOp> CopyOpRef;
+ boost::scoped_ptr<PGBackend> pgbackend;
+
+ /// Listener methods
+ void on_local_recover_start(
+ const hobject_t &oid,
+ ObjectStore::Transaction *t);
+ void on_local_recover(
+ const hobject_t &oid,
+ const object_stat_sum_t &stat_diff,
+ const ObjectRecoveryInfo &recovery_info,
+ ObjectContextRef obc,
+ ObjectStore::Transaction *t
+ );
+ void on_peer_recover(
+ int peer,
+ const hobject_t &oid,
+ const ObjectRecoveryInfo &recovery_info,
+ const object_stat_sum_t &stat
+ );
+ void begin_peer_recover(
+ int peer,
+ const hobject_t oid);
+ void on_global_recover(
+ const hobject_t &oid);
+ void failed_push(int from, const hobject_t &soid);
+ void cancel_pull(const hobject_t &soid);
+
+ template <typename T>
+ class BlessedGenContext : public GenContext<T> {
+ ReplicatedPG *pg;
+ GenContext<T> *c;
+ epoch_t e;
+ public:
+ BlessedGenContext(ReplicatedPG *pg, GenContext<T> *c, epoch_t e)
+ : pg(pg), c(c), e(e) {}
+ void finish(T t) {
+ pg->lock();
+ if (pg->pg_has_reset_since(e))
+ delete c;
+ else
+ c->complete(t);
+ pg->unlock();
+ }
+ };
+ class BlessedContext : public Context {
+ ReplicatedPG *pg;
+ Context *c;
+ epoch_t e;
+ public:
+ BlessedContext(ReplicatedPG *pg, Context *c, epoch_t e)
+ : pg(pg), c(c), e(e) {}
+ void finish(int r) {
+ pg->lock();
+ if (pg->pg_has_reset_since(e))
+ delete c;
+ else
+ c->complete(r);
+ pg->unlock();
+ }
+ };
+ Context *bless_context(Context *c) {
+ return new BlessedContext(this, c, get_osdmap()->get_epoch());
+ }
+ GenContext<ThreadPool::TPHandle&> *bless_gencontext(
+ GenContext<ThreadPool::TPHandle&> *c) {
+ return new BlessedGenContext<ThreadPool::TPHandle&>(
+ this, c, get_osdmap()->get_epoch());
+ }
+
+ void send_message(int to_osd, Message *m) {
+ osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
+ }
+ void queue_transaction(ObjectStore::Transaction *t) {
+ osd->store->queue_transaction(osr.get(), t);
+ }
+ epoch_t get_epoch() {
+ return get_osdmap()->get_epoch();
+ }
+ const vector<int> &get_acting() {
+ return acting;
+ }
+ std::string gen_dbg_prefix() const { return gen_prefix(); }
+
+ const map<hobject_t, set<int> > &get_missing_loc() {
+ return missing_loc;
+ }
+ const map<int, pg_missing_t> &get_peer_missing() {
+ return peer_missing;
+ }
+ const map<int, pg_info_t> &get_peer_info() {
+ return peer_info;
+ }
+ const pg_missing_t &get_local_missing() {
+ return pg_log.get_missing();
+ }
+ const PGLog &get_log() {
+ return pg_log;
+ }
+ bool pgb_is_primary() const {
+ return is_primary();
+ }
+ OSDMapRef pgb_get_osdmap() const {
+ return get_osdmap();
+ }
+ const pg_info_t &get_info() const {
+ return info;
+ }
+ ObjectContextRef get_obc(
+ const hobject_t &hoid,
+ map<string, bufferptr> &attrs) {
+ return get_object_context(hoid, true, &attrs);
+ }
+
/*
* Capture all object state associated with an in-progress read or write.
*/
@@ -339,7 +455,11 @@ public:
protected:
ObjectContextRef create_object_context(const object_info_t& oi, SnapSetContext *ssc);
- ObjectContextRef get_object_context(const hobject_t& soid, bool can_create);
+ ObjectContextRef get_object_context(
+ const hobject_t& soid,
+ bool can_create,
+ map<string, bufferptr> *attrs = 0
+ );
void context_registry_on_change();
void object_context_destructor_callback(ObjectContext *obc);
@@ -362,8 +482,11 @@ protected:
void get_src_oloc(const object_t& oid, const object_locator_t& oloc, object_locator_t& src_oloc);
SnapSetContext *create_snapset_context(const object_t& oid);
- SnapSetContext *get_snapset_context(const object_t& oid, const string &key,
- ps_t seed, bool can_create, const string &nspace);
+ SnapSetContext *get_snapset_context(
+ const object_t& oid, const string &key,
+ ps_t seed, bool can_create, const string &nspace,
+ map<string, bufferptr> *attrs = 0
+ );
void register_snapset_context(SnapSetContext *ssc) {
Mutex::Locker l(snapset_contexts_lock);
_register_snapset_context(ssc);
@@ -378,90 +501,7 @@ protected:
}
void put_snapset_context(SnapSetContext *ssc);
- // push
- struct PushInfo {
- ObjectRecoveryProgress recovery_progress;
- ObjectRecoveryInfo recovery_info;
- int priority;
-
- void dump(Formatter *f) const {
- {
- f->open_object_section("recovery_progress");
- recovery_progress.dump(f);
- f->close_section();
- }
- {
- f->open_object_section("recovery_info");
- recovery_info.dump(f);
- f->close_section();
- }
- }
- };
- map<hobject_t, map<int, PushInfo> > pushing;
-
- // pull
- struct PullInfo {
- ObjectRecoveryProgress recovery_progress;
- ObjectRecoveryInfo recovery_info;
- int priority;
-
- void dump(Formatter *f) const {
- {
- f->open_object_section("recovery_progress");
- recovery_progress.dump(f);
- f->close_section();
- }
- {
- f->open_object_section("recovery_info");
- recovery_info.dump(f);
- f->close_section();
- }
- }
-
- bool is_complete() const {
- return recovery_progress.is_complete(recovery_info);
- }
- };
- map<hobject_t, PullInfo> pulling;
-
- ObjectRecoveryInfo recalc_subsets(const ObjectRecoveryInfo& recovery_info);
- static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
- const interval_set<uint64_t> &intervals_received,
- bufferlist data_received,
- interval_set<uint64_t> *intervals_usable,
- bufferlist *data_usable);
- bool handle_pull_response(
- int from, PushOp &op, PullOp *response,
- ObjectStore::Transaction *t);
- void handle_push(
- int from, PushOp &op, PushReplyOp *response,
- ObjectStore::Transaction *t);
- void send_pushes(int prio, map<int, vector<PushOp> > &pushes);
- int send_push(int priority, int peer,
- const ObjectRecoveryInfo& recovery_info,
- const ObjectRecoveryProgress &progress,
- ObjectRecoveryProgress *out_progress = 0);
- int build_push_op(const ObjectRecoveryInfo &recovery_info,
- const ObjectRecoveryProgress &progress,
- ObjectRecoveryProgress *out_progress,
- PushOp *out_op);
- int send_push_op_legacy(int priority, int peer,
- PushOp &pop);
-
- int send_pull_legacy(int priority, int peer,
- const ObjectRecoveryInfo& recovery_info,
- ObjectRecoveryProgress progress);
- void submit_push_data(ObjectRecoveryInfo &recovery_info,
- bool first,
- bool complete,
- const interval_set<uint64_t> &intervals_included,
- bufferlist data_included,
- bufferlist omap_header,
- map<string, bufferptr> &attrs,
- map<string, bufferlist> &omap_entries,
- ObjectStore::Transaction *t);
- void submit_push_complete(ObjectRecoveryInfo &recovery_info,
- ObjectStore::Transaction *t);
+ set<hobject_t> recovering;
/*
* Backfill
@@ -504,54 +544,17 @@ protected:
f->close_section();
}
{
- f->open_array_section("pull_from_peer");
- for (map<int, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
- i != pull_from_peer.end();
+ f->open_array_section("recovering");
+ for (set<hobject_t>::const_iterator i = recovering.begin();
+ i != recovering.end();
++i) {
- f->open_object_section("pulling_from");
- f->dump_int("pull_from", i->first);
- {
- f->open_array_section("pulls");
- for (set<hobject_t>::const_iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
- f->open_object_section("pull_info");
- assert(pulling.count(*j));
- pulling.find(*j)->second.dump(f);
- f->close_section();
- }
- f->close_section();
- }
- f->close_section();
+ f->dump_stream("object") << *i;
}
f->close_section();
}
{
- f->open_array_section("pushing");
- for (map<hobject_t, map<int, PushInfo> >::const_iterator i =
- pushing.begin();
- i != pushing.end();
- ++i) {
- f->open_object_section("object");
- f->dump_stream("pushing") << i->first;
- {
- f->open_array_section("pushing_to");
- for (map<int, PushInfo>::const_iterator j = i->second.begin();
- j != i->second.end();
- ++j) {
- f->open_object_section("push_progress");
- f->dump_stream("object_pushing") << j->first;
- {
- f->open_object_section("push_info");
- j->second.dump(f);
- f->close_section();
- }
- f->close_section();
- }
- f->close_section();
- }
- f->close_section();
- }
+ f->open_object_section("pg_backend");
+ pgbackend->dump_recovery_info(f);
f->close_section();
}
}
@@ -559,53 +562,19 @@ protected:
/// leading edge of backfill
hobject_t backfill_pos;
- // Reverse mapping from osd peer to objects beging pulled from that peer
- map<int, set<hobject_t> > pull_from_peer;
-
int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
- int priority,
- map<int, vector<PushOp> > *pushes);
- void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
- pg_missing_t& missing,
- const hobject_t &last_backfill,
- interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets);
- void calc_clone_subsets(SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
- const hobject_t &last_backfill,
- interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets);
- void prep_push_to_replica(
- ObjectContextRef obc,
- const hobject_t& oid,
- int dest,
- int priority,
- PushOp *push_op);
- void prep_push(int priority,
- ObjectContextRef obc,
- const hobject_t& oid, int dest,
- PushOp *op);
- void prep_push(int priority,
- ObjectContextRef obc,
- const hobject_t& soid, int peer,
- eversion_t version,
- interval_set<uint64_t> &data_subset,
- map<hobject_t, interval_set<uint64_t> >& clone_subsets,
- PushOp *op);
- void prep_push_op_blank(const hobject_t& soid, PushOp *op);
+ PGBackend::RecoveryHandle *h);
void finish_degraded_object(const hobject_t& oid);
// Cancels/resets pulls from peer
void check_recovery_sources(const OSDMapRef map);
- void send_pulls(
- int priority,
- map<int, vector<PullOp> > &pulls);
- int prepare_pull(
- const hobject_t& oid, eversion_t v,
+ int recover_missing(
+ const hobject_t& oid,
+ eversion_t v,
int priority,
- map<int, vector<PullOp> > *pulls
- );
+ PGBackend::RecoveryHandle *h);
// low level ops
@@ -657,7 +626,7 @@ protected:
void prep_backfill_object_push(
hobject_t oid, eversion_t v, eversion_t have, int peer,
- map<int, vector<PushOp> > *pushes);
+ PGBackend::RecoveryHandle *h);
void send_remove_op(const hobject_t& oid, eversion_t v, int peer);
@@ -731,35 +700,6 @@ protected:
pg->_committed_pushed_object(epoch, last_complete);
}
};
- struct C_OSD_SendMessageOnConn: public Context {
- OSDService *osd;
- Message *reply;
- ConnectionRef conn;
- C_OSD_SendMessageOnConn(
- OSDService *osd,
- Message *reply,
- ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {}
- void finish(int) {
- osd->send_message_osd_cluster(reply, conn.get());
- }
- };
- struct C_OSD_CompletedPull : public Context {
- ReplicatedPGRef pg;
- hobject_t hoid;
- epoch_t epoch;
- C_OSD_CompletedPull(
- ReplicatedPG *pg,
- const hobject_t &hoid,
- epoch_t epoch) : pg(pg), hoid(hoid), epoch(epoch) {}
- void finish(int) {
- pg->lock();
- if (!pg->pg_has_reset_since(epoch)) {
- pg->finish_recovery_op(hoid);
- }
- pg->unlock();
- }
- };
- friend struct C_OSD_CompletedPull;
struct C_OSD_AppliedRecoveredObjectReplica : public Context {
ReplicatedPGRef pg;
C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p) :
@@ -780,14 +720,6 @@ protected:
void _applied_recovered_object_replica();
void _committed_pushed_object(epoch_t epoch, eversion_t lc);
void recover_got(hobject_t oid, eversion_t v);
- void sub_op_push(OpRequestRef op);
- void _failed_push(int from, const hobject_t &soid);
- void sub_op_push_reply(OpRequestRef op);
- bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply);
- void sub_op_pull(OpRequestRef op);
- void handle_pull(int peer, PullOp &op, PushOp *reply);
-
- void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);
// -- copyfrom --
map<hobject_t, CopyOpRef> copy_ops;
@@ -828,6 +760,9 @@ public:
int do_command(cmdmap_t cmdmap, ostream& ss, bufferlist& idata,
bufferlist& odata);
+ void do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle);
void do_op(OpRequestRef op);
bool pg_op_must_wait(MOSDOp *op);
void do_pg_op(OpRequestRef op);
@@ -837,17 +772,7 @@ public:
OpRequestRef op,
ThreadPool::TPHandle &handle);
void do_backfill(OpRequestRef op);
- void _do_push(OpRequestRef op);
- void _do_pull_response(OpRequestRef op);
- void do_push(OpRequestRef op) {
- if (is_primary()) {
- _do_pull_response(op);
- } else {
- _do_push(op);
- }
- }
- void do_pull(OpRequestRef op);
- void do_push_reply(OpRequestRef op);
+
RepGather *trim_object(const hobject_t &coid);
void snap_trimmer();
int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
@@ -857,16 +782,27 @@ public:
void do_osd_op_effects(OpContext *ctx);
private:
- bool temp_created;
- coll_t temp_coll;
- set<hobject_t> temp_contents; ///< contents of temp collection, clear on reset
uint64_t temp_seq; ///< last id for naming temp objects
coll_t get_temp_coll(ObjectStore::Transaction *t);
hobject_t generate_temp_object(); ///< generate a new temp object name
public:
- bool have_temp_coll();
- coll_t get_temp_coll() {
- return temp_coll;
+ void get_colls(list<coll_t> *out) {
+ out->push_back(coll);
+ return pgbackend->temp_colls(out);
+ }
+ void split_colls(
+ pg_t child,
+ int split_bits,
+ int seed,
+ ObjectStore::Transaction *t) {
+ coll_t target = coll_t(child);
+ t->create_collection(target);
+ t->split_collection(
+ coll,
+ split_bits,
+ seed,
+ target);
+ pgbackend->split_colls(child, split_bits, seed, t);
}
private:
struct NotTrimming;
@@ -952,7 +888,10 @@ public:
void on_role_change();
void on_change(ObjectStore::Transaction *t);
void on_activate();
- void on_flushed();
+ void on_flushed() {
+ assert(object_contexts.empty());
+ pgbackend->on_flushed();
+ }
void on_removal(ObjectStore::Transaction *t);
void on_shutdown();
};
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 66cce34b264..884b8ada8cc 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -46,6 +46,7 @@
typedef hobject_t collection_list_handle_t;
+typedef uint8_t shard_id_t;
/**
* osd request identifier