diff options
author | Sage Weil <sage@inktank.com> | 2013-07-25 15:21:31 -0700 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-07-25 15:21:31 -0700 |
commit | 09a664e25391dbad9a479bae33904d28231f429d (patch) | |
tree | 615797c2efc22cf47ee3a1f90db11947a3dba37f | |
parent | 8f010aff684e820ecc837c25ac77c7a05d7191ff (diff) | |
parent | b0535fcf854c5042d6b5ff481aabca08026d8f7f (diff) | |
download | ceph-09a664e25391dbad9a479bae33904d28231f429d.tar.gz |
Merge remote-tracking branch 'gh/cuttlefish-next' into cuttlefish
-rw-r--r-- | src/common/config_opts.h | 2 | ||||
-rw-r--r-- | src/msg/Accepter.cc | 5 | ||||
-rw-r--r-- | src/msg/Pipe.cc | 191 | ||||
-rw-r--r-- | src/msg/Pipe.h | 3 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.cc | 33 | ||||
-rw-r--r-- | src/msg/SimpleMessenger.h | 6 | ||||
-rw-r--r-- | src/os/FileStore.cc | 88 | ||||
-rw-r--r-- | src/os/FileStore.h | 3 | ||||
-rw-r--r-- | src/os/HashIndex.cc | 15 | ||||
-rw-r--r-- | src/os/LFNIndex.cc | 11 | ||||
-rw-r--r-- | src/osd/OSD.cc | 2 | ||||
-rw-r--r-- | src/osd/OSD.h | 8 | ||||
-rw-r--r-- | src/osd/PG.cc | 36 | ||||
-rw-r--r-- | src/osd/PG.h | 7 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 55 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 7 | ||||
-rw-r--r-- | src/test/filestore/store_test.cc | 71 |
17 files changed, 412 insertions, 131 deletions
diff --git a/src/common/config_opts.h b/src/common/config_opts.h index c31282c1997..7940cc760a1 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -388,6 +388,7 @@ OPTION(osd_map_cache_size, OPT_INT, 500) OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message OPTION(osd_map_share_max_epochs, OPT_INT, 100) // cap on # of inc maps we send to peers, clients OPTION(osd_op_threads, OPT_INT, 2) // 0 == no threading +OPTION(osd_peering_wq_batch_size, OPT_U64, 20) OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304) OPTION(osd_op_pq_min_cost, OPT_U64, 65536) OPTION(osd_disk_threads, OPT_INT, 1) @@ -452,6 +453,7 @@ OPTION(osd_debug_drop_pg_create_duration, OPT_INT, 1) OPTION(osd_debug_drop_op_probability, OPT_DOUBLE, 0) // probability of stalling/dropping a client op OPTION(osd_debug_op_order, OPT_BOOL, false) OPTION(osd_debug_verify_snaps_on_info, OPT_BOOL, false) +OPTION(osd_debug_verify_stray_on_activate, OPT_BOOL, false) OPTION(osd_debug_skip_full_check_in_backfill_reservation, OPT_BOOL, false) OPTION(osd_op_history_size, OPT_U32, 20) // Max number of completed ops to track OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track diff --git a/src/msg/Accepter.cc b/src/msg/Accepter.cc index 90c68df6cf3..a3f5e3224c6 100644 --- a/src/msg/Accepter.cc +++ b/src/msg/Accepter.cc @@ -164,6 +164,11 @@ int Accepter::rebind(int avoid_port) int old_port = addr.get_port(); addr.set_port(0); + // adjust the nonce; we want our entity_addr_t to be truly unique. 
+ nonce += 1000000; + msgr->my_inst.addr.nonce = nonce; + ldout(msgr->cct,10) << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl; + ldout(msgr->cct,10) << " will try " << addr << dendl; int r = bind(addr, old_port, avoid_port); if (r == 0) diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 2a42b97d92d..87426218d48 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -211,75 +211,86 @@ void *Pipe::DelayedDelivery::entry() int Pipe::accept() { ldout(msgr->cct,10) << "accept" << dendl; + assert(pipe_lock.is_locked()); + assert(state == STATE_ACCEPTING); + + pipe_lock.Unlock(); + + // vars + bufferlist addrs; + entity_addr_t socket_addr; + socklen_t len; + int r; + char banner[strlen(CEPH_BANNER)+1]; + bufferlist addrbl; + ceph_msg_connect connect; + ceph_msg_connect_reply reply; + Pipe *existing = 0; + bufferptr bp; + bufferlist authorizer, authorizer_reply; + bool authorizer_valid; + uint64_t feat_missing; + bool replaced = false; + CryptoKey session_key; + int removed; // single-use down below + + // this should roughly mirror pseudocode at + // http://ceph.newdream.net/wiki/Messaging_protocol + int reply_tag = 0; + uint64_t existing_seq = -1; + + // used for reading in the remote acked seq on connect + uint64_t newly_acked_seq = 0; set_socket_options(); - // my creater gave me sd via accept() - assert(state == STATE_ACCEPTING); - // announce myself. 
- int rc = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER)); - if (rc < 0) { + r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER)); + if (r < 0) { ldout(msgr->cct,10) << "accept couldn't write banner" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } // and my addr - bufferlist addrs; ::encode(msgr->my_inst.addr, addrs); port = msgr->my_inst.addr.get_port(); // and peer's socket addr (they might not know their ip) - entity_addr_t socket_addr; - socklen_t len = sizeof(socket_addr.ss_addr()); - int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len); + len = sizeof(socket_addr.ss_addr()); + r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len); if (r < 0) { char buf[80]; ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } ::encode(socket_addr, addrs); - rc = tcp_write(addrs.c_str(), addrs.length()); - if (rc < 0) { + r = tcp_write(addrs.c_str(), addrs.length()); + if (r < 0) { ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl; // identify peer - char banner[strlen(CEPH_BANNER)+1]; if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) { ldout(msgr->cct,10) << "accept couldn't read banner" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { banner[strlen(CEPH_BANNER)] = 0; ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } - bufferlist addrbl; { bufferptr tp(sizeof(peer_addr)); addrbl.push_back(tp); } if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) { 
ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl; - state = STATE_CLOSED; - state_closed.set(1); - return -1; + goto fail_unlocked; } { bufferlist::iterator ti = addrbl.begin(); @@ -297,31 +308,12 @@ int Pipe::accept() } set_peer_addr(peer_addr); // so that connection_state gets set up - ceph_msg_connect connect; - ceph_msg_connect_reply reply; - Pipe *existing = 0; - bufferptr bp; - bufferlist authorizer, authorizer_reply; - bool authorizer_valid; - uint64_t feat_missing; - bool replaced = false; - CryptoKey session_key; - - // this should roughly mirror pseudocode at - // http://ceph.newdream.net/wiki/Messaging_protocol - int reply_tag = 0; - uint64_t existing_seq = -1; - - // used for reading in the remote acked seq on connect - uint64_t newly_acked_seq = 0; - while (1) { if (tcp_read((char*)&connect, sizeof(connect)) < 0) { ldout(msgr->cct,10) << "accept couldn't read connect" << dendl; goto fail_unlocked; } - authorizer.clear(); if (connect.authorizer_len) { bp = buffer::create(connect.authorizer_len); @@ -338,25 +330,32 @@ int Pipe::accept() << dendl; msgr->lock.Lock(); // FIXME + pipe_lock.Lock(); if (msgr->dispatch_queue.stop) goto shutting_down; + if (state != STATE_ACCEPTING) { + goto shutting_down; + } // note peer's type, flags set_peer_type(connect.host_type); policy = msgr->get_policy(connect.host_type); ldout(msgr->cct,10) << "accept of host_type " << connect.host_type - << ", policy.lossy=" << policy.lossy - << dendl; + << ", policy.lossy=" << policy.lossy + << " policy.server=" << policy.server + << " policy.standby=" << policy.standby + << " policy.resetcheck=" << policy.resetcheck + << dendl; memset(&reply, 0, sizeof(reply)); reply.protocol_version = msgr->get_proto_version(peer_type, false); + msgr->lock.Unlock(); // mismatch? 
ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version << ", their proto " << connect.protocol_version << dendl; if (connect.protocol_version != reply.protocol_version) { reply.tag = CEPH_MSGR_TAG_BADPROTOVER; - msgr->lock.Unlock(); goto reply; } @@ -382,18 +381,20 @@ int Pipe::accept() if (feat_missing) { ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl; reply.tag = CEPH_MSGR_TAG_FEATURES; - msgr->lock.Unlock(); goto reply; } - msgr->lock.Unlock(); - // Check the authorizer. If not good, bail out. - if (!msgr->verify_authorizer(connection_state, peer_type, connect.authorizer_protocol, authorizer, + pipe_lock.Unlock(); + + if (!msgr->verify_authorizer(connection_state, peer_type, connect.authorizer_protocol, authorizer, authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl; + pipe_lock.Lock(); + if (state != STATE_ACCEPTING) + goto shutting_down_msgr_unlocked; reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER; delete session_security; session_security = NULL; @@ -405,14 +406,16 @@ int Pipe::accept() ldout(msgr->cct,10) << "accept: setting up session_security." << dendl; msgr->lock.Lock(); + pipe_lock.Lock(); if (msgr->dispatch_queue.stop) goto shutting_down; - + if (state != STATE_ACCEPTING) + goto shutting_down; // existing? 
existing = msgr->_lookup_pipe(peer_addr); if (existing) { - existing->pipe_lock.Lock(); + existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here) if (connect.global_seq < existing->peer_global_seq) { ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq @@ -536,6 +539,8 @@ int Pipe::accept() assert(0); retry_session: + assert(existing->pipe_lock.is_locked()); + assert(pipe_lock.is_locked()); reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; reply.connect_seq = existing->connect_seq + 1; existing->pipe_lock.Unlock(); @@ -543,19 +548,23 @@ int Pipe::accept() goto reply; reply: + assert(pipe_lock.is_locked()); reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; reply.authorizer_len = authorizer_reply.length(); - rc = tcp_write((char*)&reply, sizeof(reply)); - if (rc < 0) + pipe_lock.Unlock(); + r = tcp_write((char*)&reply, sizeof(reply)); + if (r < 0) goto fail_unlocked; if (reply.authorizer_len) { - rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); - if (rc < 0) + r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); + if (r < 0) goto fail_unlocked; } } replace: + assert(existing->pipe_lock.is_locked()); + assert(pipe_lock.is_locked()); if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) { reply_tag = CEPH_MSGR_TAG_SEQ; existing_seq = existing->in_seq; @@ -599,8 +608,10 @@ int Pipe::accept() open: // open + assert(pipe_lock.is_locked()); connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; + assert(state == STATE_ACCEPTING); state = STATE_OPEN; ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; @@ -627,23 +638,26 @@ int Pipe::accept() // ok! 
if (msgr->dispatch_queue.stop) goto shutting_down; + removed = msgr->accepting_pipes.erase(this); + assert(removed == 1); register_pipe(); msgr->lock.Unlock(); + pipe_lock.Unlock(); - rc = tcp_write((char*)&reply, sizeof(reply)); - if (rc < 0) { + r = tcp_write((char*)&reply, sizeof(reply)); + if (r < 0) { goto fail_registered; } if (reply.authorizer_len) { - rc = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); - if (rc < 0) { + r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length()); + if (r < 0) { goto fail_registered; } } if (reply_tag == CEPH_MSGR_TAG_SEQ) { - if(tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) { + if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) { ldout(msgr->cct,2) << "accept write error on in_seq" << dendl; goto fail_registered; } @@ -660,7 +674,6 @@ int Pipe::accept() start_writer(); } ldout(msgr->cct,20) << "accept done" << dendl; - pipe_lock.Unlock(); maybe_start_delay_thread(); @@ -693,11 +706,12 @@ int Pipe::accept() if (queued || replaced) start_writer(); } - pipe_lock.Unlock(); return -1; shutting_down: msgr->lock.Unlock(); + shutting_down_msgr_unlocked: + assert(pipe_lock.is_locked()); if (msgr->cct->_conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; @@ -706,11 +720,9 @@ int Pipe::accept() t.sleep(); } - pipe_lock.Lock(); state = STATE_CLOSED; state_closed.set(1); fault(); - pipe_lock.Unlock(); return -1; } @@ -1010,7 +1022,17 @@ int Pipe::connect() ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << dendl; goto fail_locked; } - handle_ack(newly_acked_seq); + ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq + << " vs out_seq " << out_seq << dendl; + while (newly_acked_seq > out_seq) { + Message *m = _get_next_outgoing(); + assert(m); + ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq() + << " " << *m << dendl; + assert(m->get_seq() <= newly_acked_seq); + 
m->put(); + ++out_seq; + } if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) { ldout(msgr->cct,2) << "connect write error on in_seq" << dendl; goto fail_locked; @@ -1096,6 +1118,7 @@ void Pipe::unregister_pipe() msgr->rank_pipe.erase(p); } else { ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl; + msgr->accepting_pipes.erase(this); // somewhat overkill, but safe. } } @@ -1119,6 +1142,8 @@ void Pipe::requeue_sent() void Pipe::discard_requeued_up_to(uint64_t seq) { ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl; + if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) + return; list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; while (!rq.empty()) { Message *m = rq.front(); @@ -1130,6 +1155,8 @@ void Pipe::discard_requeued_up_to(uint64_t seq) rq.pop_front(); out_seq++; } + if (rq.empty()) + out_q.erase(CEPH_MSG_PRIO_HIGHEST); } /* @@ -1299,11 +1326,13 @@ void Pipe::stop() */ void Pipe::reader() { - if (state == STATE_ACCEPTING) - accept(); - pipe_lock.Lock(); + if (state == STATE_ACCEPTING) { + accept(); + assert(pipe_lock.is_locked()); + } + // loop. 
while (state != STATE_CLOSED && state != STATE_CONNECTING) { @@ -1518,10 +1547,16 @@ void Pipe::writer() // associate message with Connection (for benefit of encode_payload) m->set_connection(connection_state->get()); - ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; + uint64_t features = connection_state->get_features(); + if (m->empty_payload()) + ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features + << " " << m << " " << *m << dendl; + else + ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features + << " " << m << " " << *m << dendl; // encode and copy out of *m - m->encode(connection_state->get_features(), !msgr->cct->_conf->ms_nocrc); + m->encode(features, !msgr->cct->_conf->ms_nocrc); // prepare everything ceph_msg_header& header = m->get_header(); diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index e2a155a6038..2ba3505c5a0 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -247,14 +247,17 @@ class DispatchQueue; void stop(); void _send(Message *m) { + assert(pipe_lock.is_locked()); out_q[m->get_priority()].push_back(m); cond.Signal(); } void _send_keepalive() { + assert(pipe_lock.is_locked()); keepalive = true; cond.Signal(); } Message *_get_next_outgoing() { + assert(pipe_lock.is_locked()); Message *m = 0; while (!m && !out_q.empty()) { map<int, list<Message*> >::reverse_iterator p = out_q.rbegin(); diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 148300f159c..82f8a508a64 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -223,6 +223,8 @@ void SimpleMessenger::reaper() ldout(cct,10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; p->pipe_lock.Lock(); p->discard_out_queue(); + if (p->connection_state) + p->connection_state->clear_pipe(p); p->pipe_lock.Unlock(); p->unregister_pipe(); assert(pipes.count(p)); @@ -231,8 +233,6 @@ void SimpleMessenger::reaper() if (p->sd >= 0) 
::close(p->sd); ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; - if (p->connection_state) - p->connection_state->clear_pipe(p); p->put(); ldout(cct,10) << "reaper deleted pipe " << p << dendl; } @@ -271,9 +271,10 @@ int SimpleMessenger::bind(const entity_addr_t &bind_addr) int SimpleMessenger::rebind(int avoid_port) { ldout(cct,1) << "rebind avoid " << avoid_port << dendl; - mark_down_all(); assert(did_bind); - return accepter.rebind(avoid_port); + int r = accepter.rebind(avoid_port); + mark_down_all(); + return r; } int SimpleMessenger::start() @@ -306,6 +307,7 @@ Pipe *SimpleMessenger::add_accept_pipe(int sd) p->start_reader(); p->pipe_lock.Unlock(); pipes.insert(p); + accepting_pipes.insert(p); lock.Unlock(); return p; } @@ -554,6 +556,17 @@ void SimpleMessenger::mark_down_all() { ldout(cct,1) << "mark_down_all" << dendl; lock.Lock(); + for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) { + Pipe *p = *q; + ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl; + p->pipe_lock.Lock(); + p->stop(); + if (p->connection_state) + p->connection_state->clear_pipe(p); + p->pipe_lock.Unlock(); + } + accepting_pipes.clear(); + while (!rank_pipe.empty()) { hash_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin(); Pipe *p = it->second; @@ -562,6 +575,8 @@ void SimpleMessenger::mark_down_all() p->unregister_pipe(); p->pipe_lock.Lock(); p->stop(); + if (p->connection_state) + p->connection_state->clear_pipe(p); p->pipe_lock.Unlock(); } lock.Unlock(); @@ -576,6 +591,11 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr) p->unregister_pipe(); p->pipe_lock.Lock(); p->stop(); + if (p->connection_state) { + // do not generate a reset event for the caller in this case, + // since they asked for it. 
+ p->connection_state->clear_pipe(p); + } p->pipe_lock.Unlock(); } else { ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl; @@ -595,6 +615,11 @@ void SimpleMessenger::mark_down(Connection *con) p->unregister_pipe(); p->pipe_lock.Lock(); p->stop(); + if (p->connection_state) { + // do not generate a reset event for the caller in this case, + // since they asked for it. + p->connection_state->clear_pipe(p); + } p->pipe_lock.Unlock(); p->put(); } else { diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 6be1a0a9539..049c7edda65 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -514,6 +514,12 @@ private: * invalid and can be replaced by anyone holding the msgr lock */ hash_map<entity_addr_t, Pipe*> rank_pipe; + /** + * list of pipes are in teh process of accepting + * + * These are not yet in the rank_pipe map. + */ + set<Pipe*> accepting_pipes; /// a set of all the Pipes we have which are somehow active set<Pipe*> pipes; /// a list of Pipes we want to tear down diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index a91c252e531..c764d6bef2a 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -96,6 +96,7 @@ static const __SWORD_TYPE BTRFS_SUPER_MAGIC(0x9123683E); #define CLUSTER_SNAP_ITEM "clustersnap_%s" #define REPLAY_GUARD_XATTR "user.cephos.seq" +#define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq" /* * long file names will have the following format: @@ -2158,6 +2159,78 @@ int FileStore::_do_transactions( return r; } +void FileStore::_set_global_replay_guard(coll_t cid, + const SequencerPosition &spos) +{ + if (btrfs_stable_commits) + return; + + // sync all previous operations on this sequencer + sync_filesystem(basedir_fd); + + char fn[PATH_MAX]; + get_cdir(cid, fn, sizeof(fn)); + int fd = ::open(fn, O_RDONLY); + if (fd < 0) { + int err = errno; + derr << __func__ << ": " << cid << " error " << cpp_strerror(err) << dendl; + assert(0 == "_set_global_replay_guard failed"); + } + + 
_inject_failure(); + + // then record that we did it + bufferlist v; + ::encode(spos, v); + int r = chain_fsetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length()); + if (r < 0) { + derr << __func__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR + << " got " << cpp_strerror(r) << dendl; + assert(0 == "fsetxattr failed"); + } + + // and make sure our xattr is durable. + ::fsync(fd); + + _inject_failure(); + + TEMP_FAILURE_RETRY(::close(fd)); + dout(10) << __func__ << ": " << spos << " done" << dendl; +} + +int FileStore::_check_global_replay_guard(coll_t cid, + const SequencerPosition& spos) +{ + if (!replaying || btrfs_stable_commits) + return 1; + + char fn[PATH_MAX]; + get_cdir(cid, fn, sizeof(fn)); + int fd = ::open(fn, O_RDONLY); + if (fd < 0) { + dout(10) << __func__ << ": " << cid << " dne" << dendl; + return 1; // if collection does not exist, there is no guard, and we can replay. + } + + char buf[100]; + int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf)); + if (r < 0) { + dout(20) << __func__ << " no xattr" << dendl; + assert(!m_filestore_fail_eio || r != -EIO); + return 1; // no xattr + } + bufferlist bl; + bl.append(buf, r); + + SequencerPosition opos; + bufferlist::iterator p = bl.begin(); + ::decode(opos, p); + + TEMP_FAILURE_RETRY(::close(fd)); + return spos >= opos ? 
1 : -1; +} + + void FileStore::_set_replay_guard(coll_t cid, const SequencerPosition &spos, bool in_progress=false) @@ -2261,6 +2334,10 @@ int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPos if (!replaying || btrfs_stable_commits) return 1; + int r = _check_global_replay_guard(cid, spos); + if (r < 0) + return r; + int fd = lfn_open(cid, oid, 0); if (fd < 0) { dout(10) << "_check_replay_guard " << cid << " " << oid << " dne" << dendl; @@ -4283,6 +4360,16 @@ int FileStore::_collection_rename(const coll_t &cid, const coll_t &ncid, return _collection_remove_recursive(cid, spos); } + if (!collection_exists(cid)) { + if (replaying) { + // already happened + return 0; + } else { + return -ENOENT; + } + } + _set_global_replay_guard(cid, spos); + int ret = 0; if (::rename(old_coll, new_coll)) { if (replaying && !btrfs_stable_commits && @@ -4782,6 +4869,7 @@ int FileStore::_split_collection(coll_t cid, if (srccmp < 0) return 0; + _set_global_replay_guard(cid, spos); _set_replay_guard(cid, spos, true); _set_replay_guard(dest, spos, true); diff --git a/src/os/FileStore.h b/src/os/FileStore.h index d5ca2a4c237..e4f7e81a502 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -333,6 +333,8 @@ public: void _set_replay_guard(coll_t cid, const SequencerPosition& spos, bool in_progress); + void _set_global_replay_guard(coll_t cid, + const SequencerPosition &spos); /// close a replay guard opened with in_progress=true void _close_replay_guard(int fd, const SequencerPosition& spos); @@ -357,6 +359,7 @@ public: int _check_replay_guard(int fd, const SequencerPosition& spos); int _check_replay_guard(coll_t cid, const SequencerPosition& spos); int _check_replay_guard(coll_t cid, hobject_t oid, const SequencerPosition& pos); + int _check_global_replay_guard(coll_t cid, const SequencerPosition& spos); // ------------------ // objects diff --git a/src/os/HashIndex.cc b/src/os/HashIndex.cc index 216f53ce72b..6281c6113cc 100644 --- a/src/os/HashIndex.cc +++ 
b/src/os/HashIndex.cc @@ -447,18 +447,7 @@ int HashIndex::complete_merge(const vector<string> &path, subdir_info_s info) { r = move_objects(path, dst); if (r < 0) return r; - - map<string,hobject_t> objects_dst; - r = list_objects(dst, 0, 0, &objects_dst); - if (r < 0) - return r; - set<string> subdirs; - r = list_subdirs(dst, &subdirs); - if (r < 0) - return r; - dstinfo.objs = objects_dst.size(); - dstinfo.subdirs = subdirs.size() - 1; - r = set_info(dst, dstinfo); + r = reset_attr(dst); if (r < 0) return r; r = remove_path(path); @@ -576,7 +565,7 @@ int HashIndex::complete_split(const vector<string> &path, subdir_info_s info) { if (r < 0) return r; info.objs = objects.size(); - r = set_info(path, info); + r = reset_attr(path); if (r < 0) return r; r = fsync_dir(path); diff --git a/src/os/LFNIndex.cc b/src/os/LFNIndex.cc index 12aabfd8fd1..92b30ceae58 100644 --- a/src/os/LFNIndex.cc +++ b/src/os/LFNIndex.cc @@ -71,16 +71,19 @@ int LFNIndex::init() { } int LFNIndex::created(const hobject_t &hoid, const char *path) { + WRAP_RETRY( vector<string> path_comp; string short_name; - int r; r = decompose_full_path(path, &path_comp, 0, &short_name); if (r < 0) - return r; + goto out; r = lfn_created(path_comp, hoid, short_name); if (r < 0) - return r; - return _created(path_comp, hoid, short_name); + goto out; + r = _created(path_comp, hoid, short_name); + if (r < 0) + goto out; + ); } int LFNIndex::unlink(const hobject_t &hoid) { diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 825a2fea99f..6202bac3461 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -908,7 +908,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, finished_lock("OSD::finished_lock"), test_ops_hook(NULL), op_wq(this, g_conf->osd_op_thread_timeout, &op_tp), - peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200), + peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp), map_lock("OSD::map_lock"), peer_map_epoch_lock("OSD::peer_map_epoch_lock"), 
debug_drop_pg_create_probability(g_conf->osd_debug_drop_pg_create_probability), diff --git a/src/osd/OSD.h b/src/osd/OSD.h index ac2c634c1f2..2db9d3b8c44 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -798,10 +798,9 @@ private: list<PG*> peering_queue; OSD *osd; set<PG*> in_use; - const size_t batch_size; - PeeringWQ(OSD *o, time_t ti, ThreadPool *tp, size_t batch_size) + PeeringWQ(OSD *o, time_t ti, ThreadPool *tp) : ThreadPool::BatchWorkQueue<PG>( - "OSD::PeeringWQ", ti, ti*10, tp), osd(o), batch_size(batch_size) {} + "OSD::PeeringWQ", ti, ti*10, tp), osd(o) {} void _dequeue(PG *pg) { for (list<PG*>::iterator i = peering_queue.begin(); @@ -826,7 +825,8 @@ private: void _dequeue(list<PG*> *out) { set<PG*> got; for (list<PG*>::iterator i = peering_queue.begin(); - i != peering_queue.end() && out->size() < batch_size; + i != peering_queue.end() && + out->size() < g_conf->osd_peering_wq_batch_size; ) { if (in_use.count(*i)) { ++i; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index fb8b26ff389..b547840a0f2 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -560,7 +560,6 @@ void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead) } assert(p->version > newhead); dout(10) << "rewind_divergent_log future divergent " << *p << dendl; - log.unindex(*p); } log.head = newhead; @@ -568,6 +567,8 @@ void PG::rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead) if (info.last_complete > newhead) info.last_complete = newhead; + log.index(); + for (list<pg_log_entry_t>::iterator d = divergent.begin(); d != divergent.end(); ++d) merge_old_entry(t, *d); @@ -1527,8 +1528,9 @@ void PG::activate(ObjectStore::Transaction& t, dirty_info = true; dirty_big_info = true; // maybe - // clean up stray objects - clean_up_local(t); + // verify that there are no stray objects + if (is_primary()) + check_local(); // find out when we commit tfin.push_back(new C_PG_ActivateCommitted(this, query_epoch)); @@ -5013,7 +5015,8 @@ void 
PG::start_flush(ObjectStore::Transaction *t, /* Called before initializing peering during advance_map */ void PG::start_peering_interval(const OSDMapRef lastmap, const vector<int>& newup, - const vector<int>& newacting) + const vector<int>& newacting, + ObjectStore::Transaction *t) { const OSDMapRef osdmap = get_osdmap(); @@ -5102,7 +5105,7 @@ void PG::start_peering_interval(const OSDMapRef lastmap, // pg->on_* - on_change(); + on_change(t); assert(!deleting); @@ -5903,11 +5906,6 @@ PG::RecoveryState::Started::Started(my_context ctx) { state_name = "Started"; context< RecoveryMachine >().log_enter(state_name); - PG *pg = context< RecoveryMachine >().pg; - pg->start_flush( - context< RecoveryMachine >().get_cur_transaction(), - context< RecoveryMachine >().get_on_applied_context_list(), - context< RecoveryMachine >().get_on_safe_context_list()); } boost::statechart::result @@ -5982,7 +5980,8 @@ boost::statechart::result PG::RecoveryState::Reset::react(const AdvMap& advmap) pg->is_split(advmap.lastmap, advmap.osdmap)) { dout(10) << "up or acting affected, calling start_peering_interval again" << dendl; - pg->start_peering_interval(advmap.lastmap, advmap.newup, advmap.newacting); + pg->start_peering_interval(advmap.lastmap, advmap.newup, advmap.newacting, + context< RecoveryMachine >().get_cur_transaction()); } return discard_event(); } @@ -6862,6 +6861,12 @@ PG::RecoveryState::ReplicaActive::ReplicaActive(my_context ctx) state_name = "Started/ReplicaActive"; context< RecoveryMachine >().log_enter(state_name); + + PG *pg = context< RecoveryMachine >().pg; + pg->start_flush( + context< RecoveryMachine >().get_cur_transaction(), + context< RecoveryMachine >().get_on_applied_context_list(), + context< RecoveryMachine >().get_on_safe_context_list()); } @@ -6951,6 +6956,11 @@ PG::RecoveryState::Stray::Stray(my_context ctx) assert(!pg->is_active()); assert(!pg->is_peering()); assert(!pg->is_primary()); + if (!pg->is_replica()) // stray, need to flush for pulls + 
pg->start_flush( + context< RecoveryMachine >().get_cur_transaction(), + context< RecoveryMachine >().get_on_applied_context_list(), + context< RecoveryMachine >().get_on_safe_context_list()); } boost::statechart::result PG::RecoveryState::Stray::react(const MLogRec& logevt) @@ -7298,6 +7308,10 @@ boost::statechart::result PG::RecoveryState::GetLog::react(const GotLog&) msg->info, msg->log, msg->missing, newest_update_osd); } + pg->start_flush( + context< RecoveryMachine >().get_cur_transaction(), + context< RecoveryMachine >().get_on_applied_context_list(), + context< RecoveryMachine >().get_on_safe_context_list()); return transit< GetMissing >(); } diff --git a/src/osd/PG.h b/src/osd/PG.h index 9446334bb53..f56d96d0198 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -834,7 +834,7 @@ public: return missing.num_missing() - missing_loc.size(); } - virtual void clean_up_local(ObjectStore::Transaction& t) = 0; + virtual void check_local() = 0; virtual int start_recovery_ops(int max, RecoveryCtx *prctx) = 0; @@ -1927,7 +1927,8 @@ public: void start_peering_interval(const OSDMapRef lastmap, const vector<int>& newup, - const vector<int>& newacting); + const vector<int>& newacting, + ObjectStore::Transaction *t); void start_flush(ObjectStore::Transaction *t, list<Context *> *on_applied, list<Context *> *on_safe); @@ -2009,7 +2010,7 @@ public: virtual bool same_for_rep_modify_since(epoch_t e) = 0; virtual void on_role_change() = 0; - virtual void on_change() = 0; + virtual void on_change(ObjectStore::Transaction *t) = 0; virtual void on_activate() = 0; virtual void on_flushed() = 0; virtual void on_shutdown() = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index c01d328a512..7acbca323b8 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -5260,6 +5260,9 @@ void ReplicatedPG::submit_push_data( ObjectStore::Transaction *t) { if (first) { + dout(10) << __func__ << ": Creating oid " + << recovery_info.soid << " in the temp collection" << 
dendl; + temp_contents.insert(recovery_info.soid); missing.revise_have(recovery_info.soid, eversion_t()); remove_snap_mapped_object(*t, recovery_info.soid); t->remove(get_temp_coll(t), recovery_info.soid); @@ -5287,6 +5290,10 @@ void ReplicatedPG::submit_push_complete(ObjectRecoveryInfo &recovery_info, ObjectStore::Transaction *t) { remove_snap_mapped_object(*t, recovery_info.soid); + assert(temp_contents.count(recovery_info.soid)); + dout(10) << __func__ << ": Removing oid " + << recovery_info.soid << " from the temp collection" << dendl; + temp_contents.erase(recovery_info.soid); t->collection_move(coll, get_temp_coll(t), recovery_info.soid); for (map<hobject_t, interval_set<uint64_t> >::const_iterator p = recovery_info.clone_subset.begin(); @@ -6315,6 +6322,15 @@ void ReplicatedPG::on_shutdown() void ReplicatedPG::on_flushed() { assert(object_contexts.empty()); + if (have_temp_coll() && + !osd->store->collection_empty(get_temp_coll())) { + vector<hobject_t> objects; + osd->store->collection_list(get_temp_coll(), objects); + derr << __func__ << ": found objects in the temp collection: " + << objects << ", crashing now" + << dendl; + assert(0 == "found garbage in the temp collection"); + } } void ReplicatedPG::on_activate() @@ -6330,7 +6346,7 @@ void ReplicatedPG::on_activate() } } -void ReplicatedPG::on_change() +void ReplicatedPG::on_change(ObjectStore::Transaction *t) { dout(10) << "on_change" << dendl; @@ -6365,6 +6381,16 @@ void ReplicatedPG::on_change() pulling.clear(); pull_from_peer.clear(); + // clear temp + for (set<hobject_t>::iterator i = temp_contents.begin(); + i != temp_contents.end(); + ++i) { + dout(10) << __func__ << ": Removing oid " + << *i << " from the temp collection" << dendl; + t->remove(get_temp_coll(t), *i); + } + temp_contents.clear(); + // clear snap_trimmer state snap_trimmer_machine.process_event(Reset()); @@ -7095,16 +7121,19 @@ void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterva } -/** clean_up_local - 
* remove any objects that we're storing but shouldn't. - * as determined by log. +/** check_local + * + * verifies that stray objects have been deleted */ -void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t) +void ReplicatedPG::check_local() { - dout(10) << "clean_up_local" << dendl; + dout(10) << __func__ << dendl; assert(info.last_update >= log.tail); // otherwise we need some help! + if (!g_conf->osd_debug_verify_stray_on_activate) + return; + // just scan the log. set<hobject_t> did; for (list<pg_log_entry_t>::reverse_iterator p = log.log.rbegin(); @@ -7115,11 +7144,17 @@ void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t) did.insert(p->soid); if (p->is_delete()) { - dout(10) << " deleting " << p->soid - << " when " << p->version << dendl; - remove_snap_mapped_object(t, p->soid); + dout(10) << " checking " << p->soid + << " at " << p->version << dendl; + struct stat st; + int r = osd->store->stat(coll, p->soid, &st); + if (r != -ENOENT) { + dout(10) << "Object " << p->soid << " exists, but should have been " + << "deleted" << dendl; + assert(0 == "erroneously present object"); + } } else { - // keep old(+missing) objects, just for kicks. 
+ // ignore old(+missing) objects } } } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 644a277f0dc..5b989442305 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -545,6 +545,9 @@ protected: }; map<hobject_t, PullInfo> pulling; + // Track contents of temp collection, clear on reset + set<hobject_t> temp_contents; + ObjectRecoveryInfo recalc_subsets(const ObjectRecoveryInfo& recovery_info); static void trim_pushed_data(const interval_set<uint64_t> &copy_subset, const interval_set<uint64_t> &intervals_received, @@ -721,7 +724,7 @@ protected: int prepare_transaction(OpContext *ctx); // pg on-disk content - void clean_up_local(ObjectStore::Transaction& t); + void check_local(); void _clear_recovery_state(); @@ -1001,7 +1004,7 @@ public: void _finish_mark_all_unfound_lost(list<ObjectContext*>& obcs); void on_role_change(); - void on_change(); + void on_change(ObjectStore::Transaction *t); void on_activate(); void on_flushed(); void on_removal(ObjectStore::Transaction *t); diff --git a/src/test/filestore/store_test.cc b/src/test/filestore/store_test.cc index c98ffb047ac..c0b009bb95c 100644 --- a/src/test/filestore/store_test.cc +++ b/src/test/filestore/store_test.cc @@ -823,6 +823,75 @@ TEST_F(StoreTest, ColSplitTest3) { } #endif +/** + * This test tests adding two different groups + * of objects, each with 1 common prefix and 1 + * different prefix. We then remove half + * in order to verify that the merging correctly + * stops at the common prefix subdir.
See bug + * #5273 */ +TEST_F(StoreTest, TwoHash) { + coll_t cid("asdf"); + int r; + { + ObjectStore::Transaction t; + t.create_collection(cid); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + std::cout << "Making objects" << std::endl; + for (int i = 0; i < 360; ++i) { + ObjectStore::Transaction t; + hobject_t o; + if (i < 8) { + o.hash = (i << 16) | 0xA1; + t.touch(cid, o); + } + o.hash = (i << 16) | 0xB1; + t.touch(cid, o); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + std::cout << "Removing half" << std::endl; + for (int i = 1; i < 8; ++i) { + ObjectStore::Transaction t; + hobject_t o; + o.hash = (i << 16) | 0xA1; + t.remove(cid, o); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + std::cout << "Checking" << std::endl; + for (int i = 1; i < 8; ++i) { + ObjectStore::Transaction t; + hobject_t o; + o.hash = (i << 16) | 0xA1; + bool exists = store->exists(cid, o); + ASSERT_EQ(exists, false); + } + { + hobject_t o; + o.hash = 0xA1; + bool exists = store->exists(cid, o); + ASSERT_EQ(exists, true); + } + std::cout << "Cleanup" << std::endl; + for (int i = 0; i < 360; ++i) { + ObjectStore::Transaction t; + hobject_t o; + o.hash = (i << 16) | 0xA1; + t.remove(cid, o); + o.hash = (i << 16) | 0xB1; + t.remove(cid, o); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + ObjectStore::Transaction t; + t.remove_collection(cid); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); +} + int main(int argc, char **argv) { vector<const char*> args; argv_to_vec(argc, (const char **)argv, args); @@ -830,7 +899,7 @@ int main(int argc, char **argv) { global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); common_init_finish(g_ceph_context); g_ceph_context->_conf->set_val("osd_journal_size", "400"); - g_ceph_context->_conf->set_val("filestore_index_retry_probability", "1"); + g_ceph_context->_conf->set_val("filestore_index_retry_probability", "0.5"); g_ceph_context->_conf->set_val("filestore_op_thread_timeout", 
"1000"); g_ceph_context->_conf->set_val("filestore_op_thread_suicide_timeout", "10000"); g_ceph_context->_conf->apply_changes(NULL); |