summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Sage Weil <sage@inktank.com> 2013-07-25 15:21:31 -0700
committer Sage Weil <sage@inktank.com> 2013-07-25 15:21:31 -0700
commit 09a664e25391dbad9a479bae33904d28231f429d (patch)
tree 615797c2efc22cf47ee3a1f90db11947a3dba37f
parent 8f010aff684e820ecc837c25ac77c7a05d7191ff (diff)
parent b0535fcf854c5042d6b5ff481aabca08026d8f7f (diff)
download ceph-09a664e25391dbad9a479bae33904d28231f429d.tar.gz
Merge remote-tracking branch 'gh/cuttlefish-next' into cuttlefish
-rw-r--r-- src/common/config_opts.h 2
-rw-r--r-- src/msg/Accepter.cc 5
-rw-r--r-- src/msg/Pipe.cc 191
-rw-r--r-- src/msg/Pipe.h 3
-rw-r--r-- src/msg/SimpleMessenger.cc 33
-rw-r--r-- src/msg/SimpleMessenger.h 6
-rw-r--r-- src/os/FileStore.cc 88
-rw-r--r-- src/os/FileStore.h 3
-rw-r--r-- src/os/HashIndex.cc 15
-rw-r--r-- src/os/LFNIndex.cc 11
-rw-r--r-- src/osd/OSD.cc 2
-rw-r--r-- src/osd/OSD.h 8
-rw-r--r-- src/osd/PG.cc 36
-rw-r--r-- src/osd/PG.h 7
-rw-r--r-- src/osd/ReplicatedPG.cc 55
-rw-r--r-- src/osd/ReplicatedPG.h 7
-rw-r--r-- src/test/filestore/store_test.cc 71
17 files changed, 412 insertions, 131 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index c31282c1997..7940cc760a1 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -388,6 +388,7 @@ OPTION(osd_map_cache_size, OPT_INT, 500)
OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message
OPTION(osd_map_share_max_epochs, OPT_INT, 100) // cap on # of inc maps we send to peers, clients
OPTION(osd_op_threads, OPT_INT, 2) // 0 == no threading
+OPTION(osd_peering_wq_batch_size, OPT_U64, 20)
OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304)
OPTION(osd_op_pq_min_cost, OPT_U64, 65536)
OPTION(osd_disk_threads, OPT_INT, 1)
@@ -452,6 +453,7 @@ OPTION(osd_debug_drop_pg_create_duration, OPT_INT, 1)
OPTION(osd_debug_drop_op_probability, OPT_DOUBLE, 0) // probability of stalling/dropping a client op
OPTION(osd_debug_op_order, OPT_BOOL, false)
OPTION(osd_debug_verify_snaps_on_info, OPT_BOOL, false)
+OPTION(osd_debug_verify_stray_on_activate, OPT_BOOL, false)
OPTION(osd_debug_skip_full_check_in_backfill_reservation, OPT_BOOL, false)
OPTION(osd_op_history_size, OPT_U32, 20) // Max number of completed ops to track
OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track
diff --git a/src/msg/Accepter.cc b/src/msg/Accepter.cc
index 90c68df6cf3..a3f5e3224c6 100644
--- a/src/msg/Accepter.cc
+++ b/src/msg/Accepter.cc
@@ -164,6 +164,11 @@ int Accepter::rebind(int avoid_port)
int old_port = addr.get_port();
addr.set_port(0);
+ // adjust the nonce; we want our entity_addr_t to be truly unique.
+ nonce += 1000000;
+ msgr->my_inst.addr.nonce = nonce;
+ ldout(msgr->cct,10) << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl;
+
ldout(msgr->cct,10) << " will try " << addr << dendl;
int r = bind(addr, old_port, avoid_port);
if (r == 0)
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index 2a42b97d92d..87426218d48 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -211,75 +211,86 @@ void *Pipe::DelayedDelivery::entry()
int Pipe::accept()
{
ldout(msgr->cct,10) << "accept" << dendl;
+ assert(pipe_lock.is_locked());
+ assert(state == STATE_ACCEPTING);
+
+ pipe_lock.Unlock();
+
+ // vars
+ bufferlist addrs;
+ entity_addr_t socket_addr;
+ socklen_t len;
+ int r;
+ char banner[strlen(CEPH_BANNER)+1];
+ bufferlist addrbl;
+ ceph_msg_connect connect;
+ ceph_msg_connect_reply reply;
+ Pipe *existing = 0;
+ bufferptr bp;
+ bufferlist authorizer, authorizer_reply;
+ bool authorizer_valid;
+ uint64_t feat_missing;
+ bool replaced = false;
+ CryptoKey session_key;
+ int removed; // single-use down below
+
+ // this should roughly mirror pseudocode at
+ // http://ceph.newdream.net/wiki/Messaging_protocol
+ int reply_tag = 0;
+ uint64_t existing_seq = -1;
+
+ // used for reading in the remote acked seq on connect
+ uint64_t newly_acked_seq = 0;
set_socket_options();
- // my creater gave me sd via accept()
- assert(state == STATE_ACCEPTING);
-
// announce myself.
- int rc = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
- if (rc < 0) {
+ r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
+ if (r < 0) {
ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
// and my addr
- bufferlist addrs;
::encode(msgr->my_inst.addr, addrs);
port = msgr->my_inst.addr.get_port();
// and peer's socket addr (they might not know their ip)
- entity_addr_t socket_addr;
- socklen_t len = sizeof(socket_addr.ss_addr());
- int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
+ len = sizeof(socket_addr.ss_addr());
+ r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
if (r < 0) {
char buf[80];
ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
::encode(socket_addr, addrs);
- rc = tcp_write(addrs.c_str(), addrs.length());
- if (rc < 0) {
+ r = tcp_write(addrs.c_str(), addrs.length());
+ if (r < 0) {
ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
// identify peer
- char banner[strlen(CEPH_BANNER)+1];
if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
banner[strlen(CEPH_BANNER)] = 0;
ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
- bufferlist addrbl;
{
bufferptr tp(sizeof(peer_addr));
addrbl.push_back(tp);
}
if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
- state = STATE_CLOSED;
- state_closed.set(1);
- return -1;
+ goto fail_unlocked;
}
{
bufferlist::iterator ti = addrbl.begin();
@@ -297,31 +308,12 @@ int Pipe::accept()
}
set_peer_addr(peer_addr); // so that connection_state gets set up
- ceph_msg_connect connect;
- ceph_msg_connect_reply reply;
- Pipe *existing = 0;
- bufferptr bp;
- bufferlist authorizer, authorizer_reply;
- bool authorizer_valid;
- uint64_t feat_missing;
- bool replaced = false;
- CryptoKey session_key;
-
- // this should roughly mirror pseudocode at
- // http://ceph.newdream.net/wiki/Messaging_protocol
- int reply_tag = 0;
- uint64_t existing_seq = -1;
-
- // used for reading in the remote acked seq on connect
- uint64_t newly_acked_seq = 0;
-
while (1) {
if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
goto fail_unlocked;
}
-
authorizer.clear();
if (connect.authorizer_len) {
bp = buffer::create(connect.authorizer_len);
@@ -338,25 +330,32 @@ int Pipe::accept()
<< dendl;
msgr->lock.Lock(); // FIXME
+ pipe_lock.Lock();
if (msgr->dispatch_queue.stop)
goto shutting_down;
+ if (state != STATE_ACCEPTING) {
+ goto shutting_down;
+ }
// note peer's type, flags
set_peer_type(connect.host_type);
policy = msgr->get_policy(connect.host_type);
ldout(msgr->cct,10) << "accept of host_type " << connect.host_type
- << ", policy.lossy=" << policy.lossy
- << dendl;
+ << ", policy.lossy=" << policy.lossy
+ << " policy.server=" << policy.server
+ << " policy.standby=" << policy.standby
+ << " policy.resetcheck=" << policy.resetcheck
+ << dendl;
memset(&reply, 0, sizeof(reply));
reply.protocol_version = msgr->get_proto_version(peer_type, false);
+ msgr->lock.Unlock();
// mismatch?
ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
<< ", their proto " << connect.protocol_version << dendl;
if (connect.protocol_version != reply.protocol_version) {
reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
- msgr->lock.Unlock();
goto reply;
}
@@ -382,18 +381,20 @@ int Pipe::accept()
if (feat_missing) {
ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
reply.tag = CEPH_MSGR_TAG_FEATURES;
- msgr->lock.Unlock();
goto reply;
}
- msgr->lock.Unlock();
-
// Check the authorizer. If not good, bail out.
- if (!msgr->verify_authorizer(connection_state, peer_type, connect.authorizer_protocol, authorizer,
+ pipe_lock.Unlock();
+
+ if (!msgr->verify_authorizer(connection_state, peer_type, connect.authorizer_protocol, authorizer,
authorizer_reply, authorizer_valid, session_key) ||
!authorizer_valid) {
ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
+ pipe_lock.Lock();
+ if (state != STATE_ACCEPTING)
+ goto shutting_down_msgr_unlocked;
reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
delete session_security;
session_security = NULL;
@@ -405,14 +406,16 @@ int Pipe::accept()
ldout(msgr->cct,10) << "accept: setting up session_security." << dendl;
msgr->lock.Lock();
+ pipe_lock.Lock();
if (msgr->dispatch_queue.stop)
goto shutting_down;
-
+ if (state != STATE_ACCEPTING)
+ goto shutting_down;
// existing?
existing = msgr->_lookup_pipe(peer_addr);
if (existing) {
- existing->pipe_lock.Lock();
+ existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here)
if (connect.global_seq < existing->peer_global_seq) {
ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
@@ -536,6 +539,8 @@ int Pipe::accept()
assert(0);
retry_session:
+ assert(existing->pipe_lock.is_locked());
+ assert(pipe_lock.is_locked());
reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
reply.connect_seq = existing->connect_seq + 1;
existing->pipe_lock.Unlock();
@@ -543,19 +548,23 @@ int Pipe::accept()
goto reply;
reply:
+ assert(pipe_lock.is_locked());
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
reply.authorizer_len = authorizer_reply.length();
- rc = tcp_write((char*)&reply, sizeof(reply));
- if (rc < 0)
+ pipe_lock.Unlock();
+ r = tcp_write((char*)&reply, sizeof(reply));
+ if (r < 0)
goto fail_unlocked;
if (reply.authorizer_len) {
- rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
- if (rc < 0)
+ r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+ if (r < 0)
goto fail_unlocked;
}
}
replace:
+ assert(existing->pipe_lock.is_locked());
+ assert(pipe_lock.is_locked());
if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) {
reply_tag = CEPH_MSGR_TAG_SEQ;
existing_seq = existing->in_seq;
@@ -599,8 +608,10 @@ int Pipe::accept()
open:
// open
+ assert(pipe_lock.is_locked());
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
+ assert(state == STATE_ACCEPTING);
state = STATE_OPEN;
ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
@@ -627,23 +638,26 @@ int Pipe::accept()
// ok!
if (msgr->dispatch_queue.stop)
goto shutting_down;
+ removed = msgr->accepting_pipes.erase(this);
+ assert(removed == 1);
register_pipe();
msgr->lock.Unlock();
+ pipe_lock.Unlock();
- rc = tcp_write((char*)&reply, sizeof(reply));
- if (rc < 0) {
+ r = tcp_write((char*)&reply, sizeof(reply));
+ if (r < 0) {
goto fail_registered;
}
if (reply.authorizer_len) {
- rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
- if (rc < 0) {
+ r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+ if (r < 0) {
goto fail_registered;
}
}
if (reply_tag == CEPH_MSGR_TAG_SEQ) {
- if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
+ if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
goto fail_registered;
}
@@ -660,7 +674,6 @@ int Pipe::accept()
start_writer();
}
ldout(msgr->cct,20) << "accept done" << dendl;
- pipe_lock.Unlock();
maybe_start_delay_thread();
@@ -693,11 +706,12 @@ int Pipe::accept()
if (queued || replaced)
start_writer();
}
- pipe_lock.Unlock();
return -1;
shutting_down:
msgr->lock.Unlock();
+ shutting_down_msgr_unlocked:
+ assert(pipe_lock.is_locked());
if (msgr->cct->_conf->ms_inject_internal_delays) {
ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
@@ -706,11 +720,9 @@ int Pipe::accept()
t.sleep();
}
- pipe_lock.Lock();
state = STATE_CLOSED;
state_closed.set(1);
fault();
- pipe_lock.Unlock();
return -1;
}
@@ -1010,7 +1022,17 @@ int Pipe::connect()
ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << dendl;
goto fail_locked;
}
- handle_ack(newly_acked_seq);
+ ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq
+ << " vs out_seq " << out_seq << dendl;
+ while (newly_acked_seq > out_seq) {
+ Message *m = _get_next_outgoing();
+ assert(m);
+ ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
+ << " " << *m << dendl;
+ assert(m->get_seq() <= newly_acked_seq);
+ m->put();
+ ++out_seq;
+ }
if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) {
ldout(msgr->cct,2) << "connect write error on in_seq" << dendl;
goto fail_locked;
@@ -1096,6 +1118,7 @@ void Pipe::unregister_pipe()
msgr->rank_pipe.erase(p);
} else {
ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
+ msgr->accepting_pipes.erase(this); // somewhat overkill, but safe.
}
}
@@ -1119,6 +1142,8 @@ void Pipe::requeue_sent()
void Pipe::discard_requeued_up_to(uint64_t seq)
{
ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
+ if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
+ return;
list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
while (!rq.empty()) {
Message *m = rq.front();
@@ -1130,6 +1155,8 @@ void Pipe::discard_requeued_up_to(uint64_t seq)
rq.pop_front();
out_seq++;
}
+ if (rq.empty())
+ out_q.erase(CEPH_MSG_PRIO_HIGHEST);
}
/*
@@ -1299,11 +1326,13 @@ void Pipe::stop()
*/
void Pipe::reader()
{
- if (state == STATE_ACCEPTING)
- accept();
-
pipe_lock.Lock();
+ if (state == STATE_ACCEPTING) {
+ accept();
+ assert(pipe_lock.is_locked());
+ }
+
// loop.
while (state != STATE_CLOSED &&
state != STATE_CONNECTING) {
@@ -1518,10 +1547,16 @@ void Pipe::writer()
// associate message with Connection (for benefit of encode_payload)
m->set_connection(connection_state->get());
- ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
+ uint64_t features = connection_state->get_features();
+ if (m->empty_payload())
+ ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
+ << " " << m << " " << *m << dendl;
+ else
+ ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features
+ << " " << m << " " << *m << dendl;
// encode and copy out of *m
- m->encode(connection_state->get_features(), !msgr->cct->_conf->ms_nocrc);
+ m->encode(features, !msgr->cct->_conf->ms_nocrc);
// prepare everything
ceph_msg_header& header = m->get_header();
diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h
index e2a155a6038..2ba3505c5a0 100644
--- a/src/msg/Pipe.h
+++ b/src/msg/Pipe.h
@@ -247,14 +247,17 @@ class DispatchQueue;
void stop();
void _send(Message *m) {
+ assert(pipe_lock.is_locked());
out_q[m->get_priority()].push_back(m);
cond.Signal();
}
void _send_keepalive() {
+ assert(pipe_lock.is_locked());
keepalive = true;
cond.Signal();
}
Message *_get_next_outgoing() {
+ assert(pipe_lock.is_locked());
Message *m = 0;
while (!m && !out_q.empty()) {
map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index 148300f159c..82f8a508a64 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -223,6 +223,8 @@ void SimpleMessenger::reaper()
ldout(cct,10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
p->pipe_lock.Lock();
p->discard_out_queue();
+ if (p->connection_state)
+ p->connection_state->clear_pipe(p);
p->pipe_lock.Unlock();
p->unregister_pipe();
assert(pipes.count(p));
@@ -231,8 +233,6 @@ void SimpleMessenger::reaper()
if (p->sd >= 0)
::close(p->sd);
ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
- if (p->connection_state)
- p->connection_state->clear_pipe(p);
p->put();
ldout(cct,10) << "reaper deleted pipe " << p << dendl;
}
@@ -271,9 +271,10 @@ int SimpleMessenger::bind(const entity_addr_t &bind_addr)
int SimpleMessenger::rebind(int avoid_port)
{
ldout(cct,1) << "rebind avoid " << avoid_port << dendl;
- mark_down_all();
assert(did_bind);
- return accepter.rebind(avoid_port);
+ int r = accepter.rebind(avoid_port);
+ mark_down_all();
+ return r;
}
int SimpleMessenger::start()
@@ -306,6 +307,7 @@ Pipe *SimpleMessenger::add_accept_pipe(int sd)
p->start_reader();
p->pipe_lock.Unlock();
pipes.insert(p);
+ accepting_pipes.insert(p);
lock.Unlock();
return p;
}
@@ -554,6 +556,17 @@ void SimpleMessenger::mark_down_all()
{
ldout(cct,1) << "mark_down_all" << dendl;
lock.Lock();
+ for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) {
+ Pipe *p = *q;
+ ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
+ p->pipe_lock.Lock();
+ p->stop();
+ if (p->connection_state)
+ p->connection_state->clear_pipe(p);
+ p->pipe_lock.Unlock();
+ }
+ accepting_pipes.clear();
+
while (!rank_pipe.empty()) {
hash_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin();
Pipe *p = it->second;
@@ -562,6 +575,8 @@ void SimpleMessenger::mark_down_all()
p->unregister_pipe();
p->pipe_lock.Lock();
p->stop();
+ if (p->connection_state)
+ p->connection_state->clear_pipe(p);
p->pipe_lock.Unlock();
}
lock.Unlock();
@@ -576,6 +591,11 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr)
p->unregister_pipe();
p->pipe_lock.Lock();
p->stop();
+ if (p->connection_state) {
+ // do not generate a reset event for the caller in this case,
+ // since they asked for it.
+ p->connection_state->clear_pipe(p);
+ }
p->pipe_lock.Unlock();
} else {
ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl;
@@ -595,6 +615,11 @@ void SimpleMessenger::mark_down(Connection *con)
p->unregister_pipe();
p->pipe_lock.Lock();
p->stop();
+ if (p->connection_state) {
+ // do not generate a reset event for the caller in this case,
+ // since they asked for it.
+ p->connection_state->clear_pipe(p);
+ }
p->pipe_lock.Unlock();
p->put();
} else {
diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h
index 6be1a0a9539..049c7edda65 100644
--- a/src/msg/SimpleMessenger.h
+++ b/src/msg/SimpleMessenger.h
@@ -514,6 +514,12 @@ private:
* invalid and can be replaced by anyone holding the msgr lock
*/
hash_map<entity_addr_t, Pipe*> rank_pipe;
+ /**
+ * set of pipes that are in the process of accepting
+ *
+ * These are not yet in the rank_pipe map.
+ */
+ set<Pipe*> accepting_pipes;
/// a set of all the Pipes we have which are somehow active
set<Pipe*> pipes;
/// a list of Pipes we want to tear down
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index a91c252e531..c764d6bef2a 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -96,6 +96,7 @@ static const __SWORD_TYPE BTRFS_SUPER_MAGIC(0x9123683E);
#define CLUSTER_SNAP_ITEM "clustersnap_%s"
#define REPLAY_GUARD_XATTR "user.cephos.seq"
+#define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq"
/*
* long file names will have the following format:
@@ -2158,6 +2159,78 @@ int FileStore::_do_transactions(
return r;
}
+/**
+ * Record a collection-wide ("global") replay guard for collection cid.
+ *
+ * Syncs the filesystem, then stores the encoded sequencer position spos in
+ * the GLOBAL_REPLAY_GUARD_XATTR xattr of the path returned by get_cdir()
+ * and fsyncs that descriptor so the guard is durable before returning.
+ * Does nothing when btrfs_stable_commits is set.
+ *
+ * Every failure (open, fsetxattr, fsync) is logged and treated as fatal via
+ * assert, because a guard that was not durably written cannot be relied on.
+ *
+ * @param cid  collection to guard
+ * @param spos sequencer position recorded in the guard
+ */
+void FileStore::_set_global_replay_guard(coll_t cid,
+                                         const SequencerPosition &spos)
+{
+  if (btrfs_stable_commits)
+    return;
+
+  // sync all previous operations on this sequencer
+  sync_filesystem(basedir_fd);
+
+  char fn[PATH_MAX];
+  get_cdir(cid, fn, sizeof(fn));
+  int fd = ::open(fn, O_RDONLY);
+  if (fd < 0) {
+    int err = errno;
+    derr << __func__ << ": " << cid << " error " << cpp_strerror(err) << dendl;
+    assert(0 == "_set_global_replay_guard failed");
+  }
+
+  _inject_failure();
+
+  // then record that we did it
+  bufferlist v;
+  ::encode(spos, v);
+  int r = chain_fsetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length());
+  if (r < 0) {
+    derr << __func__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR
+         << " got " << cpp_strerror(r) << dendl;
+    assert(0 == "fsetxattr failed");
+  }
+
+  // and make sure our xattr is durable.  A failed fsync means the guard may
+  // not be on disk, so handle it like the other failures above instead of
+  // silently ignoring it.
+  r = ::fsync(fd);
+  if (r < 0) {
+    int err = errno;
+    derr << __func__ << ": fsync " << fn << " got " << cpp_strerror(err) << dendl;
+    assert(0 == "fsync failed");
+  }
+
+  _inject_failure();
+
+  TEMP_FAILURE_RETRY(::close(fd));
+  dout(10) << __func__ << ": " << spos << " done" << dendl;
+}
+
+/**
+ * Check the collection-wide ("global") replay guard of collection cid.
+ *
+ * Reads the GLOBAL_REPLAY_GUARD_XATTR xattr from the path returned by
+ * get_cdir() and compares the sequencer position stored there with spos.
+ *
+ * @param cid  collection whose guard is consulted
+ * @param spos sequencer position of the operation about to be applied
+ * @return 1 when not replaying, when btrfs_stable_commits is set, when the
+ *         collection path cannot be opened, when the xattr cannot be read,
+ *         or when spos >= the stored position; -1 when spos is older than
+ *         the stored position.
+ */
+int FileStore::_check_global_replay_guard(coll_t cid,
+                                          const SequencerPosition& spos)
+{
+  if (!replaying || btrfs_stable_commits)
+    return 1;
+
+  char fn[PATH_MAX];
+  get_cdir(cid, fn, sizeof(fn));
+  int fd = ::open(fn, O_RDONLY);
+  if (fd < 0) {
+    dout(10) << __func__ << ": " << cid << " dne" << dendl;
+    return 1; // if collection does not exist, there is no guard, and we can replay.
+  }
+
+  char buf[100];
+  int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf));
+
+  // The descriptor is only needed for the xattr read.  Release it right away
+  // so that none of the exit paths below (in particular the "no xattr"
+  // return) can leak it.
+  TEMP_FAILURE_RETRY(::close(fd));
+
+  if (r < 0) {
+    dout(20) << __func__ << " no xattr" << dendl;
+    assert(!m_filestore_fail_eio || r != -EIO);
+    return 1; // no xattr
+  }
+  bufferlist bl;
+  bl.append(buf, r);
+
+  SequencerPosition opos;
+  bufferlist::iterator p = bl.begin();
+  ::decode(opos, p);
+
+  return spos >= opos ? 1 : -1;
+}
+
+
void FileStore::_set_replay_guard(coll_t cid,
const SequencerPosition &spos,
bool in_progress=false)
@@ -2261,6 +2334,10 @@ int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPos
if (!replaying || btrfs_stable_commits)
return 1;
+ int r = _check_global_replay_guard(cid, spos);
+ if (r < 0)
+ return r;
+
int fd = lfn_open(cid, oid, 0);
if (fd < 0) {
dout(10) << "_check_replay_guard " << cid << " " << oid << " dne" << dendl;
@@ -4283,6 +4360,16 @@ int FileStore::_collection_rename(const coll_t &cid, const coll_t &ncid,
return _collection_remove_recursive(cid, spos);
}
+ if (!collection_exists(cid)) {
+ if (replaying) {
+ // already happened
+ return 0;
+ } else {
+ return -ENOENT;
+ }
+ }
+ _set_global_replay_guard(cid, spos);
+
int ret = 0;
if (::rename(old_coll, new_coll)) {
if (replaying && !btrfs_stable_commits &&
@@ -4782,6 +4869,7 @@ int FileStore::_split_collection(coll_t cid,
if (srccmp < 0)
return 0;
+ _set_global_replay_guard(cid, spos);
_set_replay_guard(cid, spos, true);
_set_replay_guard(dest, spos, true);
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index d5ca2a4c237..e4f7e81a502 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -333,6 +333,8 @@ public:
void _set_replay_guard(coll_t cid,
const SequencerPosition& spos,
bool in_progress);
+ void _set_global_replay_guard(coll_t cid,
+ const SequencerPosition &spos);
/// close a replay guard opened with in_progress=true
void _close_replay_guard(int fd, const SequencerPosition& spos);
@@ -357,6 +359,7 @@ public:
int _check_replay_guard(int fd, const SequencerPosition& spos);
int _check_replay_guard(coll_t cid, const SequencerPosition& spos);
int _check_replay_guard(coll_t cid, hobject_t oid, const SequencerPosition& pos);
+ int _check_global_replay_guard(coll_t cid, const SequencerPosition& spos);
// ------------------
// objects
diff --git a/src/os/HashIndex.cc b/src/os/HashIndex.cc
index 216f53ce72b..6281c6113cc 100644
--- a/src/os/HashIndex.cc
+++ b/src/os/HashIndex.cc
@@ -447,18 +447,7 @@ int HashIndex::complete_merge(const vector<string> &path, subdir_info_s info) {
r = move_objects(path, dst);
if (r < 0)
return r;
-
- map<string,hobject_t> objects_dst;
- r = list_objects(dst, 0, 0, &objects_dst);
- if (r < 0)
- return r;
- set<string> subdirs;
- r = list_subdirs(dst, &subdirs);
- if (r < 0)
- return r;
- dstinfo.objs = objects_dst.size();
- dstinfo.subdirs = subdirs.size() - 1;
- r = set_info(dst, dstinfo);
+ r = reset_attr(dst);
if (r < 0)
return r;
r = remove_path(path);
@@ -576,7 +565,7 @@ int HashIndex::complete_split(const vector<string> &path, subdir_info_s info) {
if (r < 0)
return r;
info.objs = objects.size();
- r = set_info(path, info);
+ r = reset_attr(path);
if (r < 0)
return r;
r = fsync_dir(path);
diff --git a/src/os/LFNIndex.cc b/src/os/LFNIndex.cc
index 12aabfd8fd1..92b30ceae58 100644
--- a/src/os/LFNIndex.cc
+++ b/src/os/LFNIndex.cc
@@ -71,16 +71,19 @@ int LFNIndex::init() {
}
int LFNIndex::created(const hobject_t &hoid, const char *path) {
+ WRAP_RETRY(
vector<string> path_comp;
string short_name;
- int r;
r = decompose_full_path(path, &path_comp, 0, &short_name);
if (r < 0)
- return r;
+ goto out;
r = lfn_created(path_comp, hoid, short_name);
if (r < 0)
- return r;
- return _created(path_comp, hoid, short_name);
+ goto out;
+ r = _created(path_comp, hoid, short_name);
+ if (r < 0)
+ goto out;
+ );
}
int LFNIndex::unlink(const hobject_t &hoid) {
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 825a2fea99f..6202bac3461 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -908,7 +908,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
finished_lock("OSD::finished_lock"),
test_ops_hook(NULL),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
- peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200),
+ peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
debug_drop_pg_create_probability(g_conf->osd_debug_drop_pg_create_probability),
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index ac2c634c1f2..2db9d3b8c44 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -798,10 +798,9 @@ private:
list<PG*> peering_queue;
OSD *osd;
set<PG*> in_use;
- const size_t batch_size;
- PeeringWQ(OSD *o, time_t ti, ThreadPool *tp, size_t batch_size)
+ PeeringWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::BatchWorkQueue<PG>(
- "OSD::PeeringWQ", ti, ti*10, tp), osd(o), batch_size(batch_size) {}
+ "OSD::PeeringWQ", ti, ti*10, tp), osd(o) {}
void _dequeue(PG *pg) {
for (list<PG*>::iterator i = peering_queue.begin();
@@ -826,7 +825,8 @@ private:
void _dequeue(list<PG*> *out) {
set<PG*> got;
for (list<PG*>::iterator i = peering_queue.begin();
- i != peering_queue.end() && out->size() < batch_size;
+ i != peering_queue.end() &&
+ out->size() < g_conf->osd_peering_wq_batch_size;
) {
if (in_use.count(*i)) {
++i;
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index fb8b26ff389..b547840a0f2 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -560,7 +560,6 @@ void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead)
}
assert(p->version > newhead);
dout(10) << "rewind_divergent_log future divergent " << *p << dendl;
- log.unindex(*p);
}
log.head = newhead;
@@ -568,6 +567,8 @@ void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead)
if (info.last_complete > newhead)
info.last_complete = newhead;
+ log.index();
+
for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d)
merge_old_entry(t, *d);
@@ -1527,8 +1528,9 @@ void PG::activate(ObjectStore::Transaction& t,
dirty_info = true;
dirty_big_info = true; // maybe
- // clean up stray objects
- clean_up_local(t);
+ // verify that there are no stray objects
+ if (is_primary())
+ check_local();
// find out when we commit
tfin.push_back(new C_PG_ActivateCommitted(this, query_epoch));
@@ -5013,7 +5015,8 @@ void PG::start_flush(ObjectStore::Transaction *t,
/* Called before initializing peering during advance_map */
void PG::start_peering_interval(const OSDMapRef lastmap,
const vector<int>& newup,
- const vector<int>& newacting)
+ const vector<int>& newacting,
+ ObjectStore::Transaction *t)
{
const OSDMapRef osdmap = get_osdmap();
@@ -5102,7 +5105,7 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
// pg->on_*
- on_change();
+ on_change(t);
assert(!deleting);
@@ -5903,11 +5906,6 @@ PG::RecoveryState::Started::Started(my_context ctx)
{
state_name = "Started";
context< RecoveryMachine >().log_enter(state_name);
- PG *pg = context< RecoveryMachine >().pg;
- pg->start_flush(
- context< RecoveryMachine >().get_cur_transaction(),
- context< RecoveryMachine >().get_on_applied_context_list(),
- context< RecoveryMachine >().get_on_safe_context_list());
}
boost::statechart::result
@@ -5982,7 +5980,8 @@ boost::statechart::result PG::RecoveryState::Reset::react(const AdvMap& advmap)
pg->is_split(advmap.lastmap, advmap.osdmap)) {
dout(10) << "up or acting affected, calling start_peering_interval again"
<< dendl;
- pg->start_peering_interval(advmap.lastmap, advmap.newup, advmap.newacting);
+ pg->start_peering_interval(advmap.lastmap, advmap.newup, advmap.newacting,
+ context< RecoveryMachine >().get_cur_transaction());
}
return discard_event();
}
@@ -6862,6 +6861,12 @@ PG::RecoveryState::ReplicaActive::ReplicaActive(my_context ctx)
state_name = "Started/ReplicaActive";
context< RecoveryMachine >().log_enter(state_name);
+
+ PG *pg = context< RecoveryMachine >().pg;
+ pg->start_flush(
+ context< RecoveryMachine >().get_cur_transaction(),
+ context< RecoveryMachine >().get_on_applied_context_list(),
+ context< RecoveryMachine >().get_on_safe_context_list());
}
@@ -6951,6 +6956,11 @@ PG::RecoveryState::Stray::Stray(my_context ctx)
assert(!pg->is_active());
assert(!pg->is_peering());
assert(!pg->is_primary());
+ if (!pg->is_replica()) // stray, need to flush for pulls
+ pg->start_flush(
+ context< RecoveryMachine >().get_cur_transaction(),
+ context< RecoveryMachine >().get_on_applied_context_list(),
+ context< RecoveryMachine >().get_on_safe_context_list());
}
boost::statechart::result PG::RecoveryState::Stray::react(const MLogRec& logevt)
@@ -7298,6 +7308,10 @@ boost::statechart::result PG::RecoveryState::GetLog::react(const GotLog&)
msg->info, msg->log, msg->missing,
newest_update_osd);
}
+ pg->start_flush(
+ context< RecoveryMachine >().get_cur_transaction(),
+ context< RecoveryMachine >().get_on_applied_context_list(),
+ context< RecoveryMachine >().get_on_safe_context_list());
return transit< GetMissing >();
}
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 9446334bb53..f56d96d0198 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -834,7 +834,7 @@ public:
return missing.num_missing() - missing_loc.size();
}
- virtual void clean_up_local(ObjectStore::Transaction& t) = 0;
+ virtual void check_local() = 0;
virtual int start_recovery_ops(int max, RecoveryCtx *prctx) = 0;
@@ -1927,7 +1927,8 @@ public:
void start_peering_interval(const OSDMapRef lastmap,
const vector<int>& newup,
- const vector<int>& newacting);
+ const vector<int>& newacting,
+ ObjectStore::Transaction *t);
void start_flush(ObjectStore::Transaction *t,
list<Context *> *on_applied,
list<Context *> *on_safe);
@@ -2009,7 +2010,7 @@ public:
virtual bool same_for_rep_modify_since(epoch_t e) = 0;
virtual void on_role_change() = 0;
- virtual void on_change() = 0;
+ virtual void on_change(ObjectStore::Transaction *t) = 0;
virtual void on_activate() = 0;
virtual void on_flushed() = 0;
virtual void on_shutdown() = 0;
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index c01d328a512..7acbca323b8 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -5260,6 +5260,9 @@ void ReplicatedPG::submit_push_data(
ObjectStore::Transaction *t)
{
if (first) {
+ dout(10) << __func__ << ": Creating oid "
+ << recovery_info.soid << " in the temp collection" << dendl;
+ temp_contents.insert(recovery_info.soid);
missing.revise_have(recovery_info.soid, eversion_t());
remove_snap_mapped_object(*t, recovery_info.soid);
t->remove(get_temp_coll(t), recovery_info.soid);
@@ -5287,6 +5290,10 @@ void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t)
{
remove_snap_mapped_object(*t, recovery_info.soid);
+ assert(temp_contents.count(recovery_info.soid));
+ dout(10) << __func__ << ": Removing oid "
+ << recovery_info.soid << " from the temp collection" << dendl;
+ temp_contents.erase(recovery_info.soid);
t->collection_move(coll, get_temp_coll(t), recovery_info.soid);
for (map<hobject_t, interval_set<uint64_t> >::const_iterator p =
recovery_info.clone_subset.begin();
@@ -6315,6 +6322,15 @@ void ReplicatedPG::on_shutdown()
void ReplicatedPG::on_flushed()
{
assert(object_contexts.empty());
+ if (have_temp_coll() &&
+ !osd->store->collection_empty(get_temp_coll())) {
+ vector<hobject_t> objects;
+ osd->store->collection_list(get_temp_coll(), objects);
+ derr << __func__ << ": found objects in the temp collection: "
+ << objects << ", crashing now"
+ << dendl;
+ assert(0 == "found garbage in the temp collection");
+ }
}
void ReplicatedPG::on_activate()
@@ -6330,7 +6346,7 @@ void ReplicatedPG::on_activate()
}
}
-void ReplicatedPG::on_change()
+void ReplicatedPG::on_change(ObjectStore::Transaction *t)
{
dout(10) << "on_change" << dendl;
@@ -6365,6 +6381,16 @@ void ReplicatedPG::on_change()
pulling.clear();
pull_from_peer.clear();
+ // clear temp
+ for (set<hobject_t>::iterator i = temp_contents.begin();
+ i != temp_contents.end();
+ ++i) {
+ dout(10) << __func__ << ": Removing oid "
+ << *i << " from the temp collection" << dendl;
+ t->remove(get_temp_coll(t), *i);
+ }
+ temp_contents.clear();
+
// clear snap_trimmer state
snap_trimmer_machine.process_event(Reset());
@@ -7095,16 +7121,19 @@ void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterva
}
-/** clean_up_local
- * remove any objects that we're storing but shouldn't.
- * as determined by log.
+/** check_local
+ *
+ * verifies that stray objects have been deleted
*/
-void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
+void ReplicatedPG::check_local()
{
- dout(10) << "clean_up_local" << dendl;
+ dout(10) << __func__ << dendl;
assert(info.last_update >= log.tail); // otherwise we need some help!
+ if (!g_conf->osd_debug_verify_stray_on_activate)
+ return;
+
// just scan the log.
set<hobject_t> did;
for (list<pg_log_entry_t>::reverse_iterator p = log.log.rbegin();
@@ -7115,11 +7144,17 @@ void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
did.insert(p->soid);
if (p->is_delete()) {
- dout(10) << " deleting " << p->soid
- << " when " << p->version << dendl;
- remove_snap_mapped_object(t, p->soid);
+ dout(10) << " checking " << p->soid
+ << " at " << p->version << dendl;
+ struct stat st;
+ int r = osd->store->stat(coll, p->soid, &st);
+ if (r != -ENOENT) {
+ dout(10) << "Object " << p->soid << " exists, but should have been "
+ << "deleted" << dendl;
+ assert(0 == "erroneously present object");
+ }
} else {
- // keep old(+missing) objects, just for kicks.
+ // ignore old(+missing) objects
}
}
}
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 644a277f0dc..5b989442305 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -545,6 +545,9 @@ protected:
};
map<hobject_t, PullInfo> pulling;
+ // Track contents of temp collection, clear on reset
+ set<hobject_t> temp_contents;
+
ObjectRecoveryInfo recalc_subsets(const ObjectRecoveryInfo& recovery_info);
static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
const interval_set<uint64_t> &intervals_received,
@@ -721,7 +724,7 @@ protected:
int prepare_transaction(OpContext *ctx);
// pg on-disk content
- void clean_up_local(ObjectStore::Transaction& t);
+ void check_local();
void _clear_recovery_state();
@@ -1001,7 +1004,7 @@ public:
void _finish_mark_all_unfound_lost(list<ObjectContext*>& obcs);
void on_role_change();
- void on_change();
+ void on_change(ObjectStore::Transaction *t);
void on_activate();
void on_flushed();
void on_removal(ObjectStore::Transaction *t);
diff --git a/src/test/filestore/store_test.cc b/src/test/filestore/store_test.cc
index c98ffb047ac..c0b009bb95c 100644
--- a/src/test/filestore/store_test.cc
+++ b/src/test/filestore/store_test.cc
@@ -823,6 +823,75 @@ TEST_F(StoreTest, ColSplitTest3) {
}
#endif
+/**
+ * This test tests adding two different groups
+ * of objects, each with 1 common prefix and 1
+ * different prefix. We then remove half
+ * in order to verify that the merging correctly
+ * stops at the common prefix subdir. See bug
+ * #5273 */
+TEST_F(StoreTest, TwoHash) {
+ coll_t cid("asdf");
+ int r;
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(cid);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ std::cout << "Making objects" << std::endl;
+ for (int i = 0; i < 360; ++i) {
+ ObjectStore::Transaction t;
+ hobject_t o;
+ if (i < 8) {
+ o.hash = (i << 16) | 0xA1;
+ t.touch(cid, o);
+ }
+ o.hash = (i << 16) | 0xB1;
+ t.touch(cid, o);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ std::cout << "Removing half" << std::endl;
+ for (int i = 1; i < 8; ++i) {
+ ObjectStore::Transaction t;
+ hobject_t o;
+ o.hash = (i << 16) | 0xA1;
+ t.remove(cid, o);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ std::cout << "Checking" << std::endl;
+ for (int i = 1; i < 8; ++i) {
+ ObjectStore::Transaction t;
+ hobject_t o;
+ o.hash = (i << 16) | 0xA1;
+ bool exists = store->exists(cid, o);
+ ASSERT_EQ(exists, false);
+ }
+ {
+ hobject_t o;
+ o.hash = 0xA1;
+ bool exists = store->exists(cid, o);
+ ASSERT_EQ(exists, true);
+ }
+ std::cout << "Cleanup" << std::endl;
+ for (int i = 0; i < 360; ++i) {
+ ObjectStore::Transaction t;
+ hobject_t o;
+ o.hash = (i << 16) | 0xA1;
+ t.remove(cid, o);
+ o.hash = (i << 16) | 0xB1;
+ t.remove(cid, o);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ ObjectStore::Transaction t;
+ t.remove_collection(cid);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+}
+
int main(int argc, char **argv) {
vector<const char*> args;
argv_to_vec(argc, (const char **)argv, args);
@@ -830,7 +899,7 @@ int main(int argc, char **argv) {
global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
common_init_finish(g_ceph_context);
g_ceph_context->_conf->set_val("osd_journal_size", "400");
- g_ceph_context->_conf->set_val("filestore_index_retry_probability", "1");
+ g_ceph_context->_conf->set_val("filestore_index_retry_probability", "0.5");
g_ceph_context->_conf->set_val("filestore_op_thread_timeout", "1000");
g_ceph_context->_conf->set_val("filestore_op_thread_suicide_timeout", "10000");
g_ceph_context->_conf->apply_changes(NULL);