summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2013-08-29 18:16:55 -0700
committerSamuel Just <sam.just@inktank.com>2013-09-26 11:24:25 -0700
commit3148b12141c8d61d43295bfd7ad3aafc3b3cd484 (patch)
tree4fb035b98a834f63dd4f426afcf6f3c4facf7b9e
parent30ac934e22e4686b326609371a8c362ebfbfc075 (diff)
downloadceph-3148b12141c8d61d43295bfd7ad3aafc3b3cd484.tar.gz
PG,ReplicatedPG: handle do_request in ReplicatedPG,PGBackend
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/osd/PG.cc70
-rw-r--r--src/osd/PG.h7
-rw-r--r--src/osd/ReplicatedBackend.cc8
-rw-r--r--src/osd/ReplicatedPG.cc69
-rw-r--r--src/osd/ReplicatedPG.h3
5 files changed, 68 insertions, 89 deletions
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index f319d160a39..919d3e3913a 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1399,76 +1399,6 @@ void PG::queue_op(OpRequestRef op)
osd->op_wq.queue(make_pair(PGRef(this), op));
}
-void PG::do_request(
- OpRequestRef op,
- ThreadPool::TPHandle &handle)
-{
- // do any pending flush
- do_pending_flush();
-
- if (!op_has_sufficient_caps(op)) {
- osd->reply_op_error(op, -EPERM);
- return;
- }
- assert(!op_must_wait_for_map(get_osdmap(), op));
- if (can_discard_request(op)) {
- return;
- }
- if (!flushed) {
- dout(20) << " !flushed, waiting for active on " << op << dendl;
- waiting_for_active.push_back(op);
- return;
- }
-
- switch (op->request->get_type()) {
- case CEPH_MSG_OSD_OP:
- if (is_replay() || !is_active()) {
- dout(20) << " replay, waiting for active on " << op << dendl;
- waiting_for_active.push_back(op);
- return;
- }
- do_op(op); // do it now
- break;
-
- case MSG_OSD_SUBOP:
- do_sub_op(op);
- break;
-
- case MSG_OSD_SUBOPREPLY:
- do_sub_op_reply(op);
- break;
-
- case MSG_OSD_PG_SCAN:
- do_scan(op, handle);
- break;
-
- case MSG_OSD_PG_BACKFILL:
- do_backfill(op);
- break;
-
- case MSG_OSD_PG_PUSH:
- if (!is_active()) {
- waiting_for_active.push_back(op);
- op->mark_delayed("waiting for active");
- return;
- }
- do_push(op);
- break;
-
- case MSG_OSD_PG_PULL:
- do_pull(op);
- break;
-
- case MSG_OSD_PG_PUSH_REPLY:
- do_push_reply(op);
- break;
-
- default:
- assert(0 == "bad message type in do_request");
- }
-}
-
-
void PG::replay_queued_ops()
{
assert(is_replay() && is_active());
diff --git a/src/osd/PG.h b/src/osd/PG.h
index b869a0e5e23..74809eea268 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -1793,10 +1793,10 @@ public:
// abstract bits
- void do_request(
+ virtual void do_request(
OpRequestRef op,
ThreadPool::TPHandle &handle
- );
+ ) = 0;
virtual void do_op(OpRequestRef op) = 0;
virtual void do_sub_op(OpRequestRef op) = 0;
@@ -1806,9 +1806,6 @@ public:
ThreadPool::TPHandle &handle
) = 0;
virtual void do_backfill(OpRequestRef op) = 0;
- virtual void do_push(OpRequestRef op) = 0;
- virtual void do_pull(OpRequestRef op) = 0;
- virtual void do_push_reply(OpRequestRef op) = 0;
virtual void snap_trimmer() = 0;
virtual int do_command(cmdmap_t cmdmap, ostream& ss,
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index d020b18d901..da57630e78b 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -92,12 +92,14 @@ bool ReplicatedBackend::handle_message(
// TODOXXX: needs to be active possibly
sub_op_push(op);
return true;
+ default:
+ break;
}
}
break;
}
- case MSG_OSD_SUBOPREPLY:
+ case MSG_OSD_SUBOPREPLY: {
MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
if (r->ops.size() >= 1) {
OSDOp &first = r->ops[0];
@@ -110,6 +112,10 @@ bool ReplicatedBackend::handle_message(
}
break;
}
+
+ default:
+ break;
+ }
return false;
}
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 787a6082b45..f7bcdd2949b 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -643,6 +643,62 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
src_oloc.key = oid.name;
}
+void ReplicatedPG::do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle)
+{
+ // do any pending flush
+ do_pending_flush();
+
+ if (!op_has_sufficient_caps(op)) {
+ osd->reply_op_error(op, -EPERM);
+ return;
+ }
+ assert(!op_must_wait_for_map(get_osdmap(), op));
+ if (can_discard_request(op)) {
+ return;
+ }
+ if (!flushed) {
+ dout(20) << " !flushed, waiting for active on " << op << dendl;
+ waiting_for_active.push_back(op);
+ return;
+ }
+
+ if (pgbackend->handle_message(op))
+ return;
+
+ switch (op->request->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ if (is_replay() || !is_active()) {
+ dout(20) << " replay, waiting for active on " << op << dendl;
+ waiting_for_active.push_back(op);
+ return;
+ }
+ do_op(op); // do it now
+ break;
+
+ case MSG_OSD_SUBOP:
+ do_sub_op(op);
+ break;
+
+ case MSG_OSD_SUBOPREPLY:
+ do_sub_op_reply(op);
+ break;
+
+ case MSG_OSD_PG_SCAN:
+ do_scan(op, handle);
+ break;
+
+ case MSG_OSD_PG_BACKFILL:
+ do_backfill(op);
+ break;
+
+ default:
+ assert(0 == "bad message type in do_request");
+ }
+}
+
+
/** do_op - do an op
* pg lock will be held (if multithreaded)
* osd_lock NOT held.
@@ -1258,11 +1314,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
OSDOp *first = NULL;
if (m->ops.size() >= 1) {
first = &m->ops[0];
- switch (first->op.op) {
- case CEPH_OSD_OP_PULL:
- sub_op_pull(op);
- return;
- }
}
if (!is_active()) {
@@ -1273,9 +1324,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
if (first) {
switch (first->op.op) {
- case CEPH_OSD_OP_PUSH:
- sub_op_push(op);
- return;
case CEPH_OSD_OP_DELETE:
sub_op_remove(op);
return;
@@ -1304,11 +1352,6 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
if (r->ops.size() >= 1) {
OSDOp& first = r->ops[0];
switch (first.op.op) {
- case CEPH_OSD_OP_PUSH:
- // continue peer recovery
- sub_op_push_reply(op);
- return;
-
case CEPH_OSD_OP_SCRUB_RESERVE:
sub_op_scrub_reserve_reply(op);
return;
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 32300222b05..24f001b0fba 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -903,6 +903,9 @@ public:
int do_command(cmdmap_t cmdmap, ostream& ss, bufferlist& idata,
bufferlist& odata);
+ void do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle);
void do_op(OpRequestRef op);
bool pg_op_must_wait(MOSDOp *op);
void do_pg_op(OpRequestRef op);