summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2013-09-09 15:41:10 -0700
committerSamuel Just <sam.just@inktank.com>2013-09-14 00:45:34 -0700
commit07f6c7d025cff67b516e72c130f6f1c1145171cc (patch)
treefd538b4b4dd9107cb78599bf6cec8bfd74e5b2b2
parent7e1aaeb742f385b6c9c52dacea33b17f77b433a0 (diff)
downloadceph-07f6c7d025cff67b516e72c130f6f1c1145171cc.tar.gz
ReplicatedBackend: wire in start_pushes
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/osd/ReplicatedBackend.cc6
-rw-r--r--src/osd/ReplicatedBackend.h13
-rw-r--r--src/osd/ReplicatedPG.cc62
3 files changed, 71 insertions, 10 deletions
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index 9b80a31905c..59ce9bbcceb 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -62,7 +62,11 @@ void ReplicatedBackend::recover_object(
} else {
assert(obc);
assert(head);
- // TODOSAM: do the needfull!
+ int started = start_pushes(
+ hoid,
+ obc,
+ h);
+ assert(started > 0);
}
}
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
index 7329de0be9f..7f413ca2a62 100644
--- a/src/osd/ReplicatedBackend.h
+++ b/src/osd/ReplicatedBackend.h
@@ -19,11 +19,13 @@
#include "PGBackend.h"
#include "osd_types.h"
+struct C_ReplicatedBackend_OnPullComplete;
class ReplicatedBackend : public PGBackend {
struct RPGHandle : public PGBackend::RecoveryHandle {
map<int, vector<PushOp> > pushes;
map<int, vector<PullOp> > pulls;
};
+ friend struct C_ReplicatedBackend_OnPullComplete;
private:
bool temp_created;
const coll_t temp_coll;
@@ -43,9 +45,12 @@ public:
ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd);
/// @see PGBackend::open_recovery_op
- PGBackend::RecoveryHandle *open_recovery_op() {
+ RPGHandle *_open_recovery_op() {
return new RPGHandle();
}
+ PGBackend::RecoveryHandle *open_recovery_op() {
+ return _open_recovery_op();
+ }
/// @see PGBackend::run_recovery_op
void run_recovery_op(
@@ -215,8 +220,10 @@ private:
bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply);
void handle_pull(int peer, PullOp &op, PushOp *reply);
- bool handle_pull_response(int from, PushOp &op, PullOp *response,
- ObjectStore::Transaction *t);
+ bool handle_pull_response(
+ int from, PushOp &op, PullOp *response,
+ list<pair<hobject_t, ObjectContextRef> > *to_continue,
+ ObjectStore::Transaction *t);
void handle_push(int from, PushOp &op, PushReplyOp *response,
ObjectStore::Transaction *t);
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index f8cdf22a922..3bc8e9752bb 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -1594,6 +1594,28 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
get_parent()->queue_transaction(t);
}
+struct C_ReplicatedBackend_OnPullComplete : Context {
+ ReplicatedBackend *bc;
+ list<pair<hobject_t, ObjectContextRef> > to_continue;
+ int priority;
+ C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
+ : bc(bc), priority(priority) {}
+
+ void finish(int) {
+ ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
+ for (list<pair<hobject_t, ObjectContextRef> >::iterator i =
+ to_continue.begin();
+ i != to_continue.end();
+ ++i) {
+ if (!bc->start_pushes(i->first, i->second, h)) {
+ bc->get_parent()->on_global_recover(
+ i->first);
+ }
+ }
+ bc->run_recovery_op(h, priority);
+ }
+};
+
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
@@ -1602,13 +1624,23 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
vector<PullOp> replies(1);
ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ list<pair<hobject_t, ObjectContextRef> > to_continue;
for (vector<PushOp>::iterator i = m->pushes.begin();
i != m->pushes.end();
++i) {
- bool more = handle_pull_response(from, *i, &(replies.back()), t);
+ bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t);
if (more)
replies.push_back(PullOp());
}
+ if (!to_continue.empty()) {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
+ m->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ get_parent()->bless_context(c));
+ }
replies.erase(replies.end() - 1);
if (replies.size()) {
@@ -6152,7 +6184,9 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove
bool ReplicatedBackend::handle_pull_response(
int from, PushOp &pop, PullOp *response,
- ObjectStore::Transaction *t)
+ list<pair<hobject_t, ObjectContextRef> > *to_continue,
+ ObjectStore::Transaction *t
+ )
{
interval_set<uint64_t> data_included = pop.data_included;
bufferlist data;
@@ -6225,11 +6259,14 @@ bool ReplicatedBackend::handle_pull_response(
pi.stat.num_keys_recovered += pop.omap_entries.size();
if (complete) {
- pulling.erase(hoid);
- pull_from_peer[from].erase(hoid);
+ to_continue->push_back(make_pair(hoid, pi.obc));
pi.stat.num_objects_recovered++;
get_parent()->on_local_recover(
hoid, pi.stat, pi.recovery_info, pi.obc, t);
+ pulling.erase(hoid);
+ pull_from_peer[from].erase(hoid);
+ if (pull_from_peer[from].empty())
+ pull_from_peer.erase(from);
return false;
} else {
response->soid = pop.soid;
@@ -6813,14 +6850,27 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
if (is_primary()) {
PullOp resp;
- bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
+ RPGHandle *h = _open_recovery_op();
+ list<pair<hobject_t, ObjectContextRef> > to_continue;
+ bool more = handle_pull_response(
+ m->get_source().num(), pop, &resp,
+ &to_continue, t);
if (more) {
send_pull_legacy(
m->get_priority(),
m->get_source().num(),
resp.recovery_info,
resp.recovery_progress);
- }
+ } else {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
+ op->request->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ get_parent()->bless_context(c));
+ }
+ run_recovery_op(h, op->request->get_priority());
} else {
PushReplyOp resp;
MOSDSubOpReply *reply = new MOSDSubOpReply(