diff options
author | Samuel Just <sam.just@inktank.com> | 2012-12-10 22:00:30 -0800 |
---|---|---|
committer | Samuel Just <sam.just@inktank.com> | 2012-12-10 22:00:36 -0800 |
commit | bcf1461c7e3dad87985b8ccbb2418bfdf6831da5 (patch) | |
tree | 12173b2475fdbe26522cb784035b54ef7b22f8d3 | |
parent | e4d0aeace187afee6313d803511ba595e91dd7ed (diff) | |
parent | 1699b7dc5e946fc40101cec5a21dcaed3bb69c7a (diff) | |
download | ceph-bcf1461c7e3dad87985b8ccbb2418bfdf6831da5.tar.gz |
Merge remote-tracking branch 'upstream/wip_split2' into next
Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r-- | src/common/PrioritizedQueue.h | 27 | ||||
-rw-r--r-- | src/common/config_opts.h | 4 | ||||
-rw-r--r-- | src/common/simple_cache.hpp | 7 | ||||
-rw-r--r-- | src/include/Context.h | 1 | ||||
-rw-r--r-- | src/mon/OSDMonitor.cc | 14 | ||||
-rw-r--r-- | src/mon/PGMonitor.cc | 4 | ||||
-rw-r--r-- | src/os/CollectionIndex.h | 17 | ||||
-rw-r--r-- | src/os/FileStore.cc | 84 | ||||
-rw-r--r-- | src/os/FileStore.h | 6 | ||||
-rw-r--r-- | src/os/HashIndex.cc | 225 | ||||
-rw-r--r-- | src/os/HashIndex.h | 67 | ||||
-rw-r--r-- | src/os/IndexManager.cc | 6 | ||||
-rw-r--r-- | src/os/LFNIndex.cc | 120 | ||||
-rw-r--r-- | src/os/LFNIndex.h | 89 | ||||
-rw-r--r-- | src/os/ObjectStore.cc | 13 | ||||
-rw-r--r-- | src/os/ObjectStore.h | 21 | ||||
-rw-r--r-- | src/osd/OSD.cc | 389 | ||||
-rw-r--r-- | src/osd/OSD.h | 65 | ||||
-rw-r--r-- | src/osd/PG.cc | 203 | ||||
-rw-r--r-- | src/osd/PG.h | 16 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 9 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 1 | ||||
-rw-r--r-- | src/osd/osd_types.cc | 55 | ||||
-rw-r--r-- | src/osd/osd_types.h | 12 | ||||
-rw-r--r-- | src/test/filestore/store_test.cc | 81 |
25 files changed, 1417 insertions, 119 deletions
diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h index 72ec3e913e2..9c27ddc3860 100644 --- a/src/common/PrioritizedQueue.h +++ b/src/common/PrioritizedQueue.h @@ -47,14 +47,25 @@ class PrioritizedQueue { int64_t total_priority; template <class F> - static unsigned filter_list_pairs(list<pair<unsigned, T> > *l, F f) { + static unsigned filter_list_pairs( + list<pair<unsigned, T> > *l, F f, + list<T> *out) { unsigned ret = 0; + if (out) { + for (typename list<pair<unsigned, T> >::reverse_iterator i = l->rbegin(); + i != l->rend(); + ++i) { + if (f(i->second)) { + out->push_front(i->second); + } + } + } for (typename list<pair<unsigned, T> >::iterator i = l->begin(); i != l->end(); - ) { + ) { if (f(i->second)) { l->erase(i++); - ret++; + ++ret; } else { ++i; } @@ -119,11 +130,11 @@ class PrioritizedQueue { return q.empty(); } template <class F> - void remove_by_filter(F f) { + void remove_by_filter(F f, list<T> *out) { for (typename map<K, list<pair<unsigned, T> > >::iterator i = q.begin(); i != q.end(); ) { - size -= filter_list_pairs(&(i->second), f); + size -= filter_list_pairs(&(i->second), f, out); if (i->second.empty()) { if (cur == i) ++cur; @@ -205,13 +216,13 @@ public: } template <class F> - void remove_by_filter(F f) { + void remove_by_filter(F f, list<T> *removed = 0) { for (typename map<unsigned, SubQueue>::iterator i = queue.begin(); i != queue.end(); ) { unsigned priority = i->first; - i->second.remove_by_filter(f); + i->second.remove_by_filter(f, removed); if (i->second.empty()) { ++i; remove_queue(priority); @@ -222,7 +233,7 @@ public: for (typename map<unsigned, SubQueue>::iterator i = high_queue.begin(); i != high_queue.end(); ) { - i->second.remove_by_filter(f); + i->second.remove_by_filter(f, removed); if (i->second.empty()) { high_queue.erase(i++); } else { diff --git a/src/common/config_opts.h b/src/common/config_opts.h index d312ef28e0c..b04a28f3259 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -377,6 +377,10 @@ OPTION(osd_client_op_priority, OPT_INT, 63) OPTION(osd_recovery_op_priority, OPT_INT, 10) OPTION(filestore, OPT_BOOL, false) + +// Tests index failure paths +OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0) + OPTION(filestore_debug_omap_check, OPT_BOOL, 0) // Expensive debugging check on sync // Use omap for xattrs for attrs over OPTION(filestore_xattr_use_omap, OPT_BOOL, false) diff --git a/src/common/simple_cache.hpp b/src/common/simple_cache.hpp index 7d062351888..60919fd9731 100644 --- a/src/common/simple_cache.hpp +++ b/src/common/simple_cache.hpp @@ -50,17 +50,16 @@ public: pinned.insert(make_pair(key, val)); } - void clear_pinned() { + void clear_pinned(K e) { Mutex::Locker l(lock); for (typename map<K, V>::iterator i = pinned.begin(); - i != pinned.end(); - ++i) { + i != pinned.end() && i->first <= e; + pinned.erase(i++)) { if (!contents.count(i->first)) _add(i->first, i->second); else lru.splice(lru.begin(), lru, contents[i->first]); } - pinned.clear(); } void set_size(size_t new_size) { diff --git a/src/include/Context.h b/src/include/Context.h index f3da98c53d5..a64bb2de5f6 100644 --- a/src/include/Context.h +++ b/src/include/Context.h @@ -124,6 +124,7 @@ public: void finish(int r) { finish_contexts(cct, contexts, r); } + bool empty() { return contexts.empty(); } }; diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 3186c4d6bc3..476e5138b30 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -2696,7 +2696,7 @@ bool OSDMonitor::prepare_command(MMonCommand *m) return true; } } else if (m->cmd[2] == "set") { - if (m->cmd.size() != 6) { + if (m->cmd.size() < 6) { err = -EINVAL; ss << "usage: osd pool set <poolname> <field> <value>"; goto out; @@ -2740,12 +2740,12 @@ bool OSDMonitor::prepare_command(MMonCommand *m) paxos->wait_for_commit(new Monitor::C_Command(mon, m, 0, rs, paxos->get_version())); return true; } else if (m->cmd[4] == "pg_num") { - if (true) { - // ** DISABLE THIS FOR NOW ** - ss << "pg_num adjustment currently disabled (broken implementation)"; - // ** DISABLE THIS FOR NOW ** - } else - if (n <= p->get_pg_num()) { + if (m->cmd.size() < 6 || + m->cmd[6] != "--allow-experimental-feature") { + ss << "increasing pg_num is currently experimental, add " + << "--allow-experimental-feature as the last argument " + << "to force"; + } else if (n <= p->get_pg_num()) { ss << "specified pg_num " << n << " <= current " << p->get_pg_num(); } else if (!mon->pgmon()->pg_map.creating_pgs.empty()) { ss << "currently creating pgs, wait"; diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index b66ae895225..c679d415468 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -786,6 +786,10 @@ void PGMonitor::send_pg_creates() if (pgid.preferred() >= 0) continue; + // don't send creates for splits + if (s.parent_split_bits) + continue; + if (nrep) { pg_map.creating_pgs_by_osd[acting[0]].insert(pgid); } else { diff --git a/src/os/CollectionIndex.h b/src/os/CollectionIndex.h index d931a88b2d5..9b1ceae8c46 100644 --- a/src/os/CollectionIndex.h +++ b/src/os/CollectionIndex.h @@ -151,6 +151,20 @@ protected: int *exist ///< [out] True if the object exists, else false ) = 0; + /** + * Moves objects matching <match> in the lsb <bits> + * + * dest and this must be the same subclass + * + * @return Error Code, 0 for success + */ + virtual int split( + uint32_t match, //< [in] value to match + uint32_t bits, //< [in] bits to check + std::tr1::shared_ptr<CollectionIndex> dest //< [in] destination index + ) { assert(0); return 0; } + + /// List contents of collection by hash virtual int collection_list_partial( const hobject_t &start, ///< [in] object at which to start @@ -166,6 +180,9 @@ protected: vector<hobject_t> *ls ///< [out] Listed Objects ) = 0; + /// Call prior to removing directory + virtual int prep_delete() { return 0; } + /// Virtual destructor virtual ~CollectionIndex() {} }; diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 98ee811e586..8cb8720738e 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -2158,6 +2158,21 @@ unsigned FileStore::apply_transactions(list<Transaction*> &tls, return r; } +void FileStore::_set_replay_guard(coll_t cid, + const SequencerPosition &spos, + bool in_progress=false) +{ + char fn[PATH_MAX]; + get_cdir(cid, fn, sizeof(fn)); + int fd = ::open(fn, O_RDONLY); + if (fd < 0) { + derr << "_set_replay_guard " << cid << " error " << fd << dendl; + assert(0 == "_set_replay_guard failed"); + } + _set_replay_guard(fd, spos, 0, in_progress); + ::close(fd); +} + void FileStore::_set_replay_guard(int fd, const SequencerPosition& spos, @@ -2199,6 +2214,20 @@ void FileStore::_set_replay_guard(int fd, dout(10) << "_set_replay_guard " << spos << " done" << dendl; } +void FileStore::_close_replay_guard(coll_t cid, + const SequencerPosition &spos) +{ + char fn[PATH_MAX]; + get_cdir(cid, fn, sizeof(fn)); + int fd = ::open(fn, O_RDONLY); + if (fd < 0) { + derr << "_set_replay_guard " << cid << " error " << fd << dendl; + assert(0 == "_close_replay_guard failed"); + } + _close_replay_guard(fd, spos); + ::close(fd); +} + void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos) { if (btrfs_stable_commits) @@ -2227,7 +2256,6 @@ void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos) dout(10) << "_close_replay_guard " << spos << " done" << dendl; } - int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPosition& spos) { if (!replaying || btrfs_stable_commits) @@ -2575,6 +2603,15 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n r = _omap_setheader(cid, oid, bl, spos); } break; + case Transaction::OP_SPLIT_COLLECTION: + { + coll_t cid(i.get_cid()); + uint32_t bits(i.get_u32()); + uint32_t rem(i.get_u32()); + coll_t dest(i.get_cid()); + r = _split_collection(cid, bits, rem, dest, spos); + } + break; default: derr << "bad op " << op << dendl; @@ -4433,6 +4470,15 @@ int FileStore::_create_collection(coll_t c) int FileStore::_destroy_collection(coll_t c) { + { + Index from; + int r = get_index(c, &from); + if (r < 0) + return r; + r = from->prep_delete(); + if (r < 0) + return r; + } char fn[PATH_MAX]; get_cdir(c, fn, sizeof(fn)); dout(15) << "_destroy_collection " << fn << dendl; @@ -4555,6 +4601,42 @@ int FileStore::_omap_setheader(coll_t cid, const hobject_t &hoid, return object_map->set_header(hoid, bl, &spos); } +int FileStore::_split_collection(coll_t cid, + uint32_t bits, + uint32_t rem, + coll_t dest, + const SequencerPosition &spos) +{ + dout(15) << __func__ << " " << cid << " bits: " << bits << dendl; + int r = _create_collection(dest); + if (r < 0 && !(r == -EEXIST && replaying)) + return r; + + int dstcmp = _check_replay_guard(cid, spos); + if (dstcmp < 0) + return 0; + + int srccmp = _check_replay_guard(dest, spos); + if (srccmp < 0) + return 0; + + _set_replay_guard(cid, spos, true); + _set_replay_guard(dest, spos, true); + + Index from; + r = get_index(cid, &from); + + Index to; + if (!r) + r = get_index(dest, &to); + + if (!r) + r = from->split(rem, bits, to); + + _close_replay_guard(cid, spos); + _close_replay_guard(dest, spos); + return r; +} const char** FileStore::get_tracked_conf_keys() const { diff --git a/src/os/FileStore.h b/src/os/FileStore.h index f18e1f88269..0281e94d634 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -330,9 +330,13 @@ public: const SequencerPosition& spos, const hobject_t *hoid=0, bool in_progress=false); + void _set_replay_guard(coll_t cid, + const SequencerPosition& spos, + bool in_progress); /// close a replay guard opened with in_progress=true void _close_replay_guard(int fd, const SequencerPosition& spos); + void _close_replay_guard(coll_t cid, const SequencerPosition& spos); /** * check replay guard xattr on given file @@ -466,6 +470,8 @@ private: const SequencerPosition &spos); int _omap_setheader(coll_t cid, const hobject_t &hoid, const bufferlist &bl, const SequencerPosition &spos); + int _split_collection(coll_t cid, uint32_t bits, uint32_t rem, coll_t dest, + const SequencerPosition &spos); virtual const char** get_tracked_conf_keys() const; virtual void handle_conf_change(const struct md_config_t *conf, diff --git a/src/os/HashIndex.cc b/src/os/HashIndex.cc index 8948c506411..d0d155c8d18 100644 --- a/src/os/HashIndex.cc +++ b/src/os/HashIndex.cc @@ -42,10 +42,200 @@ int HashIndex::cleanup() { return complete_split(in_progress.path, info); else if (in_progress.is_merge()) return complete_merge(in_progress.path, info); + else if (in_progress.is_col_split()) { + for (vector<string>::iterator i = in_progress.path.begin(); + i != in_progress.path.end(); + ++i) { + vector<string> path(in_progress.path.begin(), i); + int r = reset_attr(path); + if (r < 0) + return r; + } + return 0; + } else return -EINVAL; } +int HashIndex::reset_attr( + const vector<string> &path) +{ + int exists = 0; + int r = path_exists(path, &exists); + if (r < 0) + return r; + if (!exists) + return 0; + map<string, hobject_t> objects; + set<string> subdirs; + r = list_objects(path, 0, 0, &objects); + if (r < 0) + return r; + r = list_subdirs(path, &subdirs); + if (r < 0) + return r; + + subdir_info_s info; + info.hash_level = path.size(); + info.objs = objects.size(); + info.subdirs = subdirs.size(); + return set_info(path, info); +} + +int HashIndex::col_split_level( + HashIndex &from, + HashIndex &to, + const vector<string> &path, + uint32_t inbits, + uint32_t match, + unsigned *mkdirred) +{ + /* For each subdir, move, recurse, or ignore based on comparing the low order + * bits of the hash represented by the subdir path with inbits, match passed + * in. + */ + set<string> subdirs; + int r = from.list_subdirs(path, &subdirs); + if (r < 0) + return r; + map<string, hobject_t> objects; + r = from.list_objects(path, 0, 0, &objects); + if (r < 0) + return r; + + set<string> to_move; + for (set<string>::iterator i = subdirs.begin(); + i != subdirs.end(); + ++i) { + uint32_t bits = 0; + uint32_t hash = 0; + vector<string> sub_path(path.begin(), path.end()); + sub_path.push_back(*i); + path_to_hobject_hash_prefix(sub_path, &bits, &hash); + if (bits < inbits) { + if ((match & ~((~0)<<bits)) == (hash & ~((~0)<<bits))) { + r = col_split_level( + from, + to, + sub_path, + inbits, + match, + mkdirred); + if (r < 0) + return r; + if (*mkdirred > path.size()) + *mkdirred = path.size(); + } // else, skip, doesn't need to be moved or recursed into + } else { + if ((match & ~((~0)<<inbits)) == (hash & ~((~0)<<inbits))) { + to_move.insert(*i); + } + } // else, skip, doesn't need to be moved or recursed into + } + + /* Then, do the same for each object */ + map<string, hobject_t> objs_to_move; + for (map<string, hobject_t>::iterator i = objects.begin(); + i != objects.end(); + ++i) { + if ((i->second.hash & ~((~0)<<inbits)) == match) { + objs_to_move.insert(*i); + } + } + + if (objs_to_move.empty() && to_move.empty()) + return 0; + + // Make parent directories as needed + while (*mkdirred < path.size()) { + ++*mkdirred; + int exists = 0; + vector<string> creating_path(path.begin(), path.begin()+*mkdirred); + r = to.path_exists(creating_path, &exists); + if (r < 0) + return r; + if (exists) + continue; + subdir_info_s info; + info.objs = 0; + info.subdirs = 0; + info.hash_level = creating_path.size(); + if (*mkdirred < path.size() - 1) + info.subdirs = 1; + r = to.start_col_split(creating_path); + if (r < 0) + return r; + r = to.create_path(creating_path); + if (r < 0) + return r; + r = to.set_info(creating_path, info); + if (r < 0) + return r; + r = to.end_split_or_merge(creating_path); + if (r < 0) + return r; + } + + subdir_info_s from_info; + subdir_info_s to_info; + r = from.get_info(path, &from_info); + if (r < 0) + return r; + r = to.get_info(path, &to_info); + if (r < 0) + return r; + + from.start_col_split(path); + to.start_col_split(path); + + // Do subdir moves + for (set<string>::iterator i = to_move.begin(); + i != to_move.end(); + ++i) { + from_info.subdirs--; + to_info.subdirs++; + r = move_subdir(from, to, path, *i); + if (r < 0) + return r; + } + + for (map<string, hobject_t>::iterator i = objs_to_move.begin(); + i != objs_to_move.end(); + ++i) { + from_info.objs--; + to_info.objs++; + r = move_object(from, to, path, *i); + if (r < 0) + return r; + } + + + r = to.set_info(path, to_info); + if (r < 0) + return r; + r = from.set_info(path, from_info); + if (r < 0) + return r; + from.end_split_or_merge(path); + to.end_split_or_merge(path); + return 0; +} + +int HashIndex::_split( + uint32_t match, + uint32_t bits, + std::tr1::shared_ptr<CollectionIndex> dest) { + assert(collection_version() == dest->collection_version()); + unsigned mkdirred = 0; + return col_split_level( + *this, + *static_cast<HashIndex*>(dest.get()), + vector<string>(), + bits, + match, + &mkdirred); +} + int HashIndex::_init() { subdir_info_s info; vector<string> path; @@ -143,6 +333,41 @@ int HashIndex::_collection_list_partial(const hobject_t &start, return list_by_hash(path, min_count, max_count, seq, next, ls); } +int HashIndex::prep_delete() { + return recursive_remove(vector<string>()); +} + +int HashIndex::recursive_remove(const vector<string> &path) { + set<string> subdirs; + int r = list_subdirs(path, &subdirs); + if (r < 0) + return r; + map<string, hobject_t> objects; + r = list_objects(path, 0, 0, &objects); + if (r < 0) + return r; + if (objects.size()) + return -ENOTEMPTY; + vector<string> subdir(path); + for (set<string>::iterator i = subdirs.begin(); + i != subdirs.end(); + ++i) { + subdir.push_back(*i); + r = recursive_remove(subdir); + if (r < 0) + return r; + subdir.pop_back(); + } + return remove_path(path); +} + +int HashIndex::start_col_split(const vector<string> &path) { + bufferlist bl; + InProgressOp op_tag(InProgressOp::COL_SPLIT, path); + op_tag.encode(bl); + return add_attr_path(vector<string>(), IN_PROGRESS_OP_TAG, bl); +} + int HashIndex::start_split(const vector<string> &path) { bufferlist bl; InProgressOp op_tag(InProgressOp::SPLIT, path); diff --git a/src/os/HashIndex.h b/src/os/HashIndex.h index 41c4abef20c..fcabd9f7198 100644 --- a/src/os/HashIndex.h +++ b/src/os/HashIndex.h @@ -97,6 +97,7 @@ private: struct InProgressOp { static const int SPLIT = 0; static const int MERGE = 1; + static const int COL_SPLIT = 2; int op; vector<string> path; @@ -108,6 +109,7 @@ private: } bool is_split() const { return op == SPLIT; } + bool is_col_split() const { return op == COL_SPLIT; } bool is_merge() const { return op == MERGE; } void encode(bufferlist &bl) const { @@ -134,8 +136,10 @@ public: const char *base_path, ///< [in] Path to the index root. int merge_at, ///< [in] Merge threshhold. int split_multiple, ///< [in] Split threshhold. - uint32_t index_version)///< [in] Index version - : LFNIndex(collection, base_path, index_version), merge_threshold(merge_at), + uint32_t index_version,///< [in] Index version + double retry_probability=0) ///< [in] retry probability + : LFNIndex(collection, base_path, index_version, retry_probability), + merge_threshold(merge_at), split_multiplier(split_multiple) {} /// @see CollectionIndex @@ -143,6 +147,16 @@ public: /// @see CollectionIndex int cleanup(); + + /// @see CollectionIndex + int prep_delete(); + + /// @see CollectionIndex + int _split( + uint32_t match, + uint32_t bits, + std::tr1::shared_ptr<CollectionIndex> dest + ); protected: int _init(); @@ -175,6 +189,14 @@ protected: hobject_t *next ); private: + /// Recursively remove path and its subdirs + int recursive_remove( + const vector<string> &path ///< [in] path to remove + ); /// @return Error Code, 0 on success + /// Tag root directory at beginning of col_split + int start_col_split( + const vector<string> &path ///< [in] path to split + ); ///< @return Error Code, 0 on success /// Tag root directory at beginning of split int start_split( const vector<string> &path ///< [in] path to split @@ -221,6 +243,11 @@ private: subdir_info_s info ///< [in] Info attached to path ); /// @return Error Code, 0 on success + /// Resets attr to match actual subdir contents + int reset_attr( + const vector<string> &path ///< [in] path to cleanup + ); + /// Initiate Split int initiate_split( const vector<string> &path, ///< [in] Subdir to split @@ -239,25 +266,55 @@ private: vector<string> *path ///< [out] Path components for hoid. ); + /// do collection split for path + static int col_split_level( + HashIndex &from, ///< [in] from index + HashIndex &dest, ///< [in] to index + const vector<string> &path, ///< [in] path to split + uint32_t bits, ///< [in] num bits to match + uint32_t match, ///< [in] bits to match + unsigned *mkdirred ///< [in,out] path[:mkdirred] has been mkdirred + ); + + /** * Get string representation of hobject_t/hash * * e.g: 0x01234567 -> "76543210" */ - string get_path_str( + static string get_path_str( const hobject_t &hoid ///< [in] Object to get hash string for ); ///< @return Hash string for hoid. /// Get string from hash, @see get_path_str - string get_hash_str( + static string get_hash_str( uint32_t hash ///< [in] Hash to convert to a string. ); ///< @return String representation of hash /// Get hash from hash prefix string e.g. "FFFFAB" -> 0xFFFFAB00 - uint32_t hash_prefix_to_hash( + static uint32_t hash_prefix_to_hash( string prefix ///< [in] string to convert ); ///< @return Hash + /// Get hash mod from path + static void path_to_hobject_hash_prefix( + const vector<string> &path,///< [in] path to convert + uint32_t *bits, ///< [out] bits + uint32_t *hash ///< [out] hash + ) { + string hash_str; + for (vector<string>::const_iterator i = path.begin(); + i != path.end(); + ++i) { + hash_str.push_back(*i->begin()); + } + uint32_t rev_hash = hash_prefix_to_hash(hash_str); + if (hash) + *hash = rev_hash; + if (bits) + *bits = path.size() * 4; + } + /// Get path contents by hash int get_path_contents_by_hash( const vector<string> &path, /// [in] Path to list diff --git a/src/os/IndexManager.cc b/src/os/IndexManager.cc index 85281c4d926..11bf5c18172 100644 --- a/src/os/IndexManager.cc +++ b/src/os/IndexManager.cc @@ -75,7 +75,8 @@ int IndexManager::init_index(coll_t c, const char *path, uint32_t version) { return r; HashIndex index(c, path, g_conf->filestore_merge_threshold, g_conf->filestore_split_multiple, - CollectionIndex::HASH_INDEX_TAG_2); + CollectionIndex::HASH_INDEX_TAG_2, + g_conf->filestore_index_retry_probability); return index.init(); } @@ -110,7 +111,8 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) { // No need to check *index = Index(new HashIndex(c, path, g_conf->filestore_merge_threshold, g_conf->filestore_split_multiple, - CollectionIndex::HOBJECT_WITH_POOL), + CollectionIndex::HOBJECT_WITH_POOL, + g_conf->filestore_index_retry_probability), RemoveOnDelete(c, this)); return 0; } diff --git a/src/os/LFNIndex.cc b/src/os/LFNIndex.cc index fc4a0d223e6..5e505638d15 100644 --- a/src/os/LFNIndex.cc +++ b/src/os/LFNIndex.cc @@ -47,6 +47,18 @@ const string LFNIndex::FILENAME_COOKIE = "long"; const int LFNIndex::FILENAME_PREFIX_LEN = FILENAME_SHORT_LEN - FILENAME_HASH_LEN - FILENAME_COOKIE.size() - FILENAME_EXTRA; +void LFNIndex::maybe_inject_failure() { + if (error_injection_enabled) { + if (current_failure > last_failure && + (((double)(rand() % 10000))/((double)(10000)) + < error_injection_probability)) { + last_failure = current_failure; + current_failure = 0; + throw RetryException(); + } + ++current_failure; + } +} /* Public methods */ @@ -72,41 +84,47 @@ int LFNIndex::created(const hobject_t &hoid, const char *path) { } int LFNIndex::unlink(const hobject_t &hoid) { + WRAP_RETRY( vector<string> path; string short_name; - int r; r = _lookup(hoid, &path, &short_name, NULL); - if (r < 0) - return r; + if (r < 0) { + goto out; + } r = _remove(path, hoid, short_name); - if (r < 0) - return r; - return 0; + if (r < 0) { + goto out; + } + ); } int LFNIndex::lookup(const hobject_t &hoid, IndexedPath *out_path, int *exist) { + WRAP_RETRY( vector<string> path; string short_name; - int r; r = _lookup(hoid, &path, &short_name, exist); if (r < 0) - return r; + goto out; string full_path = get_full_path(path, short_name); struct stat buf; + maybe_inject_failure(); r = ::stat(full_path.c_str(), &buf); + maybe_inject_failure(); if (r < 0) { if (errno == ENOENT) { *exist = 0; } else { - return -errno; + r = -errno; + goto out; } } else { *exist = 1; } *out_path = IndexedPath(new Path(full_path, self_ref)); - return 0; + r = 0; + ); } int LFNIndex::collection_list(vector<hobject_t> *ls) { @@ -126,11 +144,14 @@ int LFNIndex::collection_list_partial(const hobject_t &start, /* Derived class utility methods */ int LFNIndex::fsync_dir(const vector<string> &path) { + maybe_inject_failure(); int fd = ::open(get_full_path_subdir(path).c_str(), O_RDONLY); if (fd < 0) return -errno; + maybe_inject_failure(); int r = ::fsync(fd); TEMP_FAILURE_RETRY(::close(fd)); + maybe_inject_failure(); if (r < 0) return -errno; else @@ -144,10 +165,13 @@ int LFNIndex::link_object(const vector<string> &from, int r; string from_path = get_full_path(from, from_short_name); string to_path; + maybe_inject_failure(); r = lfn_get_name(to, hoid, 0, &to_path, 0); if (r < 0) return r; + maybe_inject_failure(); r = ::link(from_path.c_str(), to_path.c_str()); + maybe_inject_failure(); if (r < 0) return -errno; else @@ -162,7 +186,9 @@ int LFNIndex::remove_objects(const vector<string> &dir, to_clean != to_remove.end(); ++to_clean) { if (!lfn_is_hashed_filename(to_clean->first)) { + maybe_inject_failure(); int r = ::unlink(get_full_path(dir, to_clean->first).c_str()); + maybe_inject_failure(); if (r < 0) return -errno; continue; @@ -189,14 +215,18 @@ int LFNIndex::remove_objects(const vector<string> &dir, if (candidate == chain.rend() || *i > candidate->first) { string remove_path_name = get_full_path(dir, lfn_get_short_name(to_clean->second, *i)); + maybe_inject_failure(); int r = ::unlink(remove_path_name.c_str()); + maybe_inject_failure(); if (r < 0) return -errno; continue; } string from = get_full_path(dir, candidate->second.first); string to = get_full_path(dir, lfn_get_short_name(candidate->second.second, *i)); + maybe_inject_failure(); int r = ::rename(from.c_str(), to.c_str()); + maybe_inject_failure(); if (r < 0) return -errno; remaining->erase(candidate->second.first); @@ -226,10 +256,13 @@ int LFNIndex::move_objects(const vector<string> &from, r = lfn_get_name(to, i->second, &to_name, &to_path, 0); if (r < 0) return r; + maybe_inject_failure(); r = ::link(from_path.c_str(), to_path.c_str()); if (r < 0 && errno != EEXIST) return -errno; + maybe_inject_failure(); r = lfn_created(to, i->second, to_name); + maybe_inject_failure(); if (r < 0) return r; } @@ -239,7 +272,9 @@ int LFNIndex::move_objects(const vector<string> &from, for (map<string,hobject_t>::iterator i = to_move.begin(); i != to_move.end(); ++i) { + maybe_inject_failure(); r = ::unlink(get_full_path(from, i->first).c_str()); + maybe_inject_failure(); if (r < 0) return -errno; } @@ -250,7 +285,9 @@ int LFNIndex::remove_object(const vector<string> &from, const hobject_t &hoid) { string short_name; int r, exist; + maybe_inject_failure(); r = get_mangled_name(from, hoid, &short_name, &exist); + maybe_inject_failure(); if (r < 0) return r; return lfn_unlink(from, hoid, short_name); @@ -262,6 +299,53 @@ int LFNIndex::get_mangled_name(const vector<string> &from, return lfn_get_name(from, hoid, mangled_name, 0, exists); } +int LFNIndex::move_subdir( + LFNIndex &from, + LFNIndex &dest, + const vector<string> &path, + string dir + ) { + vector<string> sub_path(path.begin(), path.end()); + sub_path.push_back(dir); + string from_path(from.get_full_path_subdir(sub_path)); + string to_path(dest.get_full_path_subdir(sub_path)); + int r = ::rename(from_path.c_str(), to_path.c_str()); + if (r < 0) + return -errno; + return 0; +} + +int LFNIndex::move_object( + LFNIndex &from, + LFNIndex &dest, + const vector<string> &path, + const pair<string, hobject_t> &obj + ) { + string from_path(from.get_full_path(path, obj.first)); + string to_path; + string to_name; + int exists; + int r = dest.lfn_get_name(path, obj.second, &to_name, &to_path, &exists); + if (r < 0) + return r; + if (!exists) { + r = ::link(from_path.c_str(), to_path.c_str()); + if (r < 0) + return r; + } + r = dest.lfn_created(path, obj.second, to_name); + if (r < 0) + return r; + r = dest.fsync_dir(path); + if (r < 0) + return r; + r = from.remove_object(path, obj.second); + if (r < 0) + return r; + return from.fsync_dir(path); +} + + static int get_hobject_from_oinfo(const char *dir, const char *file, hobject_t *o) { char path[PATH_MAX]; @@ -365,7 +449,9 @@ int LFNIndex::list_subdirs(const vector<string> &to_list, } int LFNIndex::create_path(const vector<string> &to_create) { + maybe_inject_failure(); int r = ::mkdir(get_full_path_subdir(to_create).c_str(), 0777); + maybe_inject_failure(); if (r < 0) return -errno; else @@ -373,7 +459,9 @@ int LFNIndex::create_path(const vector<string> &to_create) { } int LFNIndex::remove_path(const vector<string> &to_remove) { + maybe_inject_failure(); int r = ::rmdir(get_full_path_subdir(to_remove).c_str()); + maybe_inject_failure(); if (r < 0) return -errno; else @@ -401,6 +489,7 @@ int LFNIndex::add_attr_path(const vector<string> &path, const string &attr_name, bufferlist &attr_value) { string full_path = get_full_path_subdir(path); + maybe_inject_failure(); return chain_setxattr(full_path.c_str(), mangle_attr_name(attr_name).c_str(), reinterpret_cast<void *>(attr_value.c_str()), attr_value.length()); @@ -436,6 +525,7 @@ int LFNIndex::remove_attr_path(const vector<string> &path, const string &attr_name) { string full_path = get_full_path_subdir(path); string mangled_attr_name = mangle_attr_name(attr_name); + maybe_inject_failure(); return chain_removexattr(full_path.c_str(), mangled_attr_name.c_str()); } @@ -590,6 +680,7 @@ int LFNIndex::lfn_get_name(const vector<string> &path, if (exists) { struct stat buf; string full_path = get_full_path(path, full_name); + maybe_inject_failure(); r = ::stat(full_path.c_str(), &buf); if (r < 0) { if (errno == ENOENT) @@ -616,7 +707,9 @@ int LFNIndex::lfn_get_name(const vector<string> &path, return -errno; if (errno == ENODATA) { // Left over from incomplete transaction, it'll be replayed + maybe_inject_failure(); r = ::unlink(candidate_path.c_str()); + maybe_inject_failure(); if (r < 0) return -errno; } @@ -651,6 +744,7 @@ int LFNIndex::lfn_created(const vector<string> &path, return 0; string full_path = get_full_path(path, mangled_name); string full_name = lfn_generate_object_name(hoid); + maybe_inject_failure(); return chain_setxattr(full_path.c_str(), get_lfn_attr().c_str(), full_name.c_str(), full_name.size()); } @@ -660,7 +754,9 @@ int LFNIndex::lfn_unlink(const vector<string> &path, const string &mangled_name) { if (!lfn_is_hashed_filename(mangled_name)) { string full_path = get_full_path(path, mangled_name); + maybe_inject_failure(); int r = ::unlink(full_path.c_str()); + maybe_inject_failure(); if (r < 0) return -errno; return 0; @@ -691,7 +787,9 @@ int LFNIndex::lfn_unlink(const vector<string> &path, } if (i == removed_index + 1) { string full_path = get_full_path(path, mangled_name); + maybe_inject_failure(); int r = ::unlink(full_path.c_str()); + maybe_inject_failure(); if (r < 0) return -errno; else @@ -699,7 +797,9 @@ int LFNIndex::lfn_unlink(const vector<string> &path, } else { string rename_to = get_full_path(path, mangled_name); string rename_from = get_full_path(path, lfn_get_short_name(hoid, i - 1)); + maybe_inject_failure(); int r = ::rename(rename_from.c_str(), rename_to.c_str()); + maybe_inject_failure(); if (r < 0) return -errno; else diff --git a/src/os/LFNIndex.h b/src/os/LFNIndex.h index f703544cb13..b3c05358822 100644 --- a/src/os/LFNIndex.h +++ b/src/os/LFNIndex.h @@ -21,6 +21,7 @@ #include <set> #include <vector> #include <tr1/memory> +#include <exception> #include "osd/osd_types.h" #include "include/object.h" @@ -48,6 +49,31 @@ * Unless otherwise noted, methods which return an int return 0 on sucess * and a negative error code on failure. */ +#define WRAP_RETRY(x) { \ + bool failed = false; \ + int r = 0; \ + init_inject_failure(); \ + while (1) { \ + try { \ + if (failed) { \ + r = cleanup(); \ + assert(r == 0); \ + } \ + { x } \ + out: \ + complete_inject_failure(); \ + return r; \ + } catch (RetryException) { \ + failed = true; \ + } catch (...) { \ + assert(0); \ + } \ + } \ + return -1; \ + } \ + + + class LFNIndex : public CollectionIndex { /// Hash digest output size. static const int FILENAME_LFN_DIGEST_SIZE = CEPH_CRYPTO_SHA1_DIGESTSIZE; @@ -78,6 +104,24 @@ class LFNIndex : public CollectionIndex { protected: const uint32_t index_version; + /// true if retry injection is enabled + struct RetryException : public exception {}; + bool error_injection_enabled; + bool error_injection_on; + double error_injection_probability; + uint64_t last_failure; + uint64_t current_failure; + void init_inject_failure() { + if (error_injection_on) { + error_injection_enabled = true; + last_failure = current_failure = 0; + } + } + void maybe_inject_failure(); + void complete_inject_failure() { + error_injection_enabled = false; + } + private: string lfn_attribute; coll_t collection; @@ -87,8 +131,14 @@ public: LFNIndex( coll_t collection, const char *base_path, ///< [in] path to Index root - uint32_t index_version) - : base_path(base_path), index_version(index_version), + uint32_t index_version, + double _error_injection_probability=0) + : base_path(base_path), + index_version(index_version), + error_injection_enabled(false), + error_injection_on(_error_injection_probability != 0), + error_injection_probability(_error_injection_probability), + last_failure(0), current_failure(0), collection(collection) { if (index_version == HASH_INDEX_TAG) { lfn_attribute = LFN_ATTR; @@ -146,6 +196,25 @@ public: hobject_t *next ); + virtual int _split( + uint32_t match, //< [in] value to match + uint32_t bits, //< [in] bits to check + std::tr1::shared_ptr<CollectionIndex> dest //< [in] destination index + ) = 0; + + /// @see CollectionIndex + int split( + uint32_t match, + uint32_t bits, + std::tr1::shared_ptr<CollectionIndex> dest + ) { + WRAP_RETRY( + r = _split(match, bits, dest); + goto out; + ); + } + + protected: virtual int _init() = 0; @@ -270,6 +339,22 @@ protected: int *exists ///< [out] 1 if the file exists, else 0 ); + /// do move subdir from from to dest + static int move_subdir( + LFNIndex &from, ///< [in] from index + LFNIndex &dest, ///< [in] to index + const vector<string> &path, ///< [in] path containing dir + string dir ///< [in] dir to move + ); + + /// do move object from from to dest + static int move_object( + LFNIndex &from, ///< [in] from index + LFNIndex &dest, ///< [in] to index + const vector<string> &path, ///< [in] path to split + const pair<string, hobject_t> &obj ///< [in] obj to move + ); + /** * Lists objects in to_list. * diff --git a/src/os/ObjectStore.cc b/src/os/ObjectStore.cc index 214f7b2bc6b..dacc5440308 100644 --- a/src/os/ObjectStore.cc +++ b/src/os/ObjectStore.cc @@ -355,6 +355,19 @@ void ObjectStore::Transaction::dump(ceph::Formatter *f) } break; + case Transaction::OP_SPLIT_COLLECTION: + { + coll_t cid(i.get_cid()); + uint32_t bits(i.get_u32()); + uint32_t rem(i.get_u32()); + coll_t dest(i.get_cid()); + f->dump_string("op_name", "op_split_collection"); + f->dump_stream("collection") << cid; + f->dump_stream("bits") << bits; + f->dump_stream("rem") << rem; + f->dump_stream("dest") << dest; + } + default: f->dump_string("op_name", "unknown"); f->dump_unsigned("op_code", op); diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 439897f273a..a1cb21c0326 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -152,6 +152,7 @@ public: OP_OMAP_SETKEYS = 32, // cid, attrset OP_OMAP_RMKEYS = 33, // cid, keyset OP_OMAP_SETHEADER = 34, // cid, header + OP_SPLIT_COLLECTION = 35, // cid, bits, destination }; private: @@ -295,6 +296,11 @@ public: void get_keyset(set<string> &keys) { ::decode(keys, p); } + uint32_t get_u32() { + uint32_t bits; + ::decode(bits, p); + return bits; + } }; iterator begin() { @@ -543,6 +549,21 @@ public: ops++; } + /// Split collection based on given prefixes + void split_collection( + coll_t cid, + uint32_t bits, + uint32_t rem, + coll_t destination) { + __u32 op = OP_SPLIT_COLLECTION; + ::encode(op, tbl); + ::encode(cid, tbl); + ::encode(bits, tbl); + ::encode(rem, tbl); + ::encode(destination, tbl); + ++ops; + } + // etc. Transaction() : ops(0), pad_unused_bytes(0), largest_data_len(0), largest_data_off(0), largest_data_off_in_tbl(0), diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index e7a1022d7ab..92092969ea0 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -175,9 +175,57 @@ OSDService::OSDService(OSD *osd) : map_cache_lock("OSDService::map_lock"), map_cache(g_conf->osd_map_cache_size), map_bl_cache(g_conf->osd_map_cache_size), - map_bl_inc_cache(g_conf->osd_map_cache_size) + map_bl_inc_cache(g_conf->osd_map_cache_size), + in_progress_split_lock("OSDService::in_progress_split_lock") {} +void OSDService::_start_split(const set<pg_t> &pgs) +{ + for (set<pg_t>::const_iterator i = pgs.begin(); + i != pgs.end(); + ++i) { + assert(!in_progress_splits.count(*i)); + in_progress_splits.insert(*i); + } +} + +void OSDService::expand_pg_num(OSDMapRef old_map, + OSDMapRef new_map) +{ + Mutex::Locker l(in_progress_split_lock); + set<pg_t> children; + for (set<pg_t>::iterator i = in_progress_splits.begin(); + i != in_progress_splits.end(); + ) { + assert(old_map->have_pg_pool(i->pool())); + if (!new_map->have_pg_pool(i->pool())) { + in_progress_splits.erase(i++); + } else { + i->is_split(old_map->get_pg_num(i->pool()), + new_map->get_pg_num(i->pool()), &children); + ++i; + } + } + _start_split(children); +} + +bool OSDService::splitting(pg_t pgid) +{ + Mutex::Locker l(in_progress_split_lock); + return in_progress_splits.count(pgid); +} + +void OSDService::complete_split(const set<pg_t> &pgs) +{ + Mutex::Locker l(in_progress_split_lock); + for (set<pg_t>::const_iterator i = pgs.begin(); + i != pgs.end(); + ++i) { + assert(in_progress_splits.count(*i)); + in_progress_splits.erase(*i); + } +} + void OSDService::need_heartbeat_peer_update() { osd->need_heartbeat_peer_update(); @@ -1271,6 +1319,22 @@ PG *OSD::_open_lock_pg( { assert(osd_lock.is_locked()); + PG* pg = _make_pg(createmap, pgid); + + pg_map[pgid] = pg; + + if (hold_map_lock) + pg->lock_with_map_lock_held(no_lockdep_check); + else + pg->lock(no_lockdep_check); + pg->get(); // because it's in pg_map + return pg; +} + +PG* OSD::_make_pg( + OSDMapRef createmap, + pg_t pgid) +{ dout(10) << "_open_lock_pg " << pgid << dendl; PGPool pool = _get_pool(pgid.pool(), createmap); @@ -1283,17 +1347,41 @@ PG *OSD::_open_lock_pg( else assert(0); - assert(pg_map.count(pgid) == 0); - pg_map[pgid] = pg; - - if (hold_map_lock) - pg->lock_with_map_lock_held(no_lockdep_check); - else - pg->lock(no_lockdep_check); - pg->get(); // because it's in pg_map return pg; } + +void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx) +{ + epoch_t e(service.get_osdmap()->get_epoch()); + pg->get(); // For pg_map + pg_map[pg->info.pgid] = pg; + dout(10) << "Adding newly split pg " << *pg << dendl; + vector<int> up, acting; + pg->get_osdmap()->pg_to_up_acting_osds(pg->info.pgid, up, acting); + int role = pg->get_osdmap()->calc_pg_role(service.whoami, acting); + pg->set_role(role); + service.reg_last_pg_scrub(pg->info.pgid, + pg->info.history.last_scrub_stamp); + pg->handle_loaded(rctx); + pg->write_if_dirty(*(rctx->transaction)); + pg->queue_null(e, e); + map<pg_t, list<PG::CephPeeringEvtRef> >::iterator to_wake = + peering_wait_for_split.find(pg->info.pgid); + if (to_wake != peering_wait_for_split.end()) { + for (list<PG::CephPeeringEvtRef>::iterator i = + to_wake->second.begin(); + i != to_wake->second.end(); + ++i) { + pg->queue_peering_event(*i); + } + peering_wait_for_split.erase(to_wake); + } + wake_pg_waiters(pg->info.pgid); + if (!service.get_osdmap()->have_pg_pool(pg->info.pgid.pool())) + _remove_pg(pg); +} + PG *OSD::_create_lock_pg( OSDMapRef createmap, pg_t pgid, bool newly_created, bool hold_map_lock, @@ -1424,10 +1512,21 @@ void OSD::load_pgs() continue; } - PG *pg = _open_lock_pg(osdmap, pgid); + bufferlist bl; + epoch_t map_epoch = PG::peek_map_epoch(store, *it, &bl); + + PG *pg = _open_lock_pg(map_epoch == 0 ? osdmap : service.get_map(map_epoch), pgid); // read pg state, log - pg->read_state(store); + pg->read_state(store, bl); + + set<pg_t> split_pgs; + if (osdmap->have_pg_pool(pg->info.pgid.pool()) && + pg->info.pgid.is_split(pg->get_osdmap()->get_pg_num(pg->info.pgid.pool()), + osdmap->get_pg_num(pg->info.pgid.pool()), + &split_pgs)) { + service.start_split(split_pgs); + } service.reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp); @@ -1532,6 +1631,7 @@ void OSD::build_past_intervals_parallel() pg->info.history.last_epoch_clean, cur_map, last_map, pg->info.pgid.pool(), + pg->info.pgid, &pg->past_intervals, &debug); if (new_interval) { @@ -1566,13 +1666,13 @@ void OSD::build_past_intervals_parallel() store->apply_transaction(t); } - /* * look up a pg. if we have it, great. if not, consider creating it IF the pg mapping * hasn't changed since the given epoch and we are the primary. */ -PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, - epoch_t epoch, int from, int& created, bool primary) +PG *OSD::get_or_create_pg( + const pg_info_t& info, pg_interval_map_t& pi, + epoch_t epoch, int from, int& created, bool primary) { PG *pg; @@ -1593,6 +1693,10 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi, return NULL; } + if (service.splitting(info.pgid)) { + assert(0); + } + bool create = false; if (primary) { assert(role == 0); // otherwise, probably bug in project_pg_history. @@ -1702,6 +1806,7 @@ void OSD::project_pg_history(pg_t pgid, pg_history_t& h, epoch_t from, e--) { // verify during intermediate epoch (e-1) OSDMapRef oldmap = get_map(e-1); + assert(oldmap->have_pg_pool(pgid.pool())); vector<int> up, acting; oldmap->pg_to_up_acting_osds(pgid, up, acting); @@ -1714,6 +1819,12 @@ void OSD::project_pg_history(pg_t pgid, pg_history_t& h, epoch_t from, << dendl; h.same_interval_since = e; } + // split? + if (pgid.is_split(oldmap->get_pg_num(pgid.pool()), + osdmap->get_pg_num(pgid.pool()), + 0)) { + h.same_interval_since = e; + } // up set change? if (up != currentup && e > h.same_up_since) { dout(15) << "project_pg_history " << pgid << " up changed in " << e @@ -3546,6 +3657,21 @@ void OSD::note_up_osd(int peer) forget_peer_epoch(peer, osdmap->get_epoch() - 1); } +struct C_OnMapApply : public Context { + OSDService *service; + boost::scoped_ptr<ObjectStore::Transaction> t; + list<OSDMapRef> pinned_maps; + epoch_t e; + C_OnMapApply(OSDService *service, + ObjectStore::Transaction *t, + const list<OSDMapRef> &pinned_maps, + epoch_t e) + : service(service), t(t), pinned_maps(pinned_maps), e(e) {} + void finish(int r) { + service->clear_map_bl_cache_pins(e); + } +}; + void OSD::handle_osd_map(MOSDMap *m) { assert(osd_lock.is_locked()); @@ -3600,7 +3726,8 @@ void OSD::handle_osd_map(MOSDMap *m) skip_maps = true; } - ObjectStore::Transaction t; + ObjectStore::Transaction *_t = new ObjectStore::Transaction; + ObjectStore::Transaction &t = *_t; // store new maps: queue for disk and put in the osdmap cache epoch_t start = MAX(osdmap->get_epoch() + 1, first); @@ -3778,17 +3905,13 @@ void OSD::handle_osd_map(MOSDMap *m) // superblock and commit write_superblock(t); - int r = store->apply_transaction(t, fin); - if (r) { - map_lock.put_write(); - derr << "error writing map: " << cpp_strerror(-r) << dendl; - m->put(); - shutdown(); - return; - } + store->queue_transaction( + 0, + _t, + new C_OnMapApply(&service, _t, pinned_maps, osdmap->get_epoch()), + 0, fin); service.publish_superblock(superblock); - clear_map_bl_cache_pins(); map_lock.put_write(); check_osdmap_features(); @@ -3852,7 +3975,9 @@ void OSD::check_osdmap_features() } } -void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx) +void OSD::advance_pg( + epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx, + set<boost::intrusive_ptr<PG> > *new_pgs) { assert(pg->is_locked()); epoch_t next_epoch = pg->get_osdmap()->get_epoch() + 1; @@ -3866,9 +3991,22 @@ void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx) next_epoch <= osd_epoch; ++next_epoch) { OSDMapRef nextmap = get_map(next_epoch); + vector<int> newup, newacting; nextmap->pg_to_up_acting_osds(pg->info.pgid, newup, newacting); pg->handle_advance_map(nextmap, lastmap, newup, newacting, rctx); + + // Check for split! + set<pg_t> children; + if (pg->info.pgid.is_split( + lastmap->get_pg_num(pg->pool.id), + nextmap->get_pg_num(pg->pool.id), + &children)) { + split_pgs( + pg, children, new_pgs, lastmap, nextmap, + rctx); + } + lastmap = nextmap; } pg->handle_activate_map(rctx); @@ -3940,6 +4078,22 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin) waiting_for_pg.erase(p++); } } + map<pg_t, list<PG::CephPeeringEvtRef> >::iterator q = + peering_wait_for_split.begin(); + while (q != peering_wait_for_split.end()) { + pg_t pgid = q->first; + + // am i still primary? + vector<int> acting; + int nrep = osdmap->pg_to_acting_osds(pgid, acting); + int role = osdmap->calc_pg_role(whoami, acting, nrep); + if (role >= 0) { + ++q; // still me + } else { + dout(10) << " discarding waiting ops for " << pgid << dendl; + peering_wait_for_split.erase(q++); + } + } } void OSD::activate_map() @@ -3958,6 +4112,9 @@ void OSD::activate_map() list<PG*> to_remove; + service.expand_pg_num(service.get_osdmap(), + osdmap); + // scan pg's for (hash_map<pg_t,PG*>::iterator it = pg_map.begin(); it != pg_map.end(); @@ -3974,11 +4131,18 @@ void OSD::activate_map() if (pg->is_primary() && pg->info.history.last_epoch_clean < oldest_last_clean) oldest_last_clean = pg->info.history.last_epoch_clean; + set<pg_t> split_pgs; if (!osdmap->have_pg_pool(pg->info.pgid.pool())) { //pool is deleted! pg->get(); to_remove.push_back(pg); + } else if (it->first.is_split( + service.get_osdmap()->get_pg_num(it->first.pool()), + osdmap->get_pg_num(it->first.pool()), + &split_pgs)) { + service.start_split(split_pgs); } + pg->unlock(); } @@ -4183,11 +4347,11 @@ void OSDService::pin_map_bl(epoch_t e, bufferlist &bl) map_bl_cache.pin(e, bl); } -void OSDService::clear_map_bl_cache_pins() +void OSDService::clear_map_bl_cache_pins(epoch_t e) { Mutex::Locker l(map_cache_lock); - map_bl_inc_cache.clear_pinned(); - map_bl_cache.clear_pinned(); + map_bl_inc_cache.clear_pinned(e); + map_bl_cache.clear_pinned(e); } OSDMapRef OSDService::_add_map(OSDMap *o) @@ -4313,15 +4477,64 @@ bool OSD::can_create_pg(pg_t pgid) return false; } - if (creating_pgs[pgid].split_bits) { - dout(10) << "can_create_pg " << pgid << " - split" << dendl; - return false; - } - dout(10) << "can_create_pg " << pgid << " - can create now" << dendl; return true; } +void OSD::split_pgs( + PG *parent, + const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs, + OSDMapRef curmap, + OSDMapRef nextmap, + PG::RecoveryCtx *rctx) +{ + for (set<pg_t>::const_iterator i = childpgids.begin(); + i != childpgids.end(); + ++i) { + dout(10) << "Splitting " << *parent << " into " << *i << dendl; + assert(service.splitting(*i)); + PG* child = _make_pg(nextmap, *i); + child->lock(true); + out_pgs->insert(child); + + unsigned pg_num = nextmap->get_pg_num( + parent->pool.id); + + unsigned split_bits = i->get_split_bits(pg_num); + dout(10) << "pg_num is " << pg_num << dendl; + dout(10) << "m_seed " << i->ps() << dendl; + dout(10) << "split_bits is " << split_bits << dendl; + + rctx->transaction->split_collection( + coll_t(parent->info.pgid), + split_bits, + i->m_seed, + coll_t(*i)); + for (interval_set<snapid_t>::iterator k = parent->snap_collections.begin(); + k != parent->snap_collections.end(); + ++k) { + for (snapid_t j = k.get_start(); j < k.get_start() + k.get_len(); + ++j) { + rctx->transaction->split_collection( + coll_t(parent->info.pgid, j), + split_bits, + i->m_seed, + coll_t(*i, j)); + } + } + child->snap_collections = parent->snap_collections; + parent->split_into( + *i, + child, + split_bits); + + child->write_if_dirty(*(rctx->transaction)); + child->unlock(); + } + parent->write_if_dirty(*(rctx->transaction)); +} + + void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction& t, C_Contexts *tfin) { @@ -4522,7 +4735,8 @@ void OSD::handle_pg_create(OpRequestRef op) pg_t pgid = p->first; epoch_t created = p->second.created; pg_t parent = p->second.parent; - int split_bits = p->second.split_bits; + if (p->second.split_bits) // Skip split pgs + continue; pg_t on = pgid; if (pgid.preferred() >= 0) { @@ -4530,13 +4744,7 @@ void OSD::handle_pg_create(OpRequestRef op) continue; } - if (split_bits) { - on = parent; - dout(20) << "mkpg " << pgid << " e" << created << " from parent " << parent - << " split by " << split_bits << " bits" << dendl; - } else { - dout(20) << "mkpg " << pgid << " e" << created << dendl; - } + dout(20) << "mkpg " << pgid << " e" << created << dendl; // is it still ours? vector<int> up, acting; @@ -4560,12 +4768,6 @@ void OSD::handle_pg_create(OpRequestRef op) continue; } - // does parent exist? - if (split_bits && !_have_pg(parent)) { - dout(10) << "mkpg " << pgid << " missing parent " << parent << ", skipping" << dendl; - continue; - } - // figure history pg_history_t history; history.epoch_created = created; @@ -4575,7 +4777,6 @@ void OSD::handle_pg_create(OpRequestRef op) // register. creating_pgs[pgid].history = history; creating_pgs[pgid].parent = parent; - creating_pgs[pgid].split_bits = split_bits; creating_pgs[pgid].acting.swap(acting); calc_priors_during(pgid, created, history.same_interval_since, creating_pgs[pgid].prior); @@ -4656,7 +4857,9 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap) delete ctx.query_map; do_infos(*ctx.info_map, curmap); delete ctx.info_map; - if (ctx.transaction->empty() || !pg) { + if ((ctx.on_applied->empty() && + ctx.on_safe->empty() && + ctx.transaction->empty()) || !pg) { delete ctx.transaction; delete ctx.on_applied; delete ctx.on_safe; @@ -4822,8 +5025,17 @@ void OSD::handle_pg_notify(OpRequestRef op) } int created = 0; + if (service.splitting(it->first.info.pgid)) { + peering_wait_for_split[it->first.info.pgid].push_back( + PG::CephPeeringEvtRef( + new PG::CephPeeringEvt( + it->first.epoch_sent, it->first.query_epoch, + PG::MNotifyRec(from, it->first)))); + continue; + } + pg = get_or_create_pg(it->first.info, it->second, - it->first.query_epoch, from, created, true); + it->first.query_epoch, from, created, true); if (!pg) continue; pg->queue_notify(it->first.epoch_sent, it->first.query_epoch, from, it->first); @@ -4848,9 +5060,18 @@ void OSD::handle_pg_log(OpRequestRef op) return; } + if (service.splitting(m->info.pgid)) { + peering_wait_for_split[m->info.pgid].push_back( + PG::CephPeeringEvtRef( + new PG::CephPeeringEvt( + m->get_epoch(), m->get_query_epoch(), + PG::MLogRec(from, m)))); + return; + } + int created = 0; PG *pg = get_or_create_pg(m->info, m->past_intervals, m->get_epoch(), - from, created, false); + from, created, false); if (!pg) return; op->mark_started(); @@ -4882,8 +5103,16 @@ void OSD::handle_pg_info(OpRequestRef op) continue; } + if (service.splitting(p->first.info.pgid)) { + peering_wait_for_split[p->first.info.pgid].push_back( + PG::CephPeeringEvtRef( + new PG::CephPeeringEvt( + p->first.epoch_sent, p->first.query_epoch, + PG::MInfoRec(from, p->first.info, p->first.epoch_sent)))); + continue; + } PG *pg = get_or_create_pg(p->first.info, p->second, p->first.epoch_sent, - from, created, false); + from, created, false); if (!pg) continue; pg->queue_info(p->first.epoch_sent, p->first.query_epoch, from, @@ -5123,6 +5352,15 @@ void OSD::handle_pg_query(OpRequestRef op) continue; } + if (service.splitting(pgid)) { + peering_wait_for_split[pgid].push_back( + PG::CephPeeringEvtRef( + new PG::CephPeeringEvt( + it->second.epoch_sent, it->second.epoch_sent, + PG::MQuery(from, it->second, it->second.epoch_sent)))); + continue; + } + PG *pg = 0; if (pg_map.count(pgid)) { @@ -5655,6 +5893,11 @@ void OSD::handle_sub_op(OpRequestRef op) _share_map_incoming(m->get_source_inst(), m->map_epoch, (Session*)m->get_connection()->get_priv()); + if (service.splitting(pgid)) { + waiting_for_pg[pgid].push_back(op); + return; + } + PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL; if (!pg) { return; @@ -5771,7 +6014,10 @@ void OSD::OpWQ::_process(PGRef pg) OpRequestRef op; { Mutex::Locker l(qlock); - assert(pg_for_processing.count(&*pg)); + if (!pg_for_processing.count(&*pg)) { + pg->unlock(); + return; + } assert(pg_for_processing[&*pg].size()); op = pg_for_processing[&*pg].front(); pg_for_processing[&*pg].pop_front(); @@ -5782,6 +6028,12 @@ void OSD::OpWQ::_process(PGRef pg) pg->unlock(); } + +void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued) +{ + osd->op_wq.dequeue(pg, dequeued); +} + /* * NOTE: dequeue called in worker thread, with pg lock */ @@ -5806,6 +6058,30 @@ void OSDService::queue_for_peering(PG *pg) peering_wq.queue(pg); } +struct C_CompleteSplits : public Context { + OSD *osd; + set<boost::intrusive_ptr<PG> > pgs; + C_CompleteSplits(OSD *osd, const set<boost::intrusive_ptr<PG> > &in) + : osd(osd), pgs(in) {} + void finish(int r) { + Mutex::Locker l(osd->osd_lock); + PG::RecoveryCtx rctx = osd->create_context(); + set<pg_t> to_complete; + for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin(); + i != pgs.end(); + ++i) { + (*i)->lock(); + osd->add_newly_split_pg(&**i, &rctx); + osd->dispatch_context_transaction(rctx, &**i); + if (!((*i)->deleting)) + to_complete.insert((*i)->info.pgid); + (*i)->unlock(); + } + osd->service.complete_split(to_complete); + osd->dispatch_context(rctx, 0, osd->service.get_osdmap()); + } +}; + void OSD::process_peering_events(const list<PG*> &pgs) { bool need_up_thru = false; @@ -5815,6 +6091,7 @@ void OSD::process_peering_events(const list<PG*> &pgs) for (list<PG*>::const_iterator i = pgs.begin(); i != pgs.end(); ++i) { + set<boost::intrusive_ptr<PG> > split_pgs; PG *pg = *i; pg->lock(); curmap = service.get_osdmap(); @@ -5822,7 +6099,7 @@ void OSD::process_peering_events(const list<PG*> &pgs) pg->unlock(); continue; } - advance_pg(curmap->get_epoch(), pg, &rctx); + advance_pg(curmap->get_epoch(), pg, &rctx, &split_pgs); if (!pg->peering_queue.empty()) { PG::CephPeeringEvtRef evt = pg->peering_queue.front(); pg->peering_queue.pop_front(); @@ -5832,6 +6109,10 @@ void OSD::process_peering_events(const list<PG*> &pgs) same_interval_since = MAX(pg->info.history.same_interval_since, same_interval_since); pg->write_if_dirty(*rctx.transaction); + if (split_pgs.size()) { + rctx.on_applied->add(new C_CompleteSplits(this, split_pgs)); + split_pgs.clear(); + } dispatch_context_transaction(rctx, pg); pg->unlock(); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 12d50c6f9c4..ce387391180 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -133,6 +133,7 @@ class AuthAuthorizeHandlerRegistry; class OpsFlightSocketHook; class HistoricOpsSocketHook; +struct C_CompleteSplits; extern const coll_t meta_coll; @@ -183,6 +184,8 @@ public: ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq; ClassHandler *&class_handler; + void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued); + // -- superblock -- Mutex publish_lock, pre_publish_lock; OSDSuperblock superblock; @@ -355,7 +358,7 @@ public: void _add_map_inc_bl(epoch_t e, bufferlist& bl); bool get_inc_map_bl(epoch_t e, bufferlist& bl); - void clear_map_bl_cache_pins(); + void clear_map_bl_cache_pins(epoch_t e); void need_heartbeat_peer_update(); @@ -365,6 +368,19 @@ public: void init(); void shutdown(); + // split + Mutex in_progress_split_lock; + set<pg_t> in_progress_splits; + void _start_split(const set<pg_t> &pgs); + void start_split(const set<pg_t> &pgs) { + Mutex::Locker l(in_progress_split_lock); + return _start_split(pgs); + } + void complete_split(const set<pg_t> &pgs); + bool splitting(pg_t pgid); + void expand_pg_num(OSDMapRef old_map, + OSDMapRef new_map); + OSDService(OSD *osd); }; class OSD : public Dispatcher { @@ -601,6 +617,7 @@ private: } friend class OpsFlightSocketHook; friend class HistoricOpsSocketHook; + friend class C_CompleteSplits; OpsFlightSocketHook *admin_ops_hook; HistoricOpsSocketHook *historic_ops_hook; @@ -629,9 +646,26 @@ private: return op.first == pg; } }; - void dequeue(PG *pg) { + void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) { lock(); - pqueue.remove_by_filter(Pred(pg)); + if (!dequeued) { + pqueue.remove_by_filter(Pred(pg)); + pg_for_processing.erase(pg); + } else { + list<pair<PGRef, OpRequestRef> > _dequeued; + pqueue.remove_by_filter(Pred(pg), &_dequeued); + for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin(); + i != _dequeued.end(); + ++i) { + dequeued->push_back(i->second); + } + if (pg_for_processing.count(pg)) { + dequeued->splice( + dequeued->begin(), + pg_for_processing[pg]); + pg_for_processing.erase(pg); + } + } unlock(); } bool _empty() { @@ -742,7 +776,9 @@ private: void note_down_osd(int osd); void note_up_osd(int osd); - void advance_pg(epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx); + void advance_pg( + epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx, + set<boost::intrusive_ptr<PG> > *split_pgs); void advance_map(ObjectStore::Transaction& t, C_Contexts *tfin); void activate_map(); @@ -771,9 +807,6 @@ private: bool get_inc_map_bl(epoch_t e, bufferlist& bl) { return service.get_inc_map_bl(e, bl); } - void clear_map_bl_cache_pins() { - service.clear_map_bl_cache_pins(); - } MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to); void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false); @@ -785,6 +818,7 @@ protected: // -- placement groups -- hash_map<pg_t, PG*> pg_map; map<pg_t, list<OpRequestRef> > waiting_for_pg; + map<pg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split; PGRecoveryStats pg_recovery_stats; PGPool _get_pool(int id, OSDMapRef createmap); @@ -806,11 +840,14 @@ protected: PG *_lookup_qlock_pg(pg_t pgid); PG *lookup_lock_raw_pg(pg_t pgid); + PG* _make_pg(OSDMapRef createmap, pg_t pgid); + void add_newly_split_pg(PG *pg, + PG::RecoveryCtx *rctx); PG *get_or_create_pg(const pg_info_t& info, - pg_interval_map_t& pi, - epoch_t epoch, int from, int& pcreated, - bool primary); + pg_interval_map_t& pi, + epoch_t epoch, int from, int& pcreated, + bool primary); void load_pgs(); void build_past_intervals_parallel(); @@ -840,7 +877,6 @@ protected: vector<int> acting; set<int> prior; pg_t parent; - int split_bits; }; hash_map<pg_t, create_pg_info> creating_pgs; double debug_drop_pg_create_probability; @@ -852,7 +888,12 @@ protected: void do_split(PG *parent, set<pg_t>& children, ObjectStore::Transaction &t, C_Contexts *tfin); void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t); - + void split_pgs( + PG *parent, + const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs, + OSDMapRef curmap, + OSDMapRef nextmap, + PG::RecoveryCtx *rctx); // == monitor interaction == utime_t last_mon_report; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 8c4c29ba7e7..49d12ea35ef 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -869,6 +869,7 @@ void PG::generate_past_intervals() cur_map, last_map, info.pgid.pool(), + info.pgid, &past_intervals, &debug); if (new_interval) { @@ -1906,6 +1907,149 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue) osd->osd->finish_recovery_op(this, soid, dequeue); } +void PG::IndexedLog::split_into( + pg_t child_pgid, + unsigned split_bits, + PG::IndexedLog *olog) +{ + list<pg_log_entry_t> oldlog; + oldlog.swap(log); + + eversion_t old_tail; + olog->head = head; + olog->tail = tail; + unsigned mask = ~((~0)<<split_bits); + for (list<pg_log_entry_t>::iterator i = oldlog.begin(); + i != oldlog.end(); + ) { + if ((i->soid.hash & mask) == child_pgid.m_seed) { + olog->log.push_back(*i); + if (log.empty()) + tail = i->version; + } else { + log.push_back(*i); + if (olog->empty()) + olog->tail = i->version; + } + oldlog.erase(i++); + } + + if (log.empty()) + tail = head; + else + head = log.rbegin()->version; + + if (olog->empty()) + olog->tail = olog->head; + else + olog->head = olog->log.rbegin()->version; + + olog->index(); + index(); +} + +static void split_list( + list<OpRequestRef> *from, + list<OpRequestRef> *to, + unsigned match, + unsigned bits) +{ + for (list<OpRequestRef>::iterator i = from->begin(); + i != from->end(); + ) { + if (PG::split_request(*i, match, bits)) { + to->push_back(*i); + from->erase(i++); + } else { + ++i; + } + } +} + +static void split_replay_queue( + map<eversion_t, OpRequestRef> *from, + map<eversion_t, OpRequestRef> *to, + unsigned match, + unsigned bits) +{ + for (map<eversion_t, OpRequestRef>::iterator i = from->begin(); + i != from->end(); + ) { + if (PG::split_request(i->second, match, bits)) { + to->insert(*i); + from->erase(i++); + } else { + ++i; + } + } +} + +void PG::split_ops(PG *child, unsigned split_bits) { + unsigned match = child->info.pgid.m_seed; + assert(waiting_for_map.empty()); + assert(waiting_for_all_missing.empty()); + assert(waiting_for_missing_object.empty()); + assert(waiting_for_degraded_object.empty()); + assert(waiting_for_ack.empty()); + assert(waiting_for_ondisk.empty()); + split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits); + + osd->dequeue_pg(this, &waiting_for_active); + split_list(&waiting_for_active, &(child->waiting_for_active), match, split_bits); +} + +void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits) +{ + child->osdmap_ref = osdmap_ref; + + child->pool = pool; + + // Log + log.split_into(child_pgid, split_bits, &(child->log)); + child->info.last_complete = info.last_complete; + + info.last_update = log.head; + child->info.last_update = child->log.head; + + info.log_tail = log.tail; + child->info.log_tail = child->log.tail; + + if (info.last_complete < log.tail) + info.last_complete = log.tail; + if (child->info.last_complete < child->log.tail) + child->info.last_complete = child->log.tail; + + // Missing + missing.split_into(child_pgid, split_bits, &(child->missing)); + + // Info + child->info.history = info.history; + child->info.purged_snaps = info.purged_snaps; + child->info.last_backfill = info.last_backfill; + + child->info.stats = info.stats; + info.stats.stats_invalid = true; + child->info.stats.stats_invalid = true; + child->info.last_epoch_started = info.last_epoch_started; + + child->snap_trimq = snap_trimq; + + get_osdmap()->pg_to_up_acting_osds(child->info.pgid, child->up, child->acting); + child->role = get_osdmap()->calc_pg_role(osd->whoami, child->acting); + if (get_primary() != child->get_primary()) + child->info.history.same_primary_since = get_osdmap()->get_epoch(); + + // History + child->past_intervals = past_intervals; + + split_ops(child, split_bits); + _split_into(child_pgid, child, split_bits); + + child->dirty_info = true; + child->dirty_log = true; + dirty_info = true; + dirty_log = true; +} void PG::defer_recovery() { @@ -2150,8 +2294,9 @@ void PG::write_info(ObjectStore::Transaction& t) { // pg state bufferlist infobl; - __u8 struct_v = 4; + __u8 struct_v = 5; ::encode(struct_v, infobl); + ::encode(get_osdmap()->get_epoch(), infobl); t.collection_setattr(coll, "info", infobl); // potentially big stuff @@ -2166,6 +2311,20 @@ void PG::write_info(ObjectStore::Transaction& t) dirty_info = false; } +epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, bufferlist *bl) +{ + assert(bl); + store->collection_getattr(coll, "info", *bl); + bufferlist::iterator bp = bl->begin(); + __u8 struct_v = 0; + ::decode(struct_v, bp); + if (struct_v < 5) + return 0; + epoch_t cur_epoch = 0; + ::decode(cur_epoch, bp); + return cur_epoch; +} + void PG::write_log(ObjectStore::Transaction& t) { dout(10) << "write_log" << dendl; @@ -2612,15 +2771,12 @@ std::string PG::get_corrupt_pg_log_name() const return buf; } -void PG::read_state(ObjectStore *store) +void PG::read_state(ObjectStore *store, bufferlist &bl) { - bufferlist bl; - bufferlist::iterator p; + bufferlist::iterator p = bl.begin(); __u8 struct_v; // info - store->collection_getattr(coll, "info", bl); - p = bl.begin(); ::decode(struct_v, p); if (struct_v < 4) ::decode(info, p); @@ -4318,6 +4474,13 @@ bool PG::may_need_replay(const OSDMapRef osdmap) const return crashed; } +bool PG::is_split(OSDMapRef lastmap, OSDMapRef nextmap) +{ + return info.pgid.is_split( + lastmap->get_pg_num(pool.id), + nextmap->get_pg_num(pool.id), + 0); +} bool PG::acting_up_affected(const vector<int>& newup, const vector<int>& newacting) { @@ -4426,14 +4589,14 @@ void PG::start_peering_interval(const OSDMapRef lastmap, info.history.same_interval_since, info.history.last_epoch_clean, osdmap, - lastmap, info.pgid.pool(), &past_intervals); + lastmap, info.pgid.pool(), info.pgid, &past_intervals); if (new_interval) { dout(10) << " noting past " << past_intervals.rbegin()->second << dendl; dirty_info = true; } } - if (oldacting != acting || oldup != up) { + if (oldacting != acting || oldup != up || is_split(lastmap, osdmap)) { info.history.same_interval_since = osdmap->get_epoch(); } if (oldup != up) { @@ -4706,6 +4869,24 @@ bool PG::can_discard_request(OpRequestRef op) return true; } +bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits) +{ + unsigned mask = ~((~0)<<bits); + switch (op->request->get_type()) { + case CEPH_MSG_OSD_OP: + return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match; + case MSG_OSD_SUBOP: + return false; + case MSG_OSD_SUBOPREPLY: + return false; + case MSG_OSD_PG_SCAN: + return false; + case MSG_OSD_PG_BACKFILL: + return false; + } + return false; +} + bool PG::must_delay_request(OpRequestRef op) { switch (op->request->get_type()) { @@ -4968,7 +5149,8 @@ boost::statechart::result PG::RecoveryState::Started::react(const AdvMap& advmap { dout(10) << "Started advmap" << dendl; PG *pg = context< RecoveryMachine >().pg; - if (pg->acting_up_affected(advmap.newup, advmap.newacting)) { + if (pg->acting_up_affected(advmap.newup, advmap.newacting) || + pg->is_split(advmap.lastmap, advmap.osdmap)) { dout(10) << "up or acting affected, transitioning to Reset" << dendl; post_event(advmap); return transit< Reset >(); @@ -5020,7 +5202,8 @@ boost::statechart::result PG::RecoveryState::Reset::react(const AdvMap& advmap) pg->generate_past_intervals(); pg->remove_down_peer_info(advmap.osdmap); - if (pg->acting_up_affected(advmap.newup, advmap.newacting)) { + if (pg->acting_up_affected(advmap.newup, advmap.newacting) || + pg->is_split(advmap.lastmap, advmap.osdmap)) { dout(10) << "up or acting affected, calling start_peering_interval again" << dendl; pg->start_peering_interval(advmap.lastmap, advmap.newup, advmap.newacting); diff --git a/src/osd/PG.h b/src/osd/PG.h index b9693fb072a..2cf1173203d 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -180,6 +180,11 @@ public: index(); } + void split_into( + pg_t child_pgid, + unsigned split_bits, + IndexedLog *olog); + void zero() { unindex(); pg_log_t::clear(); @@ -653,6 +658,7 @@ protected: waiting_for_degraded_object; map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk; map<eversion_t,OpRequestRef> replay_queue; + void split_ops(PG *child, unsigned split_bits); void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m); void requeue_ops(list<OpRequestRef> &l); @@ -789,6 +795,9 @@ public: void start_recovery_op(const hobject_t& soid); void finish_recovery_op(const hobject_t& soid, bool dequeue=false); + void split_into(pg_t child_pgid, PG *child, unsigned split_bits); + virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0; + loff_t get_log_write_pos() { return 0; } @@ -1700,7 +1709,9 @@ public: void trim_peers(); std::string get_corrupt_pg_log_name() const; - void read_state(ObjectStore *store); + void read_state(ObjectStore *store, bufferlist &bl); + static epoch_t peek_map_epoch(ObjectStore *store, + coll_t coll, bufferlist *bl); coll_t make_snap_collection(ObjectStore::Transaction& t, snapid_t sn); void update_snap_collections(vector<pg_log_entry_t> &log_entries, ObjectStore::Transaction& t); @@ -1729,6 +1740,7 @@ public: void fulfill_info(int from, const pg_query_t &query, pair<int, pg_info_t> ¬ify_info); void fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch); + bool is_split(OSDMapRef lastmap, OSDMapRef nextmap); bool acting_up_affected(const vector<int>& newup, const vector<int>& newacting); // OpRequest queueing @@ -1740,6 +1752,8 @@ public: bool must_delay_request(OpRequestRef op); + static bool split_request(OpRequestRef op, unsigned match, unsigned bits); + bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch); bool old_peering_evt(CephPeeringEvtRef evt) { return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested()); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 76ad5089493..cf1b1f14683 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -6063,6 +6063,10 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs) unlock(); } +void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits) +{ + assert(repop_queue.empty()); +} /* * pg status change notification @@ -7085,6 +7089,11 @@ void ReplicatedPG::_scrub_finish() bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB); const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub")); + if (info.stats.stats_invalid) { + info.stats.stats = scrub_cstat; + info.stats.stats_invalid = false; + } + dout(10) << mode << " got " << scrub_cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, " << scrub_cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, " diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 5abc8e53657..fbc1b65571c 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -930,6 +930,7 @@ protected: virtual void _scrub_finish(); object_stat_collection_t scrub_cstat; + virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits); void apply_and_flush_repops(bool requeue); void calc_trim_to(); diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index e8db13e50db..6fdac54aa5e 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -221,6 +221,27 @@ bool pg_t::is_split(unsigned old_pg_num, unsigned new_pg_num, set<pg_t> *childre return split; } +unsigned pg_t::get_split_bits(unsigned pg_num) const { + assert(pg_num > 1); + + // Find unique p such that pg_num \in [2^(p-1), 2^p) + unsigned p = pg_pool_t::calc_bits_of(pg_num); + + if ((m_seed % (1<<(p-1))) < (pg_num % (1<<(p-1)))) + return p; + else + return p - 1; +} + +pg_t pg_t::get_parent() const +{ + unsigned bits = pg_pool_t::calc_bits_of(m_seed); + assert(bits); + pg_t retval = *this; + retval.m_seed &= ~((~0)<<(bits - 1)); + return retval; +} + void pg_t::dump(Formatter *f) const { f->dump_unsigned("pool", m_pool); @@ -998,6 +1019,7 @@ void pg_stat_t::dump(Formatter *f) const f->dump_stream("last_deep_scrub_stamp") << last_deep_scrub_stamp; f->dump_unsigned("log_size", log_size); f->dump_unsigned("ondisk_log_size", ondisk_log_size); + f->dump_stream("stats_invalid") << stats_invalid; stats.dump(f); f->open_array_section("up"); for (vector<int>::const_iterator p = up.begin(); p != up.end(); ++p) @@ -1011,7 +1033,7 @@ void pg_stat_t::dump(Formatter *f) const void pg_stat_t::encode(bufferlist &bl) const { - ENCODE_START(10, 8, bl); + ENCODE_START(11, 8, bl); ::encode(version, bl); ::encode(reported, bl); ::encode(state, bl); @@ -1036,6 +1058,7 @@ void pg_stat_t::encode(bufferlist &bl) const ::encode(mapping_epoch, bl); ::encode(last_deep_scrub, bl); ::encode(last_deep_scrub_stamp, bl); + ::encode(stats_invalid, bl); ENCODE_FINISH(bl); } @@ -1105,6 +1128,11 @@ void pg_stat_t::decode(bufferlist::iterator &bl) } } } + if (struct_v < 11) { + stats_invalid = false; + } else { + ::decode(stats_invalid, bl); + } DECODE_FINISH(bl); } @@ -1487,14 +1515,17 @@ bool pg_interval_t::check_new_interval( OSDMapRef osdmap, OSDMapRef lastmap, int64_t pool_id, + pg_t pgid, map<epoch_t, pg_interval_t> *past_intervals, std::ostream *out) { // remember past interval if (new_acting != old_acting || new_up != old_up || (!(lastmap->get_pools().count(pool_id))) || - lastmap->get_pools().find(pool_id)->second.min_size != - osdmap->get_pools().find(pool_id)->second.min_size) { + (lastmap->get_pools().find(pool_id)->second.min_size != + osdmap->get_pools().find(pool_id)->second.min_size) || + pgid.is_split(lastmap->get_pg_num(pgid.pool()), + osdmap->get_pg_num(pgid.pool()), 0)) { pg_interval_t& i = (*past_intervals)[same_interval_since]; i.first = same_interval_since; i.last = osdmap->get_epoch() - 1; @@ -2029,6 +2060,24 @@ void pg_missing_t::got(const std::map<hobject_t, pg_missing_t::item>::iterator & missing.erase(m); } +void pg_missing_t::split_into( + pg_t child_pgid, + unsigned split_bits, + pg_missing_t *omissing) +{ + unsigned mask = ~((~0)<<split_bits); + for (map<hobject_t, item>::iterator i = missing.begin(); + i != missing.end(); + ) { + if ((i->first.hash & mask) == child_pgid.m_seed) { + omissing->add(i->first, i->second.need, i->second.have); + rm(i++); + } else { + ++i; + } + } +} + // -- pg_create_t -- void pg_create_t::encode(bufferlist &bl) const diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index c1e5aa26585..81f1aa2962d 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -235,11 +235,19 @@ struct pg_t { m_preferred = osd; } + pg_t get_parent() const; + int print(char *o, int maxlen) const; bool parse(const char *s); bool is_split(unsigned old_pg_num, unsigned new_pg_num, set<pg_t> *pchildren) const; + /** + * Returns b such that for all object o: + * ~((~0)<<b) & o.hash) == 0 iff o is in the pg for *this + */ + unsigned get_split_bits(unsigned pg_num) const; + void encode(bufferlist& bl) const { __u8 v = 1; ::encode(v, bl); @@ -879,6 +887,7 @@ struct pg_stat_t { utime_t last_deep_scrub_stamp; object_stat_collection_t stats; + bool stats_invalid; int64_t log_size; int64_t ondisk_log_size; // >= active_log_size @@ -890,6 +899,7 @@ struct pg_stat_t { : state(0), created(0), last_epoch_clean(0), parent_split_bits(0), + stats_invalid(false), log_size(0), ondisk_log_size(0), mapping_epoch(0) { } @@ -1144,6 +1154,7 @@ struct pg_interval_t { std::tr1::shared_ptr<const OSDMap> osdmap, ///< [in] current map std::tr1::shared_ptr<const OSDMap> lastmap, ///< [in] last map int64_t poolid, ///< [in] pool for pg + pg_t pgid, ///< [in] pgid for pg map<epoch_t, pg_interval_t> *past_intervals,///< [out] intervals ostream *out = 0 ///< [out] debug ostream ); @@ -1449,6 +1460,7 @@ struct pg_missing_t { void rm(const std::map<hobject_t, pg_missing_t::item>::iterator &m); void got(const hobject_t& oid, eversion_t v); void got(const std::map<hobject_t, pg_missing_t::item>::iterator &m); + void split_into(pg_t child_pgid, unsigned split_bits, pg_missing_t *omissing); void clear() { missing.clear(); diff --git a/src/test/filestore/store_test.cc b/src/test/filestore/store_test.cc index af13c4493e6..c98ffb047ac 100644 --- a/src/test/filestore/store_test.cc +++ b/src/test/filestore/store_test.cc @@ -745,6 +745,84 @@ TEST_F(StoreTest, XattrTest) { ASSERT_TRUE(bl2 == attrs["attr3"]); } +void colsplittest( + ObjectStore *store, + unsigned num_objects, + unsigned common_suffix_size + ) { + coll_t cid("from"); + coll_t tid("to"); + int r = 0; + { + ObjectStore::Transaction t; + t.create_collection(cid); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + { + ObjectStore::Transaction t; + for (uint32_t i = 0; i < 2*num_objects; ++i) { + stringstream objname; + objname << "obj" << i; + t.touch(cid, hobject_t( + objname.str(), + "", + CEPH_NOSNAP, + i<<common_suffix_size, + 0)); + } + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + { + ObjectStore::Transaction t; + t.split_collection(cid, common_suffix_size+1, 0, tid); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); + } + + ObjectStore::Transaction t; + vector<hobject_t> objects; + r = store->collection_list(cid, objects); + ASSERT_EQ(r, 0); + ASSERT_EQ(objects.size(), num_objects); + for (vector<hobject_t>::iterator i = objects.begin(); + i != objects.end(); + ++i) { + ASSERT_EQ(!(i->hash & (1<<common_suffix_size)), 0u); + t.remove(cid, *i); + } + + objects.clear(); + r = store->collection_list(tid, objects); + ASSERT_EQ(r, 0); + ASSERT_EQ(objects.size(), num_objects); + for (vector<hobject_t>::iterator i = objects.begin(); + i != objects.end(); + ++i) { + ASSERT_EQ(i->hash & (1<<common_suffix_size), 0u); + t.remove(tid, *i); + } + + t.remove_collection(cid); + t.remove_collection(tid); + r = store->apply_transaction(t); + ASSERT_EQ(r, 0); +} + +TEST_F(StoreTest, ColSplitTest1) { + colsplittest(store.get(), 10000, 11); +} +TEST_F(StoreTest, ColSplitTest2) { + colsplittest(store.get(), 100, 7); +} + +#if 0 +TEST_F(StoreTest, ColSplitTest3) { + colsplittest(store.get(), 100000, 25); +} +#endif + int main(int argc, char **argv) { vector<const char*> args; argv_to_vec(argc, (const char **)argv, args); @@ -752,6 +830,9 @@ int main(int argc, char **argv) { global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); common_init_finish(g_ceph_context); g_ceph_context->_conf->set_val("osd_journal_size", "400"); + g_ceph_context->_conf->set_val("filestore_index_retry_probability", "1"); + g_ceph_context->_conf->set_val("filestore_op_thread_timeout", "1000"); + g_ceph_context->_conf->set_val("filestore_op_thread_suicide_timeout", "10000"); g_ceph_context->_conf->apply_changes(NULL); ::testing::InitGoogleTest(&argc, argv); |