summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2013-08-30 17:02:49 -0700
committerSage Weil <sage@inktank.com>2013-08-30 17:02:49 -0700
commit46279327cb73672ade37d72a361b0ec2784591cc (patch)
tree13b12a75dbdfcacbefad84ce9650a772d4099739
parent7ec0b4fb780b91b44427ed94eee82c3c6b6fff9f (diff)
parent4f6c6b2d747860b1424e71fec59f781750c99553 (diff)
downloadceph-46279327cb73672ade37d72a361b0ec2784591cc.tar.gz
Merge pull request #541 from ceph/wip-6036
osd objecter; copy-get Reviewed-by: Greg Farnum <greg@inktank.com> Reviewed-by: Samuel Just <sam.just@inktank.com>
-rw-r--r--src/Makefile.am8
-rw-r--r--src/ceph_osd.cc133
-rw-r--r--src/include/ceph_strings.cc2
-rw-r--r--src/include/encoding.h11
-rw-r--r--src/include/rados.h9
-rw-r--r--src/os/ObjectStore.h15
-rw-r--r--src/osd/OSD.cc81
-rw-r--r--src/osd/OSD.h42
-rw-r--r--src/osd/ReplicatedPG.cc189
-rw-r--r--src/osd/ReplicatedPG.h3
-rw-r--r--src/osd/osd_types.cc55
-rw-r--r--src/osd/osd_types.h31
-rw-r--r--src/osdc/Objecter.cc29
-rw-r--r--src/osdc/Objecter.h83
-rw-r--r--src/test/encoding/types.h1
-rwxr-xr-xsrc/vstart.sh1
16 files changed, 572 insertions, 121 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 4b09c23e872..3bc5cbe0e11 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -96,7 +96,7 @@ bin_PROGRAMS += ceph_mon_store_converter
# osd
ceph_osd_SOURCES = ceph_osd.cc
-ceph_osd_LDADD = libosd.a $(LIBOS_LDA) $(LIBGLOBAL_LDA)
+ceph_osd_LDADD = libosd.a libosdc.la $(LIBOS_LDA) $(LIBGLOBAL_LDA)
ceph_osd_CXXFLAGS = ${AM_CXXFLAGS}
bin_PROGRAMS += ceph-osd
@@ -119,7 +119,7 @@ ceph_authtool_SOURCES = ceph_authtool.cc
ceph_authtool_LDADD = $(LIBGLOBAL_LDA)
ceph_filestore_dump_SOURCES = tools/ceph-filestore-dump.cc
ceph_filestore_dump_SOURCES += perfglue/disabled_heap_profiler.cc
-ceph_filestore_dump_LDADD = libosd.a $(LIBOS_LDA) $(LIBGLOBAL_LDA) -lboost_program_options
+ceph_filestore_dump_LDADD = libosd.a libosdc.la $(LIBOS_LDA) $(LIBGLOBAL_LDA) -lboost_program_options
if LINUX
ceph_filestore_dump_LDADD += -ldl
endif
@@ -835,7 +835,7 @@ check_PROGRAMS += unittest_osd_types
unittest_pglog_SOURCES = test/osd/TestPGLog.cc perfglue/disabled_heap_profiler.cc
unittest_pglog_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
-unittest_pglog_LDADD = libosd.a $(LIBOS_LDA) $(LIBGLOBAL_LDA) ${UNITTEST_LDADD}
+unittest_pglog_LDADD = libosd.a libosdc.la $(LIBOS_LDA) $(LIBGLOBAL_LDA) ${UNITTEST_LDADD}
check_PROGRAMS += unittest_pglog
if LINUX
@@ -1733,7 +1733,7 @@ libosd_a_SOURCES = \
osd/OpRequest.cc \
osd/SnapMapper.cc \
objclass/class_api.cc
-libosd_a_CXXFLAGS= ${AM_CXXFLAGS}
+libosd_a_CXXFLAGS = ${AM_CXXFLAGS}
noinst_LIBRARIES += libosd.a
libosdc_la_SOURCES = \
diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc
index dc6f435bdcf..120f4f28f6e 100644
--- a/src/ceph_osd.cc
+++ b/src/ceph_osd.cc
@@ -316,31 +316,34 @@ int main(int argc, const char **argv)
<< TEXT_NORMAL << dendl;
}
- Messenger *client_messenger = Messenger::create(g_ceph_context,
- entity_name_t::OSD(whoami), "client",
- getpid());
- Messenger *cluster_messenger = Messenger::create(g_ceph_context,
- entity_name_t::OSD(whoami), "cluster",
+ Messenger *ms_public = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "client",
+ getpid());
+ Messenger *ms_cluster = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "cluster",
+ getpid());
+ Messenger *ms_hbclient = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "hbclient",
+ getpid());
+ Messenger *ms_hb_back_server = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "hb_back_server",
getpid());
- Messenger *messenger_hbclient = Messenger::create(g_ceph_context,
- entity_name_t::OSD(whoami), "hbclient",
- getpid());
- Messenger *messenger_hb_back_server = Messenger::create(g_ceph_context,
- entity_name_t::OSD(whoami), "hb_back_server",
- getpid());
- Messenger *messenger_hb_front_server = Messenger::create(g_ceph_context,
+ Messenger *ms_hb_front_server = Messenger::create(g_ceph_context,
entity_name_t::OSD(whoami), "hb_front_server",
getpid());
- cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL);
- messenger_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL);
- messenger_hb_back_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
- messenger_hb_front_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ Messenger *ms_objecter = Messenger::create(g_ceph_context,
+ entity_name_t::OSD(whoami), "hbclient",
+ getpid());
+ ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ ms_hbclient->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ ms_hb_back_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
+ ms_hb_front_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);
cout << "starting osd." << whoami
- << " at " << client_messenger->get_myaddr()
+ << " at " << ms_public->get_myaddr()
<< " osd_data " << g_conf->osd_data
<< " " << ((g_conf->osd_journal.empty()) ?
- "(no journal)" : g_conf->osd_journal)
+ "(no journal)" : g_conf->osd_journal)
<< std::endl;
boost::scoped_ptr<Throttle> client_byte_throttler(
@@ -356,40 +359,42 @@ int main(int argc, const char **argv)
CEPH_FEATURE_PGID64 |
CEPH_FEATURE_MSG_AUTH;
- client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
- client_messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
- client_byte_throttler.get(),
- client_msg_throttler.get());
- client_messenger->set_policy(entity_name_t::TYPE_MON,
+ ms_public->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
+ ms_public->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
+ client_byte_throttler.get(),
+ client_msg_throttler.get());
+ ms_public->set_policy(entity_name_t::TYPE_MON,
Messenger::Policy::lossy_client(supported,
CEPH_FEATURE_UID |
CEPH_FEATURE_PGID64 |
CEPH_FEATURE_OSDENC));
//try to poison pill any OSD connections on the wrong address
- client_messenger->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::stateless_server(0,0));
+ ms_public->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::stateless_server(0,0));
- cluster_messenger->set_default_policy(Messenger::Policy::stateless_server(0, 0));
- cluster_messenger->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(0,0));
- cluster_messenger->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::lossless_peer(supported,
- CEPH_FEATURE_UID |
- CEPH_FEATURE_PGID64 |
- CEPH_FEATURE_OSDENC));
- cluster_messenger->set_policy(entity_name_t::TYPE_CLIENT,
+ ms_cluster->set_default_policy(Messenger::Policy::stateless_server(0, 0));
+ ms_cluster->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(0,0));
+ ms_cluster->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::lossless_peer(supported,
+ CEPH_FEATURE_UID |
+ CEPH_FEATURE_PGID64 |
+ CEPH_FEATURE_OSDENC));
+ ms_cluster->set_policy(entity_name_t::TYPE_CLIENT,
+ Messenger::Policy::stateless_server(0, 0));
+
+ ms_hbclient->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::lossy_client(0, 0));
+ ms_hb_back_server->set_policy(entity_name_t::TYPE_OSD,
Messenger::Policy::stateless_server(0, 0));
+ ms_hb_front_server->set_policy(entity_name_t::TYPE_OSD,
+ Messenger::Policy::stateless_server(0, 0));
- messenger_hbclient->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::lossy_client(0, 0));
- messenger_hb_back_server->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::stateless_server(0, 0));
- messenger_hb_front_server->set_policy(entity_name_t::TYPE_OSD,
- Messenger::Policy::stateless_server(0, 0));
+ ms_objecter->set_default_policy(Messenger::Policy::lossy_client(0, CEPH_FEATURE_OSDREPLYMUX));
- r = client_messenger->bind(g_conf->public_addr);
+ r = ms_public->bind(g_conf->public_addr);
if (r < 0)
exit(1);
- r = cluster_messenger->bind(g_conf->cluster_addr);
+ r = ms_cluster->bind(g_conf->cluster_addr);
if (r < 0)
exit(1);
@@ -400,7 +405,7 @@ int main(int argc, const char **argv)
if (hb_back_addr.is_ip())
hb_back_addr.set_port(0);
}
- r = messenger_hb_back_server->bind(hb_back_addr);
+ r = ms_hb_back_server->bind(hb_back_addr);
if (r < 0)
exit(1);
@@ -408,7 +413,7 @@ int main(int argc, const char **argv)
entity_addr_t hb_front_addr = g_conf->public_addr;
if (hb_front_addr.is_ip())
hb_front_addr.set_port(0);
- r = messenger_hb_front_server->bind(hb_front_addr);
+ r = ms_hb_front_server->bind(hb_front_addr);
if (r < 0)
exit(1);
@@ -430,8 +435,13 @@ int main(int argc, const char **argv)
return -1;
global_init_chdir(g_ceph_context);
- osd = new OSD(whoami, cluster_messenger, client_messenger,
- messenger_hbclient, messenger_hb_front_server, messenger_hb_back_server,
+ osd = new OSD(whoami,
+ ms_cluster,
+ ms_public,
+ ms_hbclient,
+ ms_hb_front_server,
+ ms_hb_back_server,
+ ms_objecter,
&mc,
g_conf->osd_data, g_conf->osd_journal);
@@ -445,11 +455,12 @@ int main(int argc, const char **argv)
// Now close the standard file descriptors
global_init_shutdown_stderr(g_ceph_context);
- client_messenger->start();
- messenger_hbclient->start();
- messenger_hb_front_server->start();
- messenger_hb_back_server->start();
- cluster_messenger->start();
+ ms_public->start();
+ ms_hbclient->start();
+ ms_hb_front_server->start();
+ ms_hb_back_server->start();
+ ms_cluster->start();
+ ms_objecter->start();
// start osd
err = osd->init();
@@ -470,11 +481,12 @@ int main(int argc, const char **argv)
if (g_conf->inject_early_sigterm)
kill(getpid(), SIGTERM);
- client_messenger->wait();
- messenger_hbclient->wait();
- messenger_hb_front_server->wait();
- messenger_hb_back_server->wait();
- cluster_messenger->wait();
+ ms_public->wait();
+ ms_hbclient->wait();
+ ms_hb_front_server->wait();
+ ms_hb_back_server->wait();
+ ms_cluster->wait();
+ ms_objecter->wait();
unregister_async_signal_handler(SIGHUP, sighup_handler);
unregister_async_signal_handler(SIGINT, handle_osd_signal);
@@ -483,11 +495,12 @@ int main(int argc, const char **argv)
// done
delete osd;
- delete client_messenger;
- delete messenger_hbclient;
- delete messenger_hb_front_server;
- delete messenger_hb_back_server;
- delete cluster_messenger;
+ delete ms_public;
+ delete ms_hbclient;
+ delete ms_hb_front_server;
+ delete ms_hb_back_server;
+ delete ms_cluster;
+ delete ms_objecter;
client_byte_throttler.reset();
client_msg_throttler.reset();
g_ceph_context->put();
diff --git a/src/include/ceph_strings.cc b/src/include/ceph_strings.cc
index d46eca6aaf8..f14f29ce0e9 100644
--- a/src/include/ceph_strings.cc
+++ b/src/include/ceph_strings.cc
@@ -48,6 +48,8 @@ const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_TMAPPUT: return "tmapput";
case CEPH_OSD_OP_WATCH: return "watch";
+ case CEPH_OSD_OP_COPY_GET: return "copy-get";
+
case CEPH_OSD_OP_CLONERANGE: return "clonerange";
case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr";
diff --git a/src/include/encoding.h b/src/include/encoding.h
index 67c9af59d2b..a091f7f69e9 100644
--- a/src/include/encoding.h
+++ b/src/include/encoding.h
@@ -562,6 +562,17 @@ inline void decode(std::map<T,U>& m, bufferlist::iterator& p)
}
}
template<class T, class U>
+inline void decode_noclear(std::map<T,U>& m, bufferlist::iterator& p)
+{
+ __u32 n;
+ decode(n, p);
+ while (n--) {
+ T k;
+ decode(k, p);
+ decode(m[k], p);
+ }
+}
+template<class T, class U>
inline void encode_nohead(const std::map<T,U>& m, bufferlist& bl)
{
for (typename std::map<T,U>::const_iterator p = m.begin(); p != m.end(); ++p) {
diff --git a/src/include/rados.h b/src/include/rados.h
index de9b449ed15..27291a7440e 100644
--- a/src/include/rados.h
+++ b/src/include/rados.h
@@ -217,6 +217,8 @@ enum {
CEPH_OSD_OP_OMAPRMKEYS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24,
CEPH_OSD_OP_OMAP_CMP = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25,
+ CEPH_OSD_OP_COPY_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 27,
+
/** multi **/
CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1,
CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2,
@@ -398,9 +400,16 @@ struct ceph_osd_op {
__u8 flag; /* 0 = unwatch, 1 = watch */
} __attribute__ ((packed)) watch;
struct {
+ __le64 unused;
+ __le64 ver;
+ } __attribute__ ((packed)) assert_ver;
+ struct {
__le64 offset, length;
__le64 src_offset;
} __attribute__ ((packed)) clonerange;
+ struct {
+ __le64 max; /* max data in reply */
+ } __attribute__ ((packed)) copy_get;
};
__le32 payload_len;
} __attribute__ ((packed));
diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h
index eb5b40c5a69..655afee004f 100644
--- a/src/os/ObjectStore.h
+++ b/src/os/ObjectStore.h
@@ -475,6 +475,14 @@ public:
::encode(attrset, tbl);
ops++;
}
+ void setattrs(coll_t cid, const hobject_t& oid, map<string,bufferlist>& attrset) {
+ __u32 op = OP_SETATTRS;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(attrset, tbl);
+ ops++;
+ }
void rmattr(coll_t cid, const hobject_t& oid, const char *name) {
string n(name);
rmattr(cid, oid, n);
@@ -578,6 +586,13 @@ public:
::encode(aset, tbl);
ops++;
}
+ void collection_setattrs(coll_t cid, map<string,bufferlist>& aset) {
+ __u32 op = OP_COLL_SETATTRS;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(aset, tbl);
+ ops++;
+ }
void collection_rename(coll_t cid, coll_t ncid) {
__u32 op = OP_COLL_RENAME;
::encode(op, tbl);
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 66022a3898a..cff0c8d6a52 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -33,6 +33,7 @@
#include "OSD.h"
#include "OSDMap.h"
#include "Watch.h"
+#include "osdc/Objecter.h"
#include "common/ceph_argparse.h"
#include "common/version.h"
@@ -175,6 +176,12 @@ OSDService::OSDService(OSD *osd) :
pre_publish_lock("OSDService::pre_publish_lock"),
sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
scrubs_active(0),
+ objecter_lock("OSD::objecter_lock"),
+ objecter_timer(osd->client_messenger->cct, objecter_lock),
+ objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap,
+ objecter_lock, objecter_timer)),
+ objecter_finisher(osd->client_messenger->cct),
+ objecter_dispatcher(this),
watch_lock("OSD::watch_lock"),
watch_timer(osd->client_messenger->cct, watch_lock),
next_notif_id(0),
@@ -202,6 +209,11 @@ OSDService::OSDService(OSD *osd) :
#endif
{}
+OSDService::~OSDService()
+{
+ delete objecter;
+}
+
void OSDService::_start_split(pg_t parent, const set<pg_t> &children)
{
for (set<pg_t>::const_iterator i = children.begin();
@@ -385,6 +397,15 @@ void OSDService::shutdown()
Mutex::Locker l(watch_lock);
watch_timer.shutdown();
}
+
+ {
+ Mutex::Locker l(objecter_lock);
+ objecter_timer.shutdown();
+ objecter->shutdown_locked();
+ }
+ objecter->shutdown_unlocked();
+ objecter_finisher.stop();
+
{
Mutex::Locker l(backfill_request_lock);
backfill_request_timer.shutdown();
@@ -396,6 +417,14 @@ void OSDService::shutdown()
void OSDService::init()
{
reserver_finisher.start();
+ {
+ objecter_finisher.start();
+ objecter->init_unlocked();
+ Mutex::Locker l(objecter_lock);
+ objecter_timer.init();
+ objecter->set_client_incarnation(0);
+ objecter->init_locked();
+ }
watch_timer.init();
}
@@ -882,6 +911,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
Messenger *hb_clientm,
Messenger *hb_front_serverm,
Messenger *hb_back_serverm,
+ Messenger *osdc_messenger,
MonClient *mc,
const std::string &dev, const std::string &jdev) :
Dispatcher(external_messenger->cct),
@@ -897,6 +927,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
cct->_conf->auth_service_required)),
cluster_messenger(internal_messenger),
client_messenger(external_messenger),
+ objecter_messenger(osdc_messenger),
monc(mc),
logger(NULL),
recoverystate_perf(NULL),
@@ -1212,6 +1243,8 @@ int OSD::init()
hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
+ objecter_messenger->add_dispatcher_head(&service.objecter_dispatcher);
+
monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
r = monc->init();
if (r < 0)
@@ -1646,6 +1679,7 @@ int OSD::shutdown()
client_messenger->shutdown();
cluster_messenger->shutdown();
hbclient_messenger->shutdown();
+ objecter_messenger->shutdown();
hb_front_server_messenger->shutdown();
hb_back_server_messenger->shutdown();
peering_wq.clear();
@@ -4303,7 +4337,7 @@ bool OSD::_share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch
}
// does peer have old map?
- if (name.is_osd() &&
+ if (con->get_messenger() == cluster_messenger &&
osdmap->is_up(name.num()) &&
(osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
@@ -4376,6 +4410,37 @@ bool OSD::heartbeat_dispatch(Message *m)
return true;
}
+bool OSDService::ObjecterDispatcher::ms_dispatch(Message *m)
+{
+ Mutex::Locker l(osd->objecter_lock);
+ osd->objecter->dispatch(m);
+ return true;
+}
+
+bool OSDService::ObjecterDispatcher::ms_handle_reset(Connection *con)
+{
+ Mutex::Locker l(osd->objecter_lock);
+ osd->objecter->ms_handle_reset(con);
+ return true;
+}
+
+void OSDService::ObjecterDispatcher::ms_handle_connect(Connection *con)
+{
+ Mutex::Locker l(osd->objecter_lock);
+ return osd->objecter->ms_handle_connect(con);
+}
+
+bool OSDService::ObjecterDispatcher::ms_get_authorizer(int dest_type,
+ AuthAuthorizer **authorizer,
+ bool force_new)
+{
+ if (dest_type == CEPH_ENTITY_TYPE_MON)
+ return true;
+ *authorizer = osd->monc->auth->build_authorizer(dest_type);
+ return *authorizer != NULL;
+}
+
+
bool OSD::ms_dispatch(Message *m)
{
if (m->get_type() == MSG_OSD_MARK_ME_DOWN) {
@@ -4969,6 +5034,13 @@ void OSD::handle_osd_map(MOSDMap *m)
if (session)
session->put();
+ // share with the objecter
+ {
+ Mutex::Locker l(service.objecter_lock);
+ m->get();
+ service.objecter->handle_osd_map(m);
+ }
+
epoch_t first = m->get_first();
epoch_t last = m->get_last();
dout(3) << "handle_osd_map epochs [" << first << "," << last << "], i have "
@@ -5712,7 +5784,7 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
}
// ok, our map is same or newer.. do they still exist?
- if (m->get_source().is_osd()) {
+ if (m->get_connection()->get_messenger() == cluster_messenger) {
int from = m->get_source().num();
if (!osdmap->have_inst(from) ||
osdmap->get_cluster_addr(from) != m->get_source_inst().addr) {
@@ -6813,11 +6885,8 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags);
- Messenger *msgr = client_messenger;
reply->set_reply_versions(v, uv);
- if (m->get_source().is_osd())
- msgr = cluster_messenger;
- msgr->send_message(reply, m->get_connection());
+ m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
}
void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index e23c19b8f93..bd5e3d0bbbd 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -161,6 +161,7 @@ class OSDMap;
class MLog;
class MClass;
class MOSDPGMissing;
+class Objecter;
class Watch;
class Notification;
@@ -417,6 +418,26 @@ public:
void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
void handle_misdirected_op(PG *pg, OpRequestRef op);
+ // -- Objecter, for teiring reads/writes from/to other OSDs --
+ Mutex objecter_lock;
+ SafeTimer objecter_timer;
+ OSDMap objecter_osdmap;
+ Objecter *objecter;
+ Finisher objecter_finisher;
+ struct ObjecterDispatcher : public Dispatcher {
+ OSDService *osd;
+ bool ms_dispatch(Message *m);
+ bool ms_handle_reset(Connection *con);
+ void ms_handle_remote_reset(Connection *con) {}
+ void ms_handle_connect(Connection *con);
+ bool ms_get_authorizer(int dest_type,
+ AuthAuthorizer **authorizer,
+ bool force_new);
+ ObjecterDispatcher(OSDService *o) : Dispatcher(g_ceph_context), osd(o) {}
+ } objecter_dispatcher;
+ friend class ObjecterDispatcher;
+
+
// -- Watch --
Mutex watch_lock;
SafeTimer watch_timer;
@@ -608,6 +629,7 @@ public:
#endif
OSDService(OSD *osd);
+ ~OSDService();
};
class OSD : public Dispatcher,
public md_config_obs_t {
@@ -627,6 +649,7 @@ protected:
Messenger *cluster_messenger;
Messenger *client_messenger;
+ Messenger *objecter_messenger;
MonClient *monc;
PerfCounters *logger;
PerfCounters *recoverystate_perf;
@@ -832,7 +855,8 @@ public:
bool heartbeat_dispatch(Message *m);
struct HeartbeatDispatcher : public Dispatcher {
- private:
+ OSD *osd;
+ HeartbeatDispatcher(OSD *o) : Dispatcher(g_ceph_context), osd(o) {}
bool ms_dispatch(Message *m) {
return osd->heartbeat_dispatch(m);
};
@@ -846,15 +870,8 @@ public:
isvalid = true;
return true;
}
- public:
- OSD *osd;
- HeartbeatDispatcher(OSD *o)
- : Dispatcher(g_ceph_context), osd(o)
- {
- }
} heartbeat_dispatcher;
-
private:
// -- stats --
Mutex stat_lock;
@@ -1676,8 +1693,13 @@ protected:
public:
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/
- OSD(int id, Messenger *internal, Messenger *external,
- Messenger *hb_client, Messenger *hb_front_server, Messenger *hb_back_server,
+ OSD(int id,
+ Messenger *internal,
+ Messenger *external,
+ Messenger *hb_client,
+ Messenger *hb_front_server,
+ Messenger *hb_back_server,
+ Messenger *osdc_messenger,
MonClient *mc, const std::string &dev, const std::string &jdev);
~OSD();
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 4a8f24dadd6..0eee7b314e2 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -50,6 +50,8 @@
#include "include/compat.h"
#include "common/cmdparse.h"
+#include "osdc/Objecter.h"
+
#include "json_spirit/json_spirit_value.h"
#include "json_spirit/json_spirit_reader.h"
#include "include/assert.h" // json_spirit clobbers it
@@ -836,7 +838,6 @@ void ReplicatedPG::do_op(OpRequestRef op)
dout(10) << "no src oid specified for multi op " << osd_op << dendl;
osd->reply_op_error(op, -EINVAL);
}
- src_obc.clear();
return;
}
@@ -867,7 +868,6 @@ void ReplicatedPG::do_op(OpRequestRef op)
src_obc[clone_oid] = sobc;
continue;
}
- src_obc.clear();
return;
} else {
continue;
@@ -877,43 +877,40 @@ void ReplicatedPG::do_op(OpRequestRef op)
op->mark_started();
- const hobject_t& soid = obc->obs.oi.soid;
OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
&obc->obs, obc->ssc,
this);
ctx->obc = obc;
ctx->src_obc = src_obc;
- if (op->may_write()) {
- // snap
- if (pool.info.is_pool_snaps_mode()) {
- // use pool's snapc
- ctx->snapc = pool.snapc;
- } else {
- // client specified snapc
- ctx->snapc.seq = m->get_snap_seq();
- ctx->snapc.snaps = m->get_snaps();
- }
- if ((m->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
- ctx->snapc.seq < obc->ssc->snapset.seq) {
- dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq
- << " < snapset seq " << obc->ssc->snapset.seq
- << " on " << soid << dendl;
- delete ctx;
- src_obc.clear();
- osd->reply_op_error(op, -EOLDSNAPC);
- return;
- }
+ execute_ctx(ctx);
+}
+
+void ReplicatedPG::execute_ctx(OpContext *ctx)
+{
+ dout(10) << __func__ << " " << ctx << dendl;
+ OpRequestRef op = ctx->op;
+ MOSDOp *m = static_cast<MOSDOp*>(op->request);
+ ObjectContextRef obc = ctx->obc;
+ const hobject_t& soid = obc->obs.oi.soid;
+ map<hobject_t,ObjectContextRef>& src_obc = ctx->src_obc;
+
+ // this method must be idempotent since we may call it several times
+ // before we finally apply the resulting transaction.
+ ctx->op_t = ObjectStore::Transaction();
+ ctx->local_t = ObjectStore::Transaction();
+ // dup/replay?
+ if (op->may_write()) {
const pg_log_entry_t *entry = pg_log.get_log().get_request(ctx->reqid);
if (entry) {
const eversion_t& oldv = entry->version;
dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
- delete ctx;
- src_obc.clear();
if (already_complete(oldv)) {
- osd->reply_op_error(op, 0, oldv, entry->user_version);
+ reply_ctx(ctx, 0, oldv, entry->user_version);
} else {
+ delete ctx;
+
if (m->wants_ack()) {
if (already_ack(oldv)) {
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
@@ -933,6 +930,24 @@ void ReplicatedPG::do_op(OpRequestRef op)
op->mark_started();
+ // snap
+ if (pool.info.is_pool_snaps_mode()) {
+ // use pool's snapc
+ ctx->snapc = pool.snapc;
+ } else {
+ // client specified snapc
+ ctx->snapc.seq = m->get_snap_seq();
+ ctx->snapc.snaps = m->get_snaps();
+ }
+ if ((m->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
+ ctx->snapc.seq < obc->ssc->snapset.seq) {
+ dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq
+ << " < snapset seq " << obc->ssc->snapset.seq
+ << " on " << obc->obs.oi.soid << dendl;
+ reply_ctx(ctx, -EOLDSNAPC);
+ return;
+ }
+
// version
ctx->at_version = pg_log.get_head();
@@ -942,7 +957,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
assert(ctx->at_version > pg_log.get_head());
ctx->mtime = m->get_mtime();
-
+
dout(10) << "do_op " << soid << " " << ctx->ops
<< " ov " << obc->obs.oi.version << " av " << ctx->at_version
<< " snapc " << ctx->snapc
@@ -988,16 +1003,13 @@ void ReplicatedPG::do_op(OpRequestRef op)
if (result == -EAGAIN) {
// clean up after the ctx
delete ctx;
- src_obc.clear();
return;
}
// check for full
if (ctx->delta_stats.num_bytes > 0 &&
pool.info.get_flags() & pg_pool_t::FLAG_FULL) {
- delete ctx;
- src_obc.clear();
- osd->reply_op_error(op, -ENOSPC);
+ reply_ctx(ctx, -ENOSPC);
return;
}
@@ -1043,7 +1055,6 @@ void ReplicatedPG::do_op(OpRequestRef op)
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
osd->send_message_osd_client(reply, m->get_connection());
delete ctx;
- src_obc.clear();
return;
}
@@ -1088,6 +1099,17 @@ void ReplicatedPG::do_op(OpRequestRef op)
repop->put();
}
+void ReplicatedPG::reply_ctx(OpContext *ctx, int r)
+{
+ osd->reply_op_error(ctx->op, r);
+ delete ctx;
+}
+
+void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
+{
+ osd->reply_op_error(ctx->op, r, v, uv);
+ delete ctx;
+}
void ReplicatedPG::log_op_stats(OpContext *ctx)
{
@@ -2542,7 +2564,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
case CEPH_OSD_OP_ASSERT_SRC_VERSION:
++ctx->num_read;
{
- uint64_t ver = op.watch.ver;
+ uint64_t ver = op.assert_ver.ver;
if (!ver)
result = -EINVAL;
else if (ver < src_obc->obs.oi.user_version)
@@ -3057,6 +3079,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ctx->delta_stats.num_rd++;
}
break;
+
case CEPH_OSD_OP_OMAPGETVALS:
++ctx->num_read;
{
@@ -3121,6 +3144,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ctx->delta_stats.num_rd++;
}
break;
+
case CEPH_OSD_OP_OMAPGETHEADER:
++ctx->num_read;
{
@@ -3142,6 +3166,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ctx->delta_stats.num_rd++;
}
break;
+
case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
++ctx->num_read;
{
@@ -3182,6 +3207,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ctx->delta_stats.num_rd++;
}
break;
+
case CEPH_OSD_OP_OMAP_CMP:
++ctx->num_read;
{
@@ -3247,6 +3273,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
}
break;
+
// OMAP Write ops
case CEPH_OSD_OP_OMAPSETVALS:
++ctx->num_write;
@@ -3277,6 +3304,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ctx->delta_stats.num_wr++;
}
break;
+
case CEPH_OSD_OP_OMAPSETHEADER:
++ctx->num_write;
{
@@ -3292,6 +3320,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ctx->delta_stats.num_wr++;
}
break;
+
case CEPH_OSD_OP_OMAPCLEAR:
++ctx->num_write;
{
@@ -3307,6 +3336,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ctx->delta_stats.num_wr++;
}
break;
+
case CEPH_OSD_OP_OMAPRMKEYS:
++ctx->num_write;
{
@@ -3330,6 +3360,79 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
ctx->delta_stats.num_wr++;
}
break;
+
+ case CEPH_OSD_OP_COPY_GET:
+ ++ctx->num_read;
+ {
+ object_copy_cursor_t cursor;
+ uint64_t out_max;
+ try {
+ ::decode(cursor, bp);
+ ::decode(out_max, bp);
+ }
+ catch (buffer::error& e) {
+ result = -EINVAL;
+ goto fail;
+ }
+
+ // size, mtime
+ ::encode(oi.size, osd_op.outdata);
+ ::encode(oi.mtime, osd_op.outdata);
+
+ // attrs
+ map<string,bufferptr> out_attrs;
+ if (!cursor.attr_complete) {
+ result = osd->store->getattrs(coll, soid, out_attrs, true);
+ if (result < 0)
+ break;
+ cursor.attr_complete = true;
+ }
+ ::encode(out_attrs, osd_op.outdata);
+
+ int64_t left = out_max - osd_op.outdata.length();
+
+ // data
+ bufferlist bl;
+ if (left > 0 && !cursor.data_complete) {
+ if (cursor.data_offset < oi.size) {
+ result = osd->store->read(coll, oi.soid, cursor.data_offset, out_max, bl);
+ if (result < 0)
+ return result;
+ assert(result <= left);
+ left -= result;
+ cursor.data_offset += result;
+ }
+ if (cursor.data_offset == oi.size)
+ cursor.data_complete = true;
+ }
+ ::encode(bl, osd_op.outdata);
+
+ // omap
+ std::map<std::string,bufferlist> out_omap;
+ if (left > 0 && !cursor.omap_complete) {
+ ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(coll, oi.soid);
+ assert(iter);
+ if (iter->valid()) {
+ iter->upper_bound(cursor.omap_offset);
+ for (; left > 0 && iter->valid(); iter->next()) {
+ out_omap.insert(make_pair(iter->key(), iter->value()));
+ left -= iter->key().length() + 4 + iter->value().length() + 4;
+ }
+ }
+ if (iter->valid()) {
+ cursor.omap_offset = iter->key();
+ } else {
+ cursor.omap_complete = true;
+ }
+ }
+ ::encode(out_omap, osd_op.outdata);
+
+ ::encode(cursor, osd_op.outdata);
+ result = 0;
+ }
+ break;
+
+
default:
dout(1) << "unrecognized osd op " << op.op
<< " " << ceph_osd_op_name(op.op)
@@ -6682,17 +6785,27 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
context_registry_on_change();
// requeue object waiters
- requeue_ops(waiting_for_backfill_pos);
- requeue_object_waiters(waiting_for_missing_object);
+ if (is_primary()) {
+ requeue_ops(waiting_for_backfill_pos);
+ requeue_object_waiters(waiting_for_missing_object);
+ } else {
+ waiting_for_backfill_pos.clear();
+ waiting_for_missing_object.clear();
+ }
for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_degraded_object.begin();
p != waiting_for_degraded_object.end();
waiting_for_degraded_object.erase(p++)) {
- requeue_ops(p->second);
+ if (is_primary())
+ requeue_ops(p->second);
+ else
+ p->second.clear();
finish_degraded_object(p->first);
}
- requeue_ops(waiting_for_all_missing);
- waiting_for_all_missing.clear();
+ if (is_primary())
+ requeue_ops(waiting_for_all_missing);
+ else
+ waiting_for_all_missing.clear();
// this will requeue ops we were working on but didn't finish, and
// any dups
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index bce141834ca..5dc7d882a8b 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -575,6 +575,9 @@ protected:
void _make_clone(ObjectStore::Transaction& t,
const hobject_t& head, const hobject_t& coid,
object_info_t *poi);
+ void execute_ctx(OpContext *ctx);
+ void reply_ctx(OpContext *ctx, int err);
+ void reply_ctx(OpContext *ctx, int err, eversion_t v, version_t uv);
void make_writeable(OpContext *ctx);
void log_op_stats(OpContext *ctx);
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index fafea2c816e..1c1b457002c 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -2422,6 +2422,55 @@ void pg_missing_t::split_into(
}
}
+// -- object_copy_cursor_t --
+
+void object_copy_cursor_t::encode(bufferlist& bl) const
+{
+ ENCODE_START(1, 1, bl);
+ ::encode(attr_complete, bl);
+ ::encode(data_offset, bl);
+ ::encode(data_complete, bl);
+ ::encode(omap_offset, bl);
+ ::encode(omap_complete, bl);
+ ENCODE_FINISH(bl);
+}
+
+void object_copy_cursor_t::decode(bufferlist::iterator &bl)
+{
+ DECODE_START(1, bl);
+ ::decode(attr_complete, bl);
+ ::decode(data_offset, bl);
+ ::decode(data_complete, bl);
+ ::decode(omap_offset, bl);
+ ::decode(omap_complete, bl);
+ DECODE_FINISH(bl);
+}
+
+void object_copy_cursor_t::dump(Formatter *f) const
+{
+ f->dump_unsigned("attr_complete", (int)attr_complete);
+ f->dump_unsigned("data_offset", data_offset);
+ f->dump_unsigned("data_complete", (int)data_complete);
+ f->dump_string("omap_offset", omap_offset);
+ f->dump_unsigned("omap_complete", (int)omap_complete);
+}
+
+void object_copy_cursor_t::generate_test_instances(list<object_copy_cursor_t*>& o)
+{
+ o.push_back(new object_copy_cursor_t);
+ o.push_back(new object_copy_cursor_t);
+ o.back()->attr_complete = true;
+ o.back()->data_offset = 123;
+ o.push_back(new object_copy_cursor_t);
+ o.back()->attr_complete = true;
+ o.back()->data_complete = true;
+ o.back()->omap_offset = "foo";
+ o.push_back(new object_copy_cursor_t);
+ o.back()->attr_complete = true;
+ o.back()->data_complete = true;
+ o.back()->omap_complete = true;
+}
+
// -- pg_create_t --
void pg_create_t::encode(bufferlist &bl) const
@@ -3416,6 +3465,9 @@ ostream& operator<<(ostream& out, const OSDOp& op)
case CEPH_OSD_OP_LIST_WATCHERS:
case CEPH_OSD_OP_LIST_SNAPS:
break;
+ case CEPH_OSD_OP_ASSERT_VER:
+ out << " v" << op.op.assert_ver.ver;
+ break;
case CEPH_OSD_OP_TRUNCATE:
out << " " << op.op.extent.offset;
break;
@@ -3430,6 +3482,9 @@ ostream& operator<<(ostream& out, const OSDOp& op)
out << (op.op.watch.flag ? " add":" remove")
<< " cookie " << op.op.watch.cookie << " ver " << op.op.watch.ver;
break;
+ case CEPH_OSD_OP_COPY_GET:
+ out << " max " << op.op.copy_get.max;
+ break;
default:
out << " " << op.op.extent.offset << "~" << op.op.extent.length;
if (op.op.extent.truncate_seq)
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 3eb14246cc5..00e9409c98a 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -1816,6 +1816,37 @@ struct pg_ls_response_t {
WRITE_CLASS_ENCODER(pg_ls_response_t)
+/**
+ * object_copy_cursor_t
+ */
+struct object_copy_cursor_t {
+ bool attr_complete;
+ uint64_t data_offset;
+ bool data_complete;
+ string omap_offset;
+ bool omap_complete;
+
+ object_copy_cursor_t()
+ : attr_complete(false),
+ data_offset(0),
+ data_complete(false),
+ omap_complete(false)
+ {}
+
+ bool is_initial() const {
+ return !attr_complete && data_offset == 0 && omap_offset.empty();
+ }
+ bool is_complete() const {
+ return attr_complete && data_complete && omap_complete;
+ }
+
+ static void generate_test_instances(list<object_copy_cursor_t*>& o);
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator &bl);
+ void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(object_copy_cursor_t)
+
/**
* pg creation info
diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc
index 39378521b09..75292d13ac3 100644
--- a/src/osdc/Objecter.cc
+++ b/src/osdc/Objecter.cc
@@ -229,7 +229,8 @@ void Objecter::init_locked()
assert(!initialized);
schedule_tick();
- maybe_request_map();
+ if (osdmap->get_epoch() == 0)
+ maybe_request_map();
initialized = true;
}
@@ -1280,6 +1281,32 @@ tid_t Objecter::_op_submit(Op *op)
return op->tid;
}
+int Objecter::op_cancel(tid_t tid)
+{
+ assert(client_lock.is_locked());
+ assert(initialized);
+
+ map<tid_t, Op*>::iterator p = ops.find(tid);
+ if (p == ops.end()) {
+ ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
+ return -ENOENT;
+ }
+
+ ldout(cct, 10) << __func__ << " tid " << tid << dendl;
+ Op *op = p->second;
+ if (op->onack) {
+ op->onack->complete(-ECANCELED);
+ op->onack = NULL;
+ }
+ if (op->oncommit) {
+ op->oncommit->complete(-ECANCELED);
+ op->oncommit = NULL;
+ }
+ op_cancel_map_check(op);
+ finish_op(op);
+ return 0;
+}
+
bool Objecter::is_pg_changed(vector<int>& o, vector<int>& n, bool any_change)
{
if (o.empty() && n.empty())
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index be756054497..91f62551729 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -567,6 +567,82 @@ struct ObjectOperation {
}
}
+ struct C_ObjectOperation_copyget : public Context {
+ bufferlist bl;
+ object_copy_cursor_t *cursor;
+ uint64_t *out_size;
+ utime_t *out_mtime;
+ std::map<std::string,bufferlist> *out_attrs;
+ bufferlist *out_data;
+ std::map<std::string,bufferlist> *out_omap;
+ int *prval;
+ C_ObjectOperation_copyget(object_copy_cursor_t *c,
+ uint64_t *s,
+ utime_t *m,
+ std::map<std::string,bufferlist> *a,
+ bufferlist *d,
+ std::map<std::string,bufferlist> *o,
+ int *r)
+ : cursor(c),
+ out_size(s), out_mtime(m), out_attrs(a),
+ out_data(d), out_omap(o), prval(r) {}
+ void finish(int r) {
+ if (r < 0)
+ return;
+ try {
+ bufferlist::iterator p = bl.begin();
+ uint64_t size;
+ ::decode(size, p);
+ if (out_size)
+ *out_size = size;
+ utime_t mtime;
+ ::decode(mtime, p);
+ if (out_mtime)
+ *out_mtime = mtime;
+ if (out_attrs) {
+ ::decode_noclear(*out_attrs, p);
+ } else {
+ std::map<std::string,bufferlist> t;
+ ::decode(t, p);
+ }
+ bufferlist bl;
+ ::decode(bl, p);
+ if (out_data)
+ out_data->claim_append(bl);
+ if (out_omap) {
+ ::decode_noclear(*out_omap, p);
+ } else {
+ std::map<std::string,bufferlist> t;
+ ::decode(t, p);
+ }
+ ::decode(*cursor, p);
+ } catch (buffer::error& e) {
+ if (prval)
+ *prval = -EIO;
+ }
+ }
+ };
+
+ void copy_get(object_copy_cursor_t *cursor,
+ uint64_t max,
+ uint64_t *out_size,
+ utime_t *out_mtime,
+ std::map<std::string,bufferlist> *out_attrs,
+ bufferlist *out_data,
+ std::map<std::string,bufferlist> *out_omap,
+ int *prval) {
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
+ osd_op.op.copy_get.max = max;
+ ::encode(*cursor, osd_op.indata);
+ ::encode(max, osd_op.indata);
+ unsigned p = ops.size() - 1;
+ out_rval[p] = prval;
+ C_ObjectOperation_copyget *h =
+ new C_ObjectOperation_copyget(cursor, out_size, out_mtime, out_attrs, out_data, out_omap, prval);
+ out_bl[p] = &h->bl;
+ out_handler[p] = h;
+ }
+
void omap_get_header(bufferlist *bl, int *prval) {
add_op(CEPH_OSD_OP_OMAPGETHEADER);
unsigned p = ops.size() - 1;
@@ -647,8 +723,8 @@ struct ObjectOperation {
}
void assert_version(uint64_t ver) {
- bufferlist bl;
- add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0, bl);
+ OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
+ osd_op.op.assert_ver.ver = ver;
}
void assert_src_version(const object_t& srcoid, snapid_t srcsnapid, uint64_t ver) {
bufferlist bl;
@@ -1255,6 +1331,9 @@ private:
/** Clear the passed flags from the global op flag set */
void clear_global_op_flag(int flags) { global_op_flags &= ~flags; }
+ /// cancel an in-progress request
+ int op_cancel(tid_t tid);
+
// commands
int osd_command(int osd, vector<string>& cmd, bufferlist& inbl, tid_t *ptid,
bufferlist *poutbl, string *prs, Context *onfinish) {
diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h
index 213da6fcccc..a6f7cfb7883 100644
--- a/src/test/encoding/types.h
+++ b/src/test/encoding/types.h
@@ -53,6 +53,7 @@ TYPE(pg_log_t)
TYPE(pg_missing_t::item)
TYPE(pg_missing_t)
TYPE(pg_ls_response_t)
+TYPE(object_copy_cursor_t)
TYPE(pg_create_t)
TYPE(watch_info_t)
TYPE(object_info_t)
diff --git a/src/vstart.sh b/src/vstart.sh
index 1a6f4f957b9..c112bfc9138 100755
--- a/src/vstart.sh
+++ b/src/vstart.sh
@@ -199,6 +199,7 @@ else
COSDDEBUG='
debug ms = 1
debug osd = 25
+ debug objecter = 20
debug monc = 20
debug journal = 20
debug filestore = 20