diff options
author | Samuel Just <sam.just@inktank.com> | 2012-11-30 11:20:41 -0800 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2012-11-30 11:20:41 -0800 |
commit | a928b6dbf630b63108aa7805adf9601253d40397 (patch) | |
tree | bde36821d26d55d194d6d995facd328e34337312 | |
parent | 47699f39b9c185454ff86168e4a95b6e5280ae12 (diff) | |
download | ceph-a928b6dbf630b63108aa7805adf9601253d40397.tar.gz |
OSDService: make messengers private
Signed-off-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/osd/OSD.h | 11 | ||||
-rw-r--r-- | src/osd/PG.cc | 14 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 34 |
3 files changed, 35 insertions, 24 deletions
diff --git a/src/osd/OSD.h b/src/osd/OSD.h index a749ec178db..07f1d24182e 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -169,8 +169,10 @@ public: ObjectStore *&store; LogClient &clog; PGRecoveryStats &pg_recovery_stats; +private: Messenger *&cluster_messenger; Messenger *&client_messenger; +public: PerfCounters *&logger; MonClient *&monc; ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef> &op_wq; @@ -228,6 +230,15 @@ public: ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch); ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch); void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch); + void send_message_osd_cluster(Message *m, Connection *con) { + cluster_messenger->send_message(m, con); + } + void send_message_osd_client(Message *m, Connection *con) { + client_messenger->send_message(m, con); + } + entity_name_t get_cluster_msgr_name() { + return cluster_messenger->get_myname(); + } // -- scrub scheduling -- Mutex sched_scrub_lock; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 55348e6c786..d4dd95c959d 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -3029,7 +3029,7 @@ void PG::sub_op_scrub_reserve(OpRequestRef op) MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); ::encode(scrubber.reserved, reply->get_data()); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); } void PG::sub_op_scrub_reserve_reply(OpRequestRef op) @@ -3087,7 +3087,7 @@ void PG::sub_op_scrub_stop(OpRequestRef op) scrubber.reserved = false; MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); } void PG::clear_scrub_reserved() @@ -3354,7 +3354,7 @@ void PG::replica_scrub(MOSDRepScrub *msg) ::encode(map, subop->get_data()); subop->ops = scrub; - osd->cluster_messenger->send_message(subop, msg->get_connection()); + osd->send_message_osd_cluster(subop, msg->get_connection()); } /* Scrub: @@ -4227,7 +4227,7 @@ void PG::fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch) ConnectionRef con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch()); if (con) { osd->osd->_share_map_outgoing(from, con.get(), get_osdmap()); - osd->cluster_messenger->send_message(mlog, con.get()); + osd->send_message_osd_cluster(mlog, con.get()); } else { mlog->put(); } @@ -5249,7 +5249,7 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con pg->backfill_target, pg->get_osdmap()->get_epoch()); if (con) { if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) { - pg->osd->cluster_messenger->send_message( + pg->osd->send_message_osd_cluster( new MBackfillReserve( MBackfillReserve::REQUEST, pg->info.pgid, @@ -5495,7 +5495,7 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::WaitRemoteRecoveryReserved(my_con ConnectionRef con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch()); if (con) { if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { - pg->osd->cluster_messenger->send_message( + pg->osd->send_message_osd_cluster( new MRecoveryReserve(MRecoveryReserve::REQUEST, pg->info.pgid, pg->get_osdmap()->get_epoch()), @@ -5542,7 +5542,7 @@ void PG::RecoveryState::Recovering::release_reservations() ConnectionRef con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch()); if (con) { if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { - pg->osd->cluster_messenger->send_message( + pg->osd->send_message_osd_cluster( new MRecoveryReserve(MRecoveryReserve::RELEASE, pg->info.pgid, pg->get_osdmap()->get_epoch()), diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index de259eb1673..1f97311b813 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -557,7 +557,7 @@ void ReplicatedPG::do_pg_op(OpRequestRef op) CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); reply->set_data(outdata); reply->set_result(result); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); delete filter; } @@ -872,7 +872,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (already_ack(oldv)) { MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); reply->add_flags(CEPH_OSD_FLAG_ACK); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); } else { dout(10) << " waiting for " << oldv << " to ack" << dendl; waiting_for_ack[oldv].push_back(op); @@ -981,7 +981,7 @@ void ReplicatedPG::do_op(OpRequestRef op) MOSDOpReply *reply = ctx->reply; ctx->reply = NULL; reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); delete ctx; put_object_context(obc); put_object_contexts(src_obc); @@ -1179,7 +1179,7 @@ void ReplicatedPG::do_scan(OpRequestRef op) get_osdmap()->get_epoch(), m->query_epoch, info.pgid, bi.begin, bi.end); ::encode(bi.objects, reply->get_data()); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); } break; @@ -1238,7 +1238,7 @@ void ReplicatedPG::do_backfill(OpRequestRef op) get_osdmap()->get_epoch(), m->query_epoch, info.pgid); reply->set_priority(g_conf->osd_recovery_op_priority); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); queue_peering_event( CephPeeringEvtRef( new CephPeeringEvt( @@ -1340,7 +1340,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid, vector<OSDOp> ops; tid_t rep_tid = osd->get_tid(); - osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid); + osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, ssc, this); ctx->mtime = ceph_clock_now(g_ceph_context); @@ -3367,7 +3367,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) /* there is a pending notification for this watcher, we should resend it anyway even if we already sent it as it might not have received it */ MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); - osd->client_messenger->send_message(notify_msg, session->con); + osd->send_message_osd_client(notify_msg, session->con); } } } @@ -3423,7 +3423,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) s->add_notif(notif, name); MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); - osd->client_messenger->send_message(notify_msg, s->con); + osd->send_message_osd_client(notify_msg, s->con); } else { // unconnected entity_name_t name = i->first; @@ -3841,7 +3841,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); dout(10) << " sending commit on " << *repop << " " << reply << dendl; assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); repop->sent_disk = true; } } @@ -3858,7 +3858,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) MOSDOp *m = (MOSDOp*)(*i)->request; MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); reply->add_flags(CEPH_OSD_FLAG_ACK); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); } waiting_for_ack.erase(repop->v); } @@ -3873,7 +3873,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) reply->add_flags(CEPH_OSD_FLAG_ACK); dout(10) << " sending ack on " << *repop << " " << reply << dendl; assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); - osd->client_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_client(reply, m->get_connection()); repop->sent_ack = true; } @@ -4142,7 +4142,7 @@ void ReplicatedPG::handle_watch_timeout(void *_obc, vector<OSDOp> ops; tid_t rep_tid = osd->get_tid(); - osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid); + osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this); ctx->mtime = ceph_clock_now(g_ceph_context); @@ -4965,7 +4965,7 @@ int ReplicatedPG::pull( void ReplicatedPG::send_remove_op(const hobject_t& oid, eversion_t v, int peer) { tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); dout(10) << "send_remove_op " << oid << " from osd." << peer << " tid " << tid << dendl; @@ -5093,7 +5093,7 @@ int ReplicatedPG::send_pull(int prio, int peer, { // send op tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); dout(10) << "send_pull_op " << recovery_info.soid << " " << recovery_info.version @@ -5405,7 +5405,7 @@ void ReplicatedPG::handle_push(OpRequestRef op) MOSDSubOpReply *reply = new MOSDSubOpReply( m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); - osd->cluster_messenger->send_message(reply, m->get_connection()); + osd->send_message_osd_cluster(reply, m->get_connection()); } int ReplicatedPG::send_push(int prio, int peer, @@ -5416,7 +5416,7 @@ int ReplicatedPG::send_push(int prio, int peer, ObjectRecoveryProgress new_progress = progress; tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, recovery_info.soid, false, 0, get_osdmap()->get_epoch(), tid, recovery_info.version); @@ -5516,7 +5516,7 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer) { // send a blank push back to the primary tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->cluster_messenger->get_myname(), 0, tid); + osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, soid, false, 0, get_osdmap()->get_epoch(), tid, eversion_t()); subop->ops = vector<OSDOp>(1); |