summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2013-08-29 18:13:26 -0700
committerSamuel Just <sam.just@inktank.com>2013-09-26 11:24:25 -0700
commit62a1eb6f693a41dcb558d7c34e07e2aa5487de03 (patch)
treeccb71e959cd2a67f09ed7e3f0b2dc1be273cc93b
parentf94e6a30a746f9c3d93bb13d45da2b06ccc49673 (diff)
downloadceph-62a1eb6f693a41dcb558d7c34e07e2aa5487de03.tar.gz
osd/: add PGBackend interfaces and stubs
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--doc/dev/osd_internals/erasure_coding/recovery.rst4
-rw-r--r--src/osd/Makefile.am3
-rw-r--r--src/osd/PGBackend.h194
-rw-r--r--src/osd/ReplicatedBackend.cc158
-rw-r--r--src/osd/ReplicatedBackend.h252
-rw-r--r--src/osd/ReplicatedPG.cc3
-rw-r--r--src/osd/ReplicatedPG.h113
-rw-r--r--src/osd/osd_types.h1
8 files changed, 726 insertions, 2 deletions
diff --git a/doc/dev/osd_internals/erasure_coding/recovery.rst b/doc/dev/osd_internals/erasure_coding/recovery.rst
new file mode 100644
index 00000000000..793a5b003dc
--- /dev/null
+++ b/doc/dev/osd_internals/erasure_coding/recovery.rst
@@ -0,0 +1,4 @@
+===================
+PGBackend Recovery
+===================
+
diff --git a/src/osd/Makefile.am b/src/osd/Makefile.am
index ea7c036f858..9d3bc1d5e47 100644
--- a/src/osd/Makefile.am
+++ b/src/osd/Makefile.am
@@ -9,6 +9,7 @@ libosd_la_SOURCES = \
osd/PG.cc \
osd/PGLog.cc \
osd/ReplicatedPG.cc \
+ osd/ReplicatedBackend.cc \
osd/Ager.cc \
osd/OSD.cc \
osd/OSDCap.cc \
@@ -35,6 +36,8 @@ noinst_HEADERS += \
osd/PG.h \
osd/PGLog.h \
osd/ReplicatedPG.h \
+ osd/PGBackend.h \
+ osd/ReplicatedBackend.h \
osd/Watch.h \
osd/osd_types.h
diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h
new file mode 100644
index 00000000000..6a77c72438d
--- /dev/null
+++ b/src/osd/PGBackend.h
@@ -0,0 +1,194 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef PGBACKEND_H
+#define PGBACKEND_H
+
+#include "osd_types.h"
+#include "include/Context.h"
+#include <string>
+
+ /**
+ * PGBackend
+ *
+ * PGBackend defines an interface for logic handling IO and
+ * replication on RADOS objects. The PGBackend implementation
+ * is responsible for:
+ *
+ * 1) Handling client operations
+ * 2) Handling object recovery
+ * 3) Handling object access
+ */
+ class PGBackend {
+ public:
+ /**
+ * Provides interfaces for PGBackend callbacks
+ *
+ * The intention is that the parent calls into the PGBackend
+ * implementation holding a lock and that the callbacks are
+ * called under the same locks.
+ */
+ class Listener {
+ public:
+ /// Recovery
+
+ virtual void on_local_recover_start(
+ const hobject_t &oid,
+ ObjectStore::Transaction *t) = 0;
+ /**
+ * Called with the transaction recovering oid
+ */
+ virtual void on_local_recover(
+ const hobject_t &oid,
+ const object_stat_sum_t &stat_diff,
+ const ObjectRecoveryInfo &recovery_info,
+ ObjectStore::Transaction *t
+ ) = 0;
+
+ /**
+ * Called when transaction recovering oid is durable and
+ * applied on all replicas
+ */
+ virtual void on_global_recover(const hobject_t &oid) = 0;
+
+ /**
+ * Called when peer is recovered
+ */
+ virtual void on_peer_recover(
+ int peer,
+ const hobject_t &oid,
+ const ObjectRecoveryInfo &recovery_info
+ ) = 0;
+
+ virtual void failed_push(int from, const hobject_t &soid) = 0;
+
+ /**
+ * Bless a context
+ *
+ * Wraps a context in whatever outer layers the parent usually
+ * uses to call into the PGBackend
+ */
+ virtual Context *bless_context(Context *c) = 0;
+ virtual GenContext<ThreadPool::TPHandle&> *bless_gencontext(
+ GenContext<ThreadPool::TPHandle&> *c) = 0;
+
+ virtual void send_message(int to_osd, Message *m) = 0;
+ virtual void queue_transaction(ObjectStore::Transaction *t) = 0;
+ virtual epoch_t get_epoch() = 0;
+ virtual const vector<int> &get_acting() = 0;
+ virtual std::string gen_dbg_prefix() const = 0;
+
+ virtual const map<hobject_t, set<int> > &get_missing_loc() = 0;
+ virtual const map<int, pg_missing_t> &get_peer_missing() = 0;
+ virtual const pg_missing_t &get_local_missing() = 0;
+ virtual const PGLog &get_log() = 0;
+ virtual bool pgb_is_primary() const = 0;
+ virtual OSDMapRef pgb_get_osdmap() const = 0;
+ virtual const pg_info_t &get_info() const = 0;
+
+ virtual ObjectContextRef get_obc(
+ const hobject_t &hoid,
+ map<string, bufferptr> &attrs) = 0;
+
+ virtual ~Listener() {}
+ };
+ Listener *parent;
+ Listener *get_parent() const { return parent; }
+ PGBackend(Listener *l) : parent(l) {}
+ bool is_primary() const { return get_parent()->pgb_is_primary(); }
+ OSDMapRef get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+ const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+ std::string gen_prefix() const {
+ return parent->gen_dbg_prefix();
+ }
+
+ /**
+ * RecoveryHandle
+ *
+ * We may want to recover multiple objects in the same set of
+ * messages. RecoveryHandle is an interface for the opaque
+ * object used by the implementation to store the details of
+ * the pending recovery operations.
+ */
+ struct RecoveryHandle {
+ virtual ~RecoveryHandle() {}
+ };
+
+ /// Get a fresh recovery operation
+ virtual RecoveryHandle *open_recovery_op() = 0;
+
+ /// run_recovery_op: finish the operation represented by h
+ virtual void run_recovery_op(
+ RecoveryHandle *h, ///< [in] op to finish
+ int priority ///< [in] msg priority
+ ) = 0;
+
+ /**
+ * recover_object
+ *
+ * Triggers a recovery operation on the specified hobject_t
+ * onreadable must be called before onwriteable
+ *
+ * On each replica (primary included), get_parent()->on_not_missing()
+ * must be called when the transaction finalizing the recovery
+ * is queued. Similarly, get_parent()->on_readable() must be called
+ * when the transaction is applied in the backing store.
+ *
+ * get_parent()->on_not_degraded() should be called on the primary
+ * when writes can resume on the object.
+ *
+ * obc may be NULL if the primary lacks the object.
+ *
+ * head may be NULL only if the head/snapdir is missing
+ *
+ * @param missing [in] set of info, missing pairs for queried nodes
+ * @param overlaps [in] mapping of object to file offset overlaps
+ */
+ virtual void recover_object(
+ const hobject_t &hoid, ///< [in] object to recover
+ ObjectContextRef head, ///< [in] context of the head/snapdir object
+ ObjectContextRef obc, ///< [in] context of the object
+ RecoveryHandle *h ///< [in,out] handle to attach recovery op to
+ ) = 0;
+
+ /// gives PGBackend a crack at an incoming message
+ virtual bool handle_message(
+ OpRequestRef op ///< [in] message received
+ ) = 0; ///< @return true if the message was handled
+
+ /**
+ * implementation should clear itself, contexts blessed prior to on_change
+ * won't be called after on_change()
+ */
+ virtual void on_change(ObjectStore::Transaction *t) = 0;
+ virtual void clear_state() = 0;
+
+ virtual void on_flushed() = 0;
+
+
+ virtual void split_colls(
+ pg_t child,
+ int split_bits,
+ int seed,
+ ObjectStore::Transaction *t) = 0;
+
+ virtual void temp_colls(list<coll_t> *out) = 0;
+
+ virtual void dump_recovery_info(Formatter *f) const = 0;
+
+ virtual ~PGBackend() {}
+ };
+
+#endif
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
new file mode 100644
index 00000000000..ecbfea9149b
--- /dev/null
+++ b/src/osd/ReplicatedBackend.cc
@@ -0,0 +1,158 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+#include "ReplicatedBackend.h"
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPushReply.h"
+
+#define dout_subsys ceph_subsys_osd
+#define DOUT_PREFIX_ARGS this
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
+ return *_dout << pgb->get_parent()->gen_dbg_prefix();
+}
+
+ReplicatedBackend::ReplicatedBackend(
+ PGBackend::Listener *pg, coll_t coll, OSDService *osd) :
+ PGBackend(pg), temp_created(false), coll(coll), osd(osd) {}
+
+void ReplicatedBackend::run_recovery_op(
+ PGBackend::RecoveryHandle *h,
+ int priority)
+{
+}
+
+void ReplicatedBackend::recover_object(
+ const hobject_t &hoid,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *h
+ )
+{
+#if 0
+ op.recovery_progress.data_complete = false;
+ op.recovery_progress.omap_complete = false;
+ op.recovery_progress.data_recovered_to = 0;
+ op.recovery_progress.first = true;
+
+ assert(!pulling.count(soid));
+ pull_from_peer[fromosd].insert(soid);
+ PullInfo &pi = pulling[soid];
+ pi.recovery_info = op.recovery_info;
+ pi.recovery_progress = op.recovery_progress;
+ pi.priority = priority;
+#endif
+ dout(10) << __func__ << dendl;
+}
+
+bool ReplicatedBackend::handle_message(
+ OpRequestRef op
+ )
+{
+ dout(10) << __func__ << ": " << op << dendl;
+ switch (op->request->get_type()) {
+ case MSG_OSD_PG_PUSH:
+ // TODOXXX: needs to be active possibly
+ do_push(op);
+ return true;
+
+ case MSG_OSD_PG_PULL:
+ do_pull(op);
+ return true;
+
+ case MSG_OSD_PG_PUSH_REPLY:
+ do_push_reply(op);
+ return true;
+
+ case MSG_OSD_SUBOP: {
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+ if (m->ops.size() >= 1) {
+ OSDOp *first = &m->ops[0];
+ switch (first->op.op) {
+ case CEPH_OSD_OP_PULL:
+ sub_op_pull(op);
+ return true;
+ case CEPH_OSD_OP_PUSH:
+ // TODOXXX: needs to be active possibly
+ sub_op_push(op);
+ return true;
+ }
+ }
+ break;
+ }
+
+ case MSG_OSD_SUBOPREPLY:
+ MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
+ if (r->ops.size() >= 1) {
+ OSDOp &first = r->ops[0];
+ switch (first.op.op) {
+ case CEPH_OSD_OP_PUSH:
+ // continue peer recovery
+ sub_op_push_reply(op);
+ return true;
+ }
+ }
+ break;
+ }
+ return false;
+}
+
+void ReplicatedBackend::clear_state()
+{
+ // clear pushing/pulling maps
+ pushing.clear();
+ pulling.clear();
+ pull_from_peer.clear();
+}
+
+void ReplicatedBackend::on_change(ObjectStore::Transaction *t)
+{
+ dout(10) << __func__ << dendl;
+ // clear temp
+ for (set<hobject_t>::iterator i = temp_contents.begin();
+ i != temp_contents.end();
+ ++i) {
+ dout(10) << __func__ << ": Removing oid "
+ << *i << " from the temp collection" << dendl;
+ t->remove(get_temp_coll(t), *i);
+ }
+ temp_contents.clear();
+ clear_state();
+}
+
+coll_t ReplicatedBackend::get_temp_coll(ObjectStore::Transaction *t)
+{
+ if (temp_created)
+ return temp_coll;
+ if (!osd->store->collection_exists(temp_coll))
+ t->create_collection(temp_coll);
+ temp_created = true;
+ return temp_coll;
+}
+
+void ReplicatedBackend::on_flushed()
+{
+ if (have_temp_coll() &&
+ !osd->store->collection_empty(get_temp_coll())) {
+ vector<hobject_t> objects;
+ osd->store->collection_list(get_temp_coll(), objects);
+ derr << __func__ << ": found objects in the temp collection: "
+ << objects << ", crashing now"
+ << dendl;
+ assert(0 == "found garbage in the temp collection");
+ }
+}
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
new file mode 100644
index 00000000000..f2a7e4ca9e0
--- /dev/null
+++ b/src/osd/ReplicatedBackend.h
@@ -0,0 +1,252 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef REPBACKEND_H
+#define REPBACKEND_H
+
+#include "OSD.h"
+#include "PGBackend.h"
+#include "osd_types.h"
+
+class ReplicatedBackend : public PGBackend {
+ struct RPGHandle : public PGBackend::RecoveryHandle {
+ map<int, vector<PushOp> > pushes;
+ map<int, vector<PushReplyOp> > push_replies;
+ map<int, vector<PullOp> > pulls;
+ };
+private:
+ bool temp_created;
+ coll_t temp_coll;
+ coll_t get_temp_coll(ObjectStore::Transaction *t);
+ coll_t get_temp_coll() const {
+ return temp_coll;
+ }
+ bool have_temp_coll() const { return temp_created; }
+
+ // Track contents of temp collection, clear on reset
+ set<hobject_t> temp_contents;
+public:
+ coll_t coll;
+ OSDService *osd;
+
+ ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd);
+
+ /// @see PGBackend::open_recovery_op
+ PGBackend::RecoveryHandle *open_recovery_op() {
+ return new RPGHandle();
+ }
+
+ /// @see PGBackend::run_recovery_op
+ void run_recovery_op(
+ PGBackend::RecoveryHandle *h,
+ int priority);
+
+ /// @see PGBackend::recover_object
+ void recover_object(
+ const hobject_t &hoid,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *h
+ );
+
+ /// @see PGBackend::handle_message
+ bool handle_message(
+ OpRequestRef op
+ );
+
+ void on_change(ObjectStore::Transaction *t);
+ void clear_state();
+ void on_flushed();
+
+ void temp_colls(list<coll_t> *out) {
+ if (temp_created)
+ out->push_back(temp_coll);
+ }
+ void split_colls(
+ pg_t child,
+ int split_bits,
+ int seed,
+ ObjectStore::Transaction *t) {
+ if (!temp_created)
+ return;
+ t->create_collection(temp_coll);
+ t->split_collection(
+ temp_coll,
+ split_bits,
+ seed,
+ coll_t::make_temp_coll(child));
+ }
+
+ virtual void dump_recovery_info(Formatter *f) const {
+ {
+ f->open_array_section("pull_from_peer");
+ for (map<int, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
+ i != pull_from_peer.end();
+ ++i) {
+ f->open_object_section("pulling_from");
+ f->dump_int("pull_from", i->first);
+ {
+ f->open_array_section("pulls");
+ for (set<hobject_t>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ f->open_object_section("pull_info");
+ assert(pulling.count(*j));
+ pulling.find(*j)->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ {
+ f->open_array_section("pushing");
+ for (map<hobject_t, map<int, PushInfo> >::const_iterator i =
+ pushing.begin();
+ i != pushing.end();
+ ++i) {
+ f->open_object_section("object");
+ f->dump_stream("pushing") << i->first;
+ {
+ f->open_array_section("pushing_to");
+ for (map<int, PushInfo>::const_iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ f->open_object_section("push_progress");
+ f->dump_stream("object_pushing") << j->first;
+ {
+ f->open_object_section("push_info");
+ j->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ f->close_section();
+ }
+ }
+private:
+ // push
+ struct PushInfo {
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ int priority;
+
+ void dump(Formatter *f) const {
+ {
+ f->open_object_section("recovery_progress");
+ recovery_progress.dump(f);
+ f->close_section();
+ }
+ {
+ f->open_object_section("recovery_info");
+ recovery_info.dump(f);
+ f->close_section();
+ }
+ }
+ };
+ map<hobject_t, map<int, PushInfo> > pushing;
+
+ // pull
+ struct PullInfo {
+ ObjectRecoveryProgress recovery_progress;
+ ObjectRecoveryInfo recovery_info;
+ int priority;
+
+ void dump(Formatter *f) const {
+ {
+ f->open_object_section("recovery_progress");
+ recovery_progress.dump(f);
+ f->close_section();
+ }
+ {
+ f->open_object_section("recovery_info");
+ recovery_info.dump(f);
+ f->close_section();
+ }
+ }
+
+ bool is_complete() const {
+ return recovery_progress.is_complete(recovery_info);
+ }
+ };
+ map<hobject_t, PullInfo> pulling;
+
+ // Reverse mapping from osd peer to objects beging pulled from that peer
+ map<int, set<hobject_t> > pull_from_peer;
+
+ void sub_op_push(OpRequestRef op) {}
+ void sub_op_push_reply(OpRequestRef op) {}
+ void sub_op_pull(OpRequestRef op) {}
+
+ void _do_push(OpRequestRef op) {}
+ void _do_pull_response(OpRequestRef op) {}
+ void do_push(OpRequestRef op) {
+ if (is_primary()) {
+ _do_pull_response(op);
+ } else {
+ _do_push(op);
+ }
+ }
+ void do_pull(OpRequestRef op) {}
+ void do_push_reply(OpRequestRef op) {}
+
+ bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) { return true; }
+ void handle_pull(int peer, PullOp &op, PushOp *reply) {}
+ bool handle_pull_response(int from, PushOp &op, PullOp *response,
+ ObjectStore::Transaction *t) { return true; }
+ void handle_push(int from, PushOp &op, PushReplyOp *response,
+ ObjectStore::Transaction *t) {}
+
+ static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
+ const interval_set<uint64_t> &intervals_received,
+ bufferlist data_received,
+ interval_set<uint64_t> *intervals_usable,
+ bufferlist *data_usable) {}
+ void _failed_push(int from, const hobject_t &soid) {}
+
+ void send_pushes(int prio, map<int, vector<PushOp> > &pushes) {}
+ void prep_push_op_blank(const hobject_t& soid, PushOp *op) {}
+ int send_push_op_legacy(int priority, int peer,
+ PushOp &pop) { return 1; }
+ int send_pull_legacy(int priority, int peer,
+ const ObjectRecoveryInfo& recovery_info,
+ ObjectRecoveryProgress progress) { return 1;}
+ void send_pulls(
+ int priority,
+ map<int, vector<PullOp> > &pulls) {}
+
+ int build_push_op(const ObjectRecoveryInfo &recovery_info,
+ const ObjectRecoveryProgress &progress,
+ ObjectRecoveryProgress *out_progress,
+ PushOp *out_op) { return 1; }
+ void submit_push_data(ObjectRecoveryInfo &recovery_info,
+ bool first,
+ bool complete,
+ const interval_set<uint64_t> &intervals_included,
+ bufferlist data_included,
+ bufferlist omap_header,
+ map<string, bufferptr> &attrs,
+ map<string, bufferlist> &omap_entries,
+ ObjectStore::Transaction *t) {}
+ void submit_push_complete(ObjectRecoveryInfo &recovery_info,
+ ObjectStore::Transaction *t) {}
+};
+
+#endif
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 8b72c8494ed..f158a3b4d74 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -628,6 +628,7 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
const PGPool &_pool, pg_t p, const hobject_t& oid,
const hobject_t& ioid) :
PG(o, curmap, _pool, p, oid, ioid),
+ pgbackend(new ReplicatedBackend(this, coll_t(p), o)),
snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
temp_created(false),
temp_coll(coll_t::make_temp_coll(p)),
@@ -7228,6 +7229,7 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
// any dups
apply_and_flush_repops(is_primary());
+ pgbackend->on_change(t);
// clear pushing/pulling maps
pushing.clear();
pulling.clear();
@@ -7267,6 +7269,7 @@ void ReplicatedPG::_clear_recovery_state()
backfill_pos = hobject_t();
backfills_in_flight.clear();
pending_backfill_updates.clear();
+ pgbackend->clear_state();
pulling.clear();
pushing.clear();
pull_from_peer.clear();
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 4fb5e4d2d8e..595fec5eed4 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -33,6 +33,9 @@
#include "common/sharedptr_registry.hpp"
+#include "PGBackend.h"
+#include "ReplicatedBackend.h"
+
class MOSDSubOpReply;
class ReplicatedPG;
@@ -80,7 +83,7 @@ public:
virtual bool filter(bufferlist& xattr_data, bufferlist& outdata);
};
-class ReplicatedPG : public PG {
+class ReplicatedPG : public PG, public PGBackend::Listener {
friend class OSD;
friend class Watch;
@@ -122,6 +125,109 @@ public:
};
typedef boost::shared_ptr<CopyOp> CopyOpRef;
+ boost::scoped_ptr<PGBackend> pgbackend;
+
+ /// Listener methods
+ void on_local_recover_start(
+ const hobject_t &oid,
+ ObjectStore::Transaction *t) {}
+ void on_local_recover(
+ const hobject_t &oid,
+ const object_stat_sum_t &stat_diff,
+ const ObjectRecoveryInfo &recovery_info,
+ ObjectStore::Transaction *t
+ ) {}
+ void on_peer_recover(
+ int peer,
+ const hobject_t &oid,
+ const ObjectRecoveryInfo &recovery_info) {}
+ void on_global_recover(
+ const hobject_t &oid) {}
+ void failed_push(int from, const hobject_t &soid);
+
+ template <typename T>
+ class BlessedGenContext : public GenContext<T> {
+ ReplicatedPG *pg;
+ GenContext<T> *c;
+ epoch_t e;
+ public:
+ BlessedGenContext(ReplicatedPG *pg, GenContext<T> *c, epoch_t e)
+ : pg(pg), c(c), e(e) {}
+ void finish(T t) {
+ pg->lock();
+ if (pg->pg_has_reset_since(e))
+ delete c;
+ else
+ c->complete(t);
+ pg->unlock();
+ }
+ };
+ class BlessedContext : public Context {
+ ReplicatedPG *pg;
+ Context *c;
+ epoch_t e;
+ public:
+ BlessedContext(ReplicatedPG *pg, Context *c, epoch_t e)
+ : pg(pg), c(c), e(e) {}
+ void finish(int r) {
+ pg->lock();
+ if (pg->pg_has_reset_since(e))
+ delete c;
+ else
+ c->complete(r);
+ pg->unlock();
+ }
+ };
+ Context *bless_context(Context *c) {
+ return new BlessedContext(this, c, get_osdmap()->get_epoch());
+ }
+ GenContext<ThreadPool::TPHandle&> *bless_gencontext(
+ GenContext<ThreadPool::TPHandle&> *c) {
+ return new BlessedGenContext<ThreadPool::TPHandle&>(
+ this, c, get_osdmap()->get_epoch());
+ }
+
+ void send_message(int to_osd, Message *m) {
+ osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
+ }
+ void queue_transaction(ObjectStore::Transaction *t) {
+ osd->store->queue_transaction(osr.get(), t);
+ }
+ epoch_t get_epoch() {
+ return get_osdmap()->get_epoch();
+ }
+ const vector<int> &get_acting() {
+ return acting;
+ }
+ std::string gen_dbg_prefix() const { return gen_prefix(); }
+
+ const map<hobject_t, set<int> > &get_missing_loc() {
+ return missing_loc;
+ }
+ const map<int, pg_missing_t> &get_peer_missing() {
+ return peer_missing;
+ }
+ const pg_missing_t &get_local_missing() {
+ return pg_log.get_missing();
+ }
+ const PGLog &get_log() {
+ return pg_log;
+ }
+ bool pgb_is_primary() const {
+ return is_primary();
+ }
+ OSDMapRef pgb_get_osdmap() const {
+ return get_osdmap();
+ }
+ const pg_info_t &get_info() const {
+ return info;
+ }
+ ObjectContextRef get_obc(
+ const hobject_t &hoid,
+ map<string, bufferptr> &attrs) {
+ return get_object_context(hoid, true, &attrs);
+ }
+
/*
* Capture all object state associated with an in-progress read or write.
*/
@@ -955,7 +1061,10 @@ public:
void on_role_change();
void on_change(ObjectStore::Transaction *t);
void on_activate();
- void on_flushed();
+ void on_flushed() {
+ assert(object_contexts.empty());
+ pgbackend->on_flushed();
+ }
void on_removal(ObjectStore::Transaction *t);
void on_shutdown();
};
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 091b2b95e8f..1df55ce5cab 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -45,6 +45,7 @@
typedef hobject_t collection_list_handle_t;
+typedef uint8_t shard_id_t;
/**
* osd request identifier