diff options
author | Sage Weil <sage@inktank.com> | 2013-08-30 17:02:49 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-08-30 17:02:49 -0700 |
commit | 46279327cb73672ade37d72a361b0ec2784591cc (patch) | |
tree | 13b12a75dbdfcacbefad84ce9650a772d4099739 | |
parent | 7ec0b4fb780b91b44427ed94eee82c3c6b6fff9f (diff) | |
parent | 4f6c6b2d747860b1424e71fec59f781750c99553 (diff) | |
download | ceph-46279327cb73672ade37d72a361b0ec2784591cc.tar.gz |
Merge pull request #541 from ceph/wip-6036
osd objecter; copy-get
Reviewed-by: Greg Farnum <greg@inktank.com>
Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r-- | src/Makefile.am | 8 | ||||
-rw-r--r-- | src/ceph_osd.cc | 133 | ||||
-rw-r--r-- | src/include/ceph_strings.cc | 2 | ||||
-rw-r--r-- | src/include/encoding.h | 11 | ||||
-rw-r--r-- | src/include/rados.h | 9 | ||||
-rw-r--r-- | src/os/ObjectStore.h | 15 | ||||
-rw-r--r-- | src/osd/OSD.cc | 81 | ||||
-rw-r--r-- | src/osd/OSD.h | 42 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 189 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 3 | ||||
-rw-r--r-- | src/osd/osd_types.cc | 55 | ||||
-rw-r--r-- | src/osd/osd_types.h | 31 | ||||
-rw-r--r-- | src/osdc/Objecter.cc | 29 | ||||
-rw-r--r-- | src/osdc/Objecter.h | 83 | ||||
-rw-r--r-- | src/test/encoding/types.h | 1 | ||||
-rwxr-xr-x | src/vstart.sh | 1 |
16 files changed, 572 insertions, 121 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 4b09c23e872..3bc5cbe0e11 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -96,7 +96,7 @@ bin_PROGRAMS += ceph_mon_store_converter # osd ceph_osd_SOURCES = ceph_osd.cc -ceph_osd_LDADD = libosd.a $(LIBOS_LDA) $(LIBGLOBAL_LDA) +ceph_osd_LDADD = libosd.a libosdc.la $(LIBOS_LDA) $(LIBGLOBAL_LDA) ceph_osd_CXXFLAGS = ${AM_CXXFLAGS} bin_PROGRAMS += ceph-osd @@ -119,7 +119,7 @@ ceph_authtool_SOURCES = ceph_authtool.cc ceph_authtool_LDADD = $(LIBGLOBAL_LDA) ceph_filestore_dump_SOURCES = tools/ceph-filestore-dump.cc ceph_filestore_dump_SOURCES += perfglue/disabled_heap_profiler.cc -ceph_filestore_dump_LDADD = libosd.a $(LIBOS_LDA) $(LIBGLOBAL_LDA) -lboost_program_options +ceph_filestore_dump_LDADD = libosd.a libosdc.la $(LIBOS_LDA) $(LIBGLOBAL_LDA) -lboost_program_options if LINUX ceph_filestore_dump_LDADD += -ldl endif @@ -835,7 +835,7 @@ check_PROGRAMS += unittest_osd_types unittest_pglog_SOURCES = test/osd/TestPGLog.cc perfglue/disabled_heap_profiler.cc unittest_pglog_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} -unittest_pglog_LDADD = libosd.a $(LIBOS_LDA) $(LIBGLOBAL_LDA) ${UNITTEST_LDADD} +unittest_pglog_LDADD = libosd.a libosdc.la $(LIBOS_LDA) $(LIBGLOBAL_LDA) ${UNITTEST_LDADD} check_PROGRAMS += unittest_pglog if LINUX @@ -1733,7 +1733,7 @@ libosd_a_SOURCES = \ osd/OpRequest.cc \ osd/SnapMapper.cc \ objclass/class_api.cc -libosd_a_CXXFLAGS= ${AM_CXXFLAGS} +libosd_a_CXXFLAGS = ${AM_CXXFLAGS} noinst_LIBRARIES += libosd.a libosdc_la_SOURCES = \ diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index dc6f435bdcf..120f4f28f6e 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -316,31 +316,34 @@ int main(int argc, const char **argv) << TEXT_NORMAL << dendl; } - Messenger *client_messenger = Messenger::create(g_ceph_context, - entity_name_t::OSD(whoami), "client", - getpid()); - Messenger *cluster_messenger = Messenger::create(g_ceph_context, - entity_name_t::OSD(whoami), "cluster", + Messenger *ms_public = 
Messenger::create(g_ceph_context, + entity_name_t::OSD(whoami), "client", + getpid()); + Messenger *ms_cluster = Messenger::create(g_ceph_context, + entity_name_t::OSD(whoami), "cluster", + getpid()); + Messenger *ms_hbclient = Messenger::create(g_ceph_context, + entity_name_t::OSD(whoami), "hbclient", + getpid()); + Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, + entity_name_t::OSD(whoami), "hb_back_server", getpid()); - Messenger *messenger_hbclient = Messenger::create(g_ceph_context, - entity_name_t::OSD(whoami), "hbclient", - getpid()); - Messenger *messenger_hb_back_server = Messenger::create(g_ceph_context, - entity_name_t::OSD(whoami), "hb_back_server", - getpid()); - Messenger *messenger_hb_front_server = Messenger::create(g_ceph_context, + Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, entity_name_t::OSD(whoami), "hb_front_server", getpid()); - cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL); - messenger_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL); - messenger_hb_back_server->set_cluster_protocol(CEPH_OSD_PROTOCOL); - messenger_hb_front_server->set_cluster_protocol(CEPH_OSD_PROTOCOL); + Messenger *ms_objecter = Messenger::create(g_ceph_context, + entity_name_t::OSD(whoami), "hbclient", + getpid()); + ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL); + ms_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL); + ms_hb_back_server->set_cluster_protocol(CEPH_OSD_PROTOCOL); + ms_hb_front_server->set_cluster_protocol(CEPH_OSD_PROTOCOL); cout << "starting osd." << whoami - << " at " << client_messenger->get_myaddr() + << " at " << ms_public->get_myaddr() << " osd_data " << g_conf->osd_data << " " << ((g_conf->osd_journal.empty()) ? 
- "(no journal)" : g_conf->osd_journal) + "(no journal)" : g_conf->osd_journal) << std::endl; boost::scoped_ptr<Throttle> client_byte_throttler( @@ -356,40 +359,42 @@ int main(int argc, const char **argv) CEPH_FEATURE_PGID64 | CEPH_FEATURE_MSG_AUTH; - client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0)); - client_messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, - client_byte_throttler.get(), - client_msg_throttler.get()); - client_messenger->set_policy(entity_name_t::TYPE_MON, + ms_public->set_default_policy(Messenger::Policy::stateless_server(supported, 0)); + ms_public->set_policy_throttlers(entity_name_t::TYPE_CLIENT, + client_byte_throttler.get(), + client_msg_throttler.get()); + ms_public->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID | CEPH_FEATURE_PGID64 | CEPH_FEATURE_OSDENC)); //try to poison pill any OSD connections on the wrong address - client_messenger->set_policy(entity_name_t::TYPE_OSD, - Messenger::Policy::stateless_server(0,0)); + ms_public->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::stateless_server(0,0)); - cluster_messenger->set_default_policy(Messenger::Policy::stateless_server(0, 0)); - cluster_messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(0,0)); - cluster_messenger->set_policy(entity_name_t::TYPE_OSD, - Messenger::Policy::lossless_peer(supported, - CEPH_FEATURE_UID | - CEPH_FEATURE_PGID64 | - CEPH_FEATURE_OSDENC)); - cluster_messenger->set_policy(entity_name_t::TYPE_CLIENT, + ms_cluster->set_default_policy(Messenger::Policy::stateless_server(0, 0)); + ms_cluster->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(0,0)); + ms_cluster->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::lossless_peer(supported, + CEPH_FEATURE_UID | + CEPH_FEATURE_PGID64 | + CEPH_FEATURE_OSDENC)); + ms_cluster->set_policy(entity_name_t::TYPE_CLIENT, + Messenger::Policy::stateless_server(0, 0)); + + 
ms_hbclient->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::lossy_client(0, 0)); + ms_hb_back_server->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::stateless_server(0, 0)); + ms_hb_front_server->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::stateless_server(0, 0)); - messenger_hbclient->set_policy(entity_name_t::TYPE_OSD, - Messenger::Policy::lossy_client(0, 0)); - messenger_hb_back_server->set_policy(entity_name_t::TYPE_OSD, - Messenger::Policy::stateless_server(0, 0)); - messenger_hb_front_server->set_policy(entity_name_t::TYPE_OSD, - Messenger::Policy::stateless_server(0, 0)); + ms_objecter->set_default_policy(Messenger::Policy::lossy_client(0, CEPH_FEATURE_OSDREPLYMUX)); - r = client_messenger->bind(g_conf->public_addr); + r = ms_public->bind(g_conf->public_addr); if (r < 0) exit(1); - r = cluster_messenger->bind(g_conf->cluster_addr); + r = ms_cluster->bind(g_conf->cluster_addr); if (r < 0) exit(1); @@ -400,7 +405,7 @@ int main(int argc, const char **argv) if (hb_back_addr.is_ip()) hb_back_addr.set_port(0); } - r = messenger_hb_back_server->bind(hb_back_addr); + r = ms_hb_back_server->bind(hb_back_addr); if (r < 0) exit(1); @@ -408,7 +413,7 @@ int main(int argc, const char **argv) entity_addr_t hb_front_addr = g_conf->public_addr; if (hb_front_addr.is_ip()) hb_front_addr.set_port(0); - r = messenger_hb_front_server->bind(hb_front_addr); + r = ms_hb_front_server->bind(hb_front_addr); if (r < 0) exit(1); @@ -430,8 +435,13 @@ int main(int argc, const char **argv) return -1; global_init_chdir(g_ceph_context); - osd = new OSD(whoami, cluster_messenger, client_messenger, - messenger_hbclient, messenger_hb_front_server, messenger_hb_back_server, + osd = new OSD(whoami, + ms_cluster, + ms_public, + ms_hbclient, + ms_hb_front_server, + ms_hb_back_server, + ms_objecter, &mc, g_conf->osd_data, g_conf->osd_journal); @@ -445,11 +455,12 @@ int main(int argc, const char **argv) // Now close the standard file descriptors 
global_init_shutdown_stderr(g_ceph_context); - client_messenger->start(); - messenger_hbclient->start(); - messenger_hb_front_server->start(); - messenger_hb_back_server->start(); - cluster_messenger->start(); + ms_public->start(); + ms_hbclient->start(); + ms_hb_front_server->start(); + ms_hb_back_server->start(); + ms_cluster->start(); + ms_objecter->start(); // start osd err = osd->init(); @@ -470,11 +481,12 @@ int main(int argc, const char **argv) if (g_conf->inject_early_sigterm) kill(getpid(), SIGTERM); - client_messenger->wait(); - messenger_hbclient->wait(); - messenger_hb_front_server->wait(); - messenger_hb_back_server->wait(); - cluster_messenger->wait(); + ms_public->wait(); + ms_hbclient->wait(); + ms_hb_front_server->wait(); + ms_hb_back_server->wait(); + ms_cluster->wait(); + ms_objecter->wait(); unregister_async_signal_handler(SIGHUP, sighup_handler); unregister_async_signal_handler(SIGINT, handle_osd_signal); @@ -483,11 +495,12 @@ int main(int argc, const char **argv) // done delete osd; - delete client_messenger; - delete messenger_hbclient; - delete messenger_hb_front_server; - delete messenger_hb_back_server; - delete cluster_messenger; + delete ms_public; + delete ms_hbclient; + delete ms_hb_front_server; + delete ms_hb_back_server; + delete ms_cluster; + delete ms_objecter; client_byte_throttler.reset(); client_msg_throttler.reset(); g_ceph_context->put(); diff --git a/src/include/ceph_strings.cc b/src/include/ceph_strings.cc index d46eca6aaf8..f14f29ce0e9 100644 --- a/src/include/ceph_strings.cc +++ b/src/include/ceph_strings.cc @@ -48,6 +48,8 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_TMAPPUT: return "tmapput"; case CEPH_OSD_OP_WATCH: return "watch"; + case CEPH_OSD_OP_COPY_GET: return "copy-get"; + case CEPH_OSD_OP_CLONERANGE: return "clonerange"; case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version"; case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr"; diff --git a/src/include/encoding.h b/src/include/encoding.h 
index 67c9af59d2b..a091f7f69e9 100644 --- a/src/include/encoding.h +++ b/src/include/encoding.h @@ -562,6 +562,17 @@ inline void decode(std::map<T,U>& m, bufferlist::iterator& p) } } template<class T, class U> +inline void decode_noclear(std::map<T,U>& m, bufferlist::iterator& p) +{ + __u32 n; + decode(n, p); + while (n--) { + T k; + decode(k, p); + decode(m[k], p); + } +} +template<class T, class U> inline void encode_nohead(const std::map<T,U>& m, bufferlist& bl) { for (typename std::map<T,U>::const_iterator p = m.begin(); p != m.end(); ++p) { diff --git a/src/include/rados.h b/src/include/rados.h index de9b449ed15..27291a7440e 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -217,6 +217,8 @@ enum { CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24, CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25, + CEPH_OSD_OP_COPY_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 27, + /** multi **/ CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1, CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2, @@ -398,9 +400,16 @@ struct ceph_osd_op { __u8 flag; /* 0 = unwatch, 1 = watch */ } __attribute__ ((packed)) watch; struct { + __le64 unused; + __le64 ver; + } __attribute__ ((packed)) assert_ver; + struct { __le64 offset, length; __le64 src_offset; } __attribute__ ((packed)) clonerange; + struct { + __le64 max; /* max data in reply */ + } __attribute__ ((packed)) copy_get; }; __le32 payload_len; } __attribute__ ((packed)); diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index eb5b40c5a69..655afee004f 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -475,6 +475,14 @@ public: ::encode(attrset, tbl); ops++; } + void setattrs(coll_t cid, const hobject_t& oid, map<string,bufferlist>& attrset) { + __u32 op = OP_SETATTRS; + ::encode(op, tbl); + ::encode(cid, tbl); + ::encode(oid, tbl); + ::encode(attrset, tbl); + ops++; + } void rmattr(coll_t cid, 
const hobject_t& oid, const char *name) { string n(name); rmattr(cid, oid, n); @@ -578,6 +586,13 @@ public: ::encode(aset, tbl); ops++; } + void collection_setattrs(coll_t cid, map<string,bufferlist>& aset) { + __u32 op = OP_COLL_SETATTRS; + ::encode(op, tbl); + ::encode(cid, tbl); + ::encode(aset, tbl); + ops++; + } void collection_rename(coll_t cid, coll_t ncid) { __u32 op = OP_COLL_RENAME; ::encode(op, tbl); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 66022a3898a..cff0c8d6a52 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -33,6 +33,7 @@ #include "OSD.h" #include "OSDMap.h" #include "Watch.h" +#include "osdc/Objecter.h" #include "common/ceph_argparse.h" #include "common/version.h" @@ -175,6 +176,12 @@ OSDService::OSDService(OSD *osd) : pre_publish_lock("OSDService::pre_publish_lock"), sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0), scrubs_active(0), + objecter_lock("OSD::objecter_lock"), + objecter_timer(osd->client_messenger->cct, objecter_lock), + objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap, + objecter_lock, objecter_timer)), + objecter_finisher(osd->client_messenger->cct), + objecter_dispatcher(this), watch_lock("OSD::watch_lock"), watch_timer(osd->client_messenger->cct, watch_lock), next_notif_id(0), @@ -202,6 +209,11 @@ OSDService::OSDService(OSD *osd) : #endif {} +OSDService::~OSDService() +{ + delete objecter; +} + void OSDService::_start_split(pg_t parent, const set<pg_t> &children) { for (set<pg_t>::const_iterator i = children.begin(); @@ -385,6 +397,15 @@ void OSDService::shutdown() Mutex::Locker l(watch_lock); watch_timer.shutdown(); } + + { + Mutex::Locker l(objecter_lock); + objecter_timer.shutdown(); + objecter->shutdown_locked(); + } + objecter->shutdown_unlocked(); + objecter_finisher.stop(); + { Mutex::Locker l(backfill_request_lock); backfill_request_timer.shutdown(); @@ -396,6 +417,14 @@ void OSDService::shutdown() void OSDService::init() { 
reserver_finisher.start(); + { + objecter_finisher.start(); + objecter->init_unlocked(); + Mutex::Locker l(objecter_lock); + objecter_timer.init(); + objecter->set_client_incarnation(0); + objecter->init_locked(); + } watch_timer.init(); } @@ -882,6 +911,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, Messenger *hb_clientm, Messenger *hb_front_serverm, Messenger *hb_back_serverm, + Messenger *osdc_messenger, MonClient *mc, const std::string &dev, const std::string &jdev) : Dispatcher(external_messenger->cct), @@ -897,6 +927,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, cct->_conf->auth_service_required)), cluster_messenger(internal_messenger), client_messenger(external_messenger), + objecter_messenger(osdc_messenger), monc(mc), logger(NULL), recoverystate_perf(NULL), @@ -1212,6 +1243,8 @@ int OSD::init() hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher); hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher); + objecter_messenger->add_dispatcher_head(&service.objecter_dispatcher); + monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD); r = monc->init(); if (r < 0) @@ -1646,6 +1679,7 @@ int OSD::shutdown() client_messenger->shutdown(); cluster_messenger->shutdown(); hbclient_messenger->shutdown(); + objecter_messenger->shutdown(); hb_front_server_messenger->shutdown(); hb_back_server_messenger->shutdown(); peering_wq.clear(); @@ -4303,7 +4337,7 @@ bool OSD::_share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch } // does peer have old map? 
- if (name.is_osd() && + if (con->get_messenger() == cluster_messenger && osdmap->is_up(name.num()) && (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() || osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) { @@ -4376,6 +4410,37 @@ bool OSD::heartbeat_dispatch(Message *m) return true; } +bool OSDService::ObjecterDispatcher::ms_dispatch(Message *m) +{ + Mutex::Locker l(osd->objecter_lock); + osd->objecter->dispatch(m); + return true; +} + +bool OSDService::ObjecterDispatcher::ms_handle_reset(Connection *con) +{ + Mutex::Locker l(osd->objecter_lock); + osd->objecter->ms_handle_reset(con); + return true; +} + +void OSDService::ObjecterDispatcher::ms_handle_connect(Connection *con) +{ + Mutex::Locker l(osd->objecter_lock); + return osd->objecter->ms_handle_connect(con); +} + +bool OSDService::ObjecterDispatcher::ms_get_authorizer(int dest_type, + AuthAuthorizer **authorizer, + bool force_new) +{ + if (dest_type == CEPH_ENTITY_TYPE_MON) + return true; + *authorizer = osd->monc->auth->build_authorizer(dest_type); + return *authorizer != NULL; +} + + bool OSD::ms_dispatch(Message *m) { if (m->get_type() == MSG_OSD_MARK_ME_DOWN) { @@ -4969,6 +5034,13 @@ void OSD::handle_osd_map(MOSDMap *m) if (session) session->put(); + // share with the objecter + { + Mutex::Locker l(service.objecter_lock); + m->get(); + service.objecter->handle_osd_map(m); + } + epoch_t first = m->get_first(); epoch_t last = m->get_last(); dout(3) << "handle_osd_map epochs [" << first << "," << last << "], i have " @@ -5712,7 +5784,7 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch) } // ok, our map is same or newer.. do they still exist? 
- if (m->get_source().is_osd()) { + if (m->get_connection()->get_messenger() == cluster_messenger) { int from = m->get_source().num(); if (!osdmap->have_inst(from) || osdmap->get_cluster_addr(from) != m->get_source_inst().addr) { @@ -6813,11 +6885,8 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v, flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK); MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags); - Messenger *msgr = client_messenger; reply->set_reply_versions(v, uv); - if (m->get_source().is_osd()) - msgr = cluster_messenger; - msgr->send_message(reply, m->get_connection()); + m->get_connection()->get_messenger()->send_message(reply, m->get_connection()); } void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) diff --git a/src/osd/OSD.h b/src/osd/OSD.h index e23c19b8f93..bd5e3d0bbbd 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -161,6 +161,7 @@ class OSDMap; class MLog; class MClass; class MOSDPGMissing; +class Objecter; class Watch; class Notification; @@ -417,6 +418,26 @@ public: void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv); void handle_misdirected_op(PG *pg, OpRequestRef op); + // -- Objecter, for tiering reads/writes from/to other OSDs -- + Mutex objecter_lock; + SafeTimer objecter_timer; + OSDMap objecter_osdmap; + Objecter *objecter; + Finisher objecter_finisher; + struct ObjecterDispatcher : public Dispatcher { + OSDService *osd; + bool ms_dispatch(Message *m); + bool ms_handle_reset(Connection *con); + void ms_handle_remote_reset(Connection *con) {} + void ms_handle_connect(Connection *con); + bool ms_get_authorizer(int dest_type, + AuthAuthorizer **authorizer, + bool force_new); + ObjecterDispatcher(OSDService *o) : Dispatcher(g_ceph_context), osd(o) {} + } objecter_dispatcher; + friend class ObjecterDispatcher; + + // -- Watch -- Mutex watch_lock; SafeTimer watch_timer; @@ -608,6 +629,7 @@ public: #endif OSDService(OSD *osd); + ~OSDService(); }; 
class OSD : public Dispatcher, public md_config_obs_t { @@ -627,6 +649,7 @@ protected: Messenger *cluster_messenger; Messenger *client_messenger; + Messenger *objecter_messenger; MonClient *monc; PerfCounters *logger; PerfCounters *recoverystate_perf; @@ -832,7 +855,8 @@ public: bool heartbeat_dispatch(Message *m); struct HeartbeatDispatcher : public Dispatcher { - private: + OSD *osd; + HeartbeatDispatcher(OSD *o) : Dispatcher(g_ceph_context), osd(o) {} bool ms_dispatch(Message *m) { return osd->heartbeat_dispatch(m); }; @@ -846,15 +870,8 @@ public: isvalid = true; return true; } - public: - OSD *osd; - HeartbeatDispatcher(OSD *o) - : Dispatcher(g_ceph_context), osd(o) - { - } } heartbeat_dispatcher; - private: // -- stats -- Mutex stat_lock; @@ -1676,8 +1693,13 @@ protected: public: /* internal and external can point to the same messenger, they will still * be cleaned up properly*/ - OSD(int id, Messenger *internal, Messenger *external, - Messenger *hb_client, Messenger *hb_front_server, Messenger *hb_back_server, + OSD(int id, + Messenger *internal, + Messenger *external, + Messenger *hb_client, + Messenger *hb_front_server, + Messenger *hb_back_server, + Messenger *osdc_messenger, MonClient *mc, const std::string &dev, const std::string &jdev); ~OSD(); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 4a8f24dadd6..0eee7b314e2 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -50,6 +50,8 @@ #include "include/compat.h" #include "common/cmdparse.h" +#include "osdc/Objecter.h" + #include "json_spirit/json_spirit_value.h" #include "json_spirit/json_spirit_reader.h" #include "include/assert.h" // json_spirit clobbers it @@ -836,7 +838,6 @@ void ReplicatedPG::do_op(OpRequestRef op) dout(10) << "no src oid specified for multi op " << osd_op << dendl; osd->reply_op_error(op, -EINVAL); } - src_obc.clear(); return; } @@ -867,7 +868,6 @@ void ReplicatedPG::do_op(OpRequestRef op) src_obc[clone_oid] = sobc; continue; } - 
src_obc.clear(); return; } else { continue; @@ -877,43 +877,40 @@ void ReplicatedPG::do_op(OpRequestRef op) op->mark_started(); - const hobject_t& soid = obc->obs.oi.soid; OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, &obc->obs, obc->ssc, this); ctx->obc = obc; ctx->src_obc = src_obc; - if (op->may_write()) { - // snap - if (pool.info.is_pool_snaps_mode()) { - // use pool's snapc - ctx->snapc = pool.snapc; - } else { - // client specified snapc - ctx->snapc.seq = m->get_snap_seq(); - ctx->snapc.snaps = m->get_snaps(); - } - if ((m->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) && - ctx->snapc.seq < obc->ssc->snapset.seq) { - dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq - << " < snapset seq " << obc->ssc->snapset.seq - << " on " << soid << dendl; - delete ctx; - src_obc.clear(); - osd->reply_op_error(op, -EOLDSNAPC); - return; - } + execute_ctx(ctx); +} + +void ReplicatedPG::execute_ctx(OpContext *ctx) +{ + dout(10) << __func__ << " " << ctx << dendl; + OpRequestRef op = ctx->op; + MOSDOp *m = static_cast<MOSDOp*>(op->request); + ObjectContextRef obc = ctx->obc; + const hobject_t& soid = obc->obs.oi.soid; + map<hobject_t,ObjectContextRef>& src_obc = ctx->src_obc; + + // this method must be idempotent since we may call it several times + // before we finally apply the resulting transaction. + ctx->op_t = ObjectStore::Transaction(); + ctx->local_t = ObjectStore::Transaction(); + // dup/replay? 
+ if (op->may_write()) { const pg_log_entry_t *entry = pg_log.get_log().get_request(ctx->reqid); if (entry) { const eversion_t& oldv = entry->version; dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl; - delete ctx; - src_obc.clear(); if (already_complete(oldv)) { - osd->reply_op_error(op, 0, oldv, entry->user_version); + reply_ctx(ctx, 0, oldv, entry->user_version); } else { + delete ctx; + if (m->wants_ack()) { if (already_ack(oldv)) { MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); @@ -933,6 +930,24 @@ void ReplicatedPG::do_op(OpRequestRef op) op->mark_started(); + // snap + if (pool.info.is_pool_snaps_mode()) { + // use pool's snapc + ctx->snapc = pool.snapc; + } else { + // client specified snapc + ctx->snapc.seq = m->get_snap_seq(); + ctx->snapc.snaps = m->get_snaps(); + } + if ((m->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) && + ctx->snapc.seq < obc->ssc->snapset.seq) { + dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq + << " < snapset seq " << obc->ssc->snapset.seq + << " on " << obc->obs.oi.soid << dendl; + reply_ctx(ctx, -EOLDSNAPC); + return; + } + // version ctx->at_version = pg_log.get_head(); @@ -942,7 +957,7 @@ void ReplicatedPG::do_op(OpRequestRef op) assert(ctx->at_version > pg_log.get_head()); ctx->mtime = m->get_mtime(); - + dout(10) << "do_op " << soid << " " << ctx->ops << " ov " << obc->obs.oi.version << " av " << ctx->at_version << " snapc " << ctx->snapc @@ -988,16 +1003,13 @@ void ReplicatedPG::do_op(OpRequestRef op) if (result == -EAGAIN) { // clean up after the ctx delete ctx; - src_obc.clear(); return; } // check for full if (ctx->delta_stats.num_bytes > 0 && pool.info.get_flags() & pg_pool_t::FLAG_FULL) { - delete ctx; - src_obc.clear(); - osd->reply_op_error(op, -ENOSPC); + reply_ctx(ctx, -ENOSPC); return; } @@ -1043,7 +1055,6 @@ void ReplicatedPG::do_op(OpRequestRef op) reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); osd->send_message_osd_client(reply, 
m->get_connection()); delete ctx; - src_obc.clear(); return; } @@ -1088,6 +1099,17 @@ void ReplicatedPG::do_op(OpRequestRef op) repop->put(); } +void ReplicatedPG::reply_ctx(OpContext *ctx, int r) +{ + osd->reply_op_error(ctx->op, r); + delete ctx; +} + +void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv) +{ + osd->reply_op_error(ctx->op, r, v, uv); + delete ctx; +} void ReplicatedPG::log_op_stats(OpContext *ctx) { @@ -2542,7 +2564,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) case CEPH_OSD_OP_ASSERT_SRC_VERSION: ++ctx->num_read; { - uint64_t ver = op.watch.ver; + uint64_t ver = op.assert_ver.ver; if (!ver) result = -EINVAL; else if (ver < src_obc->obs.oi.user_version) @@ -3057,6 +3079,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ctx->delta_stats.num_rd++; } break; + case CEPH_OSD_OP_OMAPGETVALS: ++ctx->num_read; { @@ -3121,6 +3144,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ctx->delta_stats.num_rd++; } break; + case CEPH_OSD_OP_OMAPGETHEADER: ++ctx->num_read; { @@ -3142,6 +3166,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ctx->delta_stats.num_rd++; } break; + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: ++ctx->num_read; { @@ -3182,6 +3207,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ctx->delta_stats.num_rd++; } break; + case CEPH_OSD_OP_OMAP_CMP: ++ctx->num_read; { @@ -3247,6 +3273,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) } } break; + // OMAP Write ops case CEPH_OSD_OP_OMAPSETVALS: ++ctx->num_write; @@ -3277,6 +3304,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ctx->delta_stats.num_wr++; } break; + case CEPH_OSD_OP_OMAPSETHEADER: ++ctx->num_write; { @@ -3292,6 +3320,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ctx->delta_stats.num_wr++; } break; + case CEPH_OSD_OP_OMAPCLEAR: ++ctx->num_write; { @@ -3307,6 +3336,7 @@ int 
ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ctx->delta_stats.num_wr++; } break; + case CEPH_OSD_OP_OMAPRMKEYS: ++ctx->num_write; { @@ -3330,6 +3360,79 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) ctx->delta_stats.num_wr++; } break; + + case CEPH_OSD_OP_COPY_GET: + ++ctx->num_read; + { + object_copy_cursor_t cursor; + uint64_t out_max; + try { + ::decode(cursor, bp); + ::decode(out_max, bp); + } + catch (buffer::error& e) { + result = -EINVAL; + goto fail; + } + + // size, mtime + ::encode(oi.size, osd_op.outdata); + ::encode(oi.mtime, osd_op.outdata); + + // attrs + map<string,bufferptr> out_attrs; + if (!cursor.attr_complete) { + result = osd->store->getattrs(coll, soid, out_attrs, true); + if (result < 0) + break; + cursor.attr_complete = true; + } + ::encode(out_attrs, osd_op.outdata); + + int64_t left = out_max - osd_op.outdata.length(); + + // data + bufferlist bl; + if (left > 0 && !cursor.data_complete) { + if (cursor.data_offset < oi.size) { + result = osd->store->read(coll, oi.soid, cursor.data_offset, out_max, bl); + if (result < 0) + return result; + assert(result <= left); + left -= result; + cursor.data_offset += result; + } + if (cursor.data_offset == oi.size) + cursor.data_complete = true; + } + ::encode(bl, osd_op.outdata); + + // omap + std::map<std::string,bufferlist> out_omap; + if (left > 0 && !cursor.omap_complete) { + ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(coll, oi.soid); + assert(iter); + if (iter->valid()) { + iter->upper_bound(cursor.omap_offset); + for (; left > 0 && iter->valid(); iter->next()) { + out_omap.insert(make_pair(iter->key(), iter->value())); + left -= iter->key().length() + 4 + iter->value().length() + 4; + } + } + if (iter->valid()) { + cursor.omap_offset = iter->key(); + } else { + cursor.omap_complete = true; + } + } + ::encode(out_omap, osd_op.outdata); + + ::encode(cursor, osd_op.outdata); + result = 0; + } + break; + + default: dout(1) << "unrecognized 
osd op " << op.op << " " << ceph_osd_op_name(op.op) @@ -6682,17 +6785,27 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t) context_registry_on_change(); // requeue object waiters - requeue_ops(waiting_for_backfill_pos); - requeue_object_waiters(waiting_for_missing_object); + if (is_primary()) { + requeue_ops(waiting_for_backfill_pos); + requeue_object_waiters(waiting_for_missing_object); + } else { + waiting_for_backfill_pos.clear(); + waiting_for_missing_object.clear(); + } for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_degraded_object.begin(); p != waiting_for_degraded_object.end(); waiting_for_degraded_object.erase(p++)) { - requeue_ops(p->second); + if (is_primary()) + requeue_ops(p->second); + else + p->second.clear(); finish_degraded_object(p->first); } - requeue_ops(waiting_for_all_missing); - waiting_for_all_missing.clear(); + if (is_primary()) + requeue_ops(waiting_for_all_missing); + else + waiting_for_all_missing.clear(); // this will requeue ops we were working on but didn't finish, and // any dups diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index bce141834ca..5dc7d882a8b 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -575,6 +575,9 @@ protected: void _make_clone(ObjectStore::Transaction& t, const hobject_t& head, const hobject_t& coid, object_info_t *poi); + void execute_ctx(OpContext *ctx); + void reply_ctx(OpContext *ctx, int err); + void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv); void make_writeable(OpContext *ctx); void log_op_stats(OpContext *ctx); diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index fafea2c816e..1c1b457002c 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -2422,6 +2422,55 @@ void pg_missing_t::split_into( } } +// -- object_copy_cursor_t -- + +void object_copy_cursor_t::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(attr_complete, bl); + ::encode(data_offset, bl); + ::encode(data_complete, bl); + 
::encode(omap_offset, bl); + ::encode(omap_complete, bl); + ENCODE_FINISH(bl); +} + +void object_copy_cursor_t::decode(bufferlist::iterator &bl) +{ + DECODE_START(1, bl); + ::decode(attr_complete, bl); + ::decode(data_offset, bl); + ::decode(data_complete, bl); + ::decode(omap_offset, bl); + ::decode(omap_complete, bl); + DECODE_FINISH(bl); +} + +void object_copy_cursor_t::dump(Formatter *f) const +{ + f->dump_unsigned("attr_complete", (int)attr_complete); + f->dump_unsigned("data_offset", data_offset); + f->dump_unsigned("data_complete", (int)data_complete); + f->dump_string("omap_offset", omap_offset); + f->dump_unsigned("omap_complete", (int)omap_complete); +} + +void object_copy_cursor_t::generate_test_instances(list<object_copy_cursor_t*>& o) +{ + o.push_back(new object_copy_cursor_t); + o.push_back(new object_copy_cursor_t); + o.back()->attr_complete = true; + o.back()->data_offset = 123; + o.push_back(new object_copy_cursor_t); + o.back()->attr_complete = true; + o.back()->data_complete = true; + o.back()->omap_offset = "foo"; + o.push_back(new object_copy_cursor_t); + o.back()->attr_complete = true; + o.back()->data_complete = true; + o.back()->omap_complete = true; +} + // -- pg_create_t -- void pg_create_t::encode(bufferlist &bl) const @@ -3416,6 +3465,9 @@ ostream& operator<<(ostream& out, const OSDOp& op) case CEPH_OSD_OP_LIST_WATCHERS: case CEPH_OSD_OP_LIST_SNAPS: break; + case CEPH_OSD_OP_ASSERT_VER: + out << " v" << op.op.assert_ver.ver; + break; case CEPH_OSD_OP_TRUNCATE: out << " " << op.op.extent.offset; break; @@ -3430,6 +3482,9 @@ ostream& operator<<(ostream& out, const OSDOp& op) out << (op.op.watch.flag ? 
" add":" remove") << " cookie " << op.op.watch.cookie << " ver " << op.op.watch.ver; break; + case CEPH_OSD_OP_COPY_GET: + out << " max " << op.op.copy_get.max; + break; default: out << " " << op.op.extent.offset << "~" << op.op.extent.length; if (op.op.extent.truncate_seq) diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 3eb14246cc5..00e9409c98a 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1816,6 +1816,37 @@ struct pg_ls_response_t { WRITE_CLASS_ENCODER(pg_ls_response_t) +/** + * object_copy_cursor_t + */ +struct object_copy_cursor_t { + bool attr_complete; + uint64_t data_offset; + bool data_complete; + string omap_offset; + bool omap_complete; + + object_copy_cursor_t() + : attr_complete(false), + data_offset(0), + data_complete(false), + omap_complete(false) + {} + + bool is_initial() const { + return !attr_complete && data_offset == 0 && omap_offset.empty(); + } + bool is_complete() const { + return attr_complete && data_complete && omap_complete; + } + + static void generate_test_instances(list<object_copy_cursor_t*>& o); + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator &bl); + void dump(Formatter *f) const; +}; +WRITE_CLASS_ENCODER(object_copy_cursor_t) + /** * pg creation info diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 39378521b09..75292d13ac3 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -229,7 +229,8 @@ void Objecter::init_locked() assert(!initialized); schedule_tick(); - maybe_request_map(); + if (osdmap->get_epoch() == 0) + maybe_request_map(); initialized = true; } @@ -1280,6 +1281,32 @@ tid_t Objecter::_op_submit(Op *op) return op->tid; } +int Objecter::op_cancel(tid_t tid) +{ + assert(client_lock.is_locked()); + assert(initialized); + + map<tid_t, Op*>::iterator p = ops.find(tid); + if (p == ops.end()) { + ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + return -ENOENT; + } + + ldout(cct, 10) << __func__ << " tid " << tid << dendl; + Op *op = 
p->second; + if (op->onack) { + op->onack->complete(-ECANCELED); + op->onack = NULL; + } + if (op->oncommit) { + op->oncommit->complete(-ECANCELED); + op->oncommit = NULL; + } + op_cancel_map_check(op); + finish_op(op); + return 0; +} + bool Objecter::is_pg_changed(vector<int>& o, vector<int>& n, bool any_change) { if (o.empty() && n.empty()) diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index be756054497..91f62551729 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -567,6 +567,82 @@ struct ObjectOperation { } } + struct C_ObjectOperation_copyget : public Context { + bufferlist bl; + object_copy_cursor_t *cursor; + uint64_t *out_size; + utime_t *out_mtime; + std::map<std::string,bufferlist> *out_attrs; + bufferlist *out_data; + std::map<std::string,bufferlist> *out_omap; + int *prval; + C_ObjectOperation_copyget(object_copy_cursor_t *c, + uint64_t *s, + utime_t *m, + std::map<std::string,bufferlist> *a, + bufferlist *d, + std::map<std::string,bufferlist> *o, + int *r) + : cursor(c), + out_size(s), out_mtime(m), out_attrs(a), + out_data(d), out_omap(o), prval(r) {} + void finish(int r) { + if (r < 0) + return; + try { + bufferlist::iterator p = bl.begin(); + uint64_t size; + ::decode(size, p); + if (out_size) + *out_size = size; + utime_t mtime; + ::decode(mtime, p); + if (out_mtime) + *out_mtime = mtime; + if (out_attrs) { + ::decode_noclear(*out_attrs, p); + } else { + std::map<std::string,bufferlist> t; + ::decode(t, p); + } + bufferlist bl; + ::decode(bl, p); + if (out_data) + out_data->claim_append(bl); + if (out_omap) { + ::decode_noclear(*out_omap, p); + } else { + std::map<std::string,bufferlist> t; + ::decode(t, p); + } + ::decode(*cursor, p); + } catch (buffer::error& e) { + if (prval) + *prval = -EIO; + } + } + }; + + void copy_get(object_copy_cursor_t *cursor, + uint64_t max, + uint64_t *out_size, + utime_t *out_mtime, + std::map<std::string,bufferlist> *out_attrs, + bufferlist *out_data, + std::map<std::string,bufferlist> 
*out_omap, + int *prval) { + OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET); + osd_op.op.copy_get.max = max; + ::encode(*cursor, osd_op.indata); + ::encode(max, osd_op.indata); + unsigned p = ops.size() - 1; + out_rval[p] = prval; + C_ObjectOperation_copyget *h = + new C_ObjectOperation_copyget(cursor, out_size, out_mtime, out_attrs, out_data, out_omap, prval); + out_bl[p] = &h->bl; + out_handler[p] = h; + } + void omap_get_header(bufferlist *bl, int *prval) { add_op(CEPH_OSD_OP_OMAPGETHEADER); unsigned p = ops.size() - 1; @@ -647,8 +723,8 @@ struct ObjectOperation { } void assert_version(uint64_t ver) { - bufferlist bl; - add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0, bl); + OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER); + osd_op.op.assert_ver.ver = ver; } void assert_src_version(const object_t& srcoid, snapid_t srcsnapid, uint64_t ver) { bufferlist bl; @@ -1255,6 +1331,9 @@ private: /** Clear the passed flags from the global op flag set */ void clear_global_op_flag(int flags) { global_op_flags &= ~flags; } + /// cancel an in-progress request + int op_cancel(tid_t tid); + // commands int osd_command(int osd, vector<string>& cmd, bufferlist& inbl, tid_t *ptid, bufferlist *poutbl, string *prs, Context *onfinish) { diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index 213da6fcccc..a6f7cfb7883 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -53,6 +53,7 @@ TYPE(pg_log_t) TYPE(pg_missing_t::item) TYPE(pg_missing_t) TYPE(pg_ls_response_t) +TYPE(object_copy_cursor_t) TYPE(pg_create_t) TYPE(watch_info_t) TYPE(object_info_t) diff --git a/src/vstart.sh b/src/vstart.sh index 1a6f4f957b9..c112bfc9138 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -199,6 +199,7 @@ else COSDDEBUG=' debug ms = 1 debug osd = 25 + debug objecter = 20 debug monc = 20 debug journal = 20 debug filestore = 20 |