summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Just <sam.just@inktank.com>2012-11-30 11:20:41 -0800
committerSamuel Just <sam.just@inktank.com>2012-11-30 11:20:41 -0800
commita928b6dbf630b63108aa7805adf9601253d40397 (patch)
treebde36821d26d55d194d6d995facd328e34337312
parent47699f39b9c185454ff86168e4a95b6e5280ae12 (diff)
downloadceph-a928b6dbf630b63108aa7805adf9601253d40397.tar.gz
OSDService: make messengers private
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/osd/OSD.h11
-rw-r--r--src/osd/PG.cc14
-rw-r--r--src/osd/ReplicatedPG.cc34
3 files changed, 35 insertions, 24 deletions
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index a749ec178db..07f1d24182e 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -169,8 +169,10 @@ public:
ObjectStore *&store;
LogClient &clog;
PGRecoveryStats &pg_recovery_stats;
+private:
Messenger *&cluster_messenger;
Messenger *&client_messenger;
+public:
PerfCounters *&logger;
MonClient *&monc;
ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef> &op_wq;
@@ -228,6 +230,15 @@ public:
ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch);
void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
+ void send_message_osd_cluster(Message *m, Connection *con) {
+ cluster_messenger->send_message(m, con);
+ }
+ void send_message_osd_client(Message *m, Connection *con) {
+ client_messenger->send_message(m, con);
+ }
+ entity_name_t get_cluster_msgr_name() {
+ return cluster_messenger->get_myname();
+ }
// -- scrub scheduling --
Mutex sched_scrub_lock;
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 55348e6c786..d4dd95c959d 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -3029,7 +3029,7 @@ void PG::sub_op_scrub_reserve(OpRequestRef op)
MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
::encode(scrubber.reserved, reply->get_data());
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
}
void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
@@ -3087,7 +3087,7 @@ void PG::sub_op_scrub_stop(OpRequestRef op)
scrubber.reserved = false;
MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
}
void PG::clear_scrub_reserved()
@@ -3354,7 +3354,7 @@ void PG::replica_scrub(MOSDRepScrub *msg)
::encode(map, subop->get_data());
subop->ops = scrub;
- osd->cluster_messenger->send_message(subop, msg->get_connection());
+ osd->send_message_osd_cluster(subop, msg->get_connection());
}
/* Scrub:
@@ -4227,7 +4227,7 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch)
ConnectionRef con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch());
if (con) {
osd->osd->_share_map_outgoing(from, con.get(), get_osdmap());
- osd->cluster_messenger->send_message(mlog, con.get());
+ osd->send_message_osd_cluster(mlog, con.get());
} else {
mlog->put();
}
@@ -5249,7 +5249,7 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con
pg->backfill_target, pg->get_osdmap()->get_epoch());
if (con) {
if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
new MBackfillReserve(
MBackfillReserve::REQUEST,
pg->info.pgid,
@@ -5495,7 +5495,7 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_con
ConnectionRef con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch());
if (con) {
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
new MRecoveryReserve(MRecoveryReserve::REQUEST,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
@@ -5542,7 +5542,7 @@ void PG::RecoveryState::Recovering::release_reservations()
ConnectionRef con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch());
if (con) {
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
- pg->osd->cluster_messenger->send_message(
+ pg->osd->send_message_osd_cluster(
new MRecoveryReserve(MRecoveryReserve::RELEASE,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index de259eb1673..1f97311b813 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -557,7 +557,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
reply->set_data(outdata);
reply->set_result(result);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
delete filter;
}
@@ -872,7 +872,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (already_ack(oldv)) {
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->add_flags(CEPH_OSD_FLAG_ACK);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
} else {
dout(10) << " waiting for " << oldv << " to ack" << dendl;
waiting_for_ack[oldv].push_back(op);
@@ -981,7 +981,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
MOSDOpReply *reply = ctx->reply;
ctx->reply = NULL;
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
delete ctx;
put_object_context(obc);
put_object_contexts(src_obc);
@@ -1179,7 +1179,7 @@ void ReplicatedPG::do_scan(OpRequestRef op)
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid, bi.begin, bi.end);
::encode(bi.objects, reply->get_data());
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
}
break;
@@ -1238,7 +1238,7 @@ void ReplicatedPG::do_backfill(OpRequestRef op)
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid);
reply->set_priority(g_conf->osd_recovery_op_priority);
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
queue_peering_event(
CephPeeringEvtRef(
new CephPeeringEvt(
@@ -1340,7 +1340,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
- osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, ssc, this);
ctx->mtime = ceph_clock_now(g_ceph_context);
@@ -3367,7 +3367,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
/* there is a pending notification for this watcher, we should resend it anyway
even if we already sent it as it might not have received it */
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
- osd->client_messenger->send_message(notify_msg, session->con);
+ osd->send_message_osd_client(notify_msg, session->con);
}
}
}
@@ -3423,7 +3423,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
s->add_notif(notif, name);
MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
- osd->client_messenger->send_message(notify_msg, s->con);
+ osd->send_message_osd_client(notify_msg, s->con);
} else {
// unconnected
entity_name_t name = i->first;
@@ -3841,7 +3841,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending commit on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
repop->sent_disk = true;
}
}
@@ -3858,7 +3858,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
MOSDOp *m = (MOSDOp*)(*i)->request;
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->add_flags(CEPH_OSD_FLAG_ACK);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
}
waiting_for_ack.erase(repop->v);
}
@@ -3873,7 +3873,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
reply->add_flags(CEPH_OSD_FLAG_ACK);
dout(10) << " sending ack on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
- osd->client_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_client(reply, m->get_connection());
repop->sent_ack = true;
}
@@ -4142,7 +4142,7 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
- osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
&obc->obs, obc->ssc, this);
ctx->mtime = ceph_clock_now(g_ceph_context);
@@ -4965,7 +4965,7 @@ int ReplicatedPG::pull(
void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer)
{
tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+ osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
dout(10) << "send_remove_op " << oid << " from osd." << peer
<< " tid " << tid << dendl;
@@ -5093,7 +5093,7 @@ int ReplicatedPG::send_pull(int prio, int peer,
{
// send op
tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+ osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
dout(10) << "send_pull_op " << recovery_info.soid << " "
<< recovery_info.version
@@ -5405,7 +5405,7 @@ void ReplicatedPG::handle_push(OpRequestRef op)
MOSDSubOpReply *reply = new MOSDSubOpReply(
m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
- osd->cluster_messenger->send_message(reply, m->get_connection());
+ osd->send_message_osd_cluster(reply, m->get_connection());
}
int ReplicatedPG::send_push(int prio, int peer,
@@ -5416,7 +5416,7 @@ int ReplicatedPG::send_push(int prio, int peer,
ObjectRecoveryProgress new_progress = progress;
tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+ osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid,
false, 0, get_osdmap()->get_epoch(),
tid, recovery_info.version);
@@ -5516,7 +5516,7 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer)
{
// send a blank push back to the primary
tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid);
+ osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0,
get_osdmap()->get_epoch(), tid, eversion_t());
subop->ops = vector<OSDOp>(1);