author     Sage Weil <sage@inktank.com>    2012-12-11 17:16:19 -0800
committer  Sage Weil <sage@inktank.com>    2012-12-11 17:16:19 -0800
commit     84f90a09abc3f53c32881efd5c4d2cd44e21a2e4 (patch)
tree       0852ebbd6040da15c052c0031a5d909a485cbe3f
parent     6a8a58dc4b71df6d291d67ddad0b5667289d6d3b (diff)
parent     caea0cbf9f63d74506d69a596dd3f78097d68da5 (diff)
Merge branch 'next'
-rw-r--r--  src/Makefile.am                        |   2
-rw-r--r--  src/ceph_mds.cc                        |   8
-rw-r--r--  src/common/ConfUtils.cc                |   3
-rw-r--r--  src/common/PrioritizedQueue.h          |  27
-rw-r--r--  src/common/config_opts.h               |   4
-rw-r--r--  src/common/simple_cache.hpp            |   7
-rw-r--r--  src/global/global_init.cc              |   9
-rw-r--r--  src/include/Context.h                  |   1
-rw-r--r--  src/include/atomic.h                   |   2
-rw-r--r--  src/mds/MDS.cc                         |  18
-rw-r--r--  src/mds/events/EMetaBlob.h             |   8
-rw-r--r--  src/mon/OSDMonitor.cc                  |  14
-rw-r--r--  src/mon/PGMonitor.cc                   |   4
-rw-r--r--  src/os/CollectionIndex.h               |  17
-rw-r--r--  src/os/FileStore.cc                    |  84
-rw-r--r--  src/os/FileStore.h                     |   6
-rw-r--r--  src/os/HashIndex.cc                    | 225
-rw-r--r--  src/os/HashIndex.h                     |  67
-rw-r--r--  src/os/IndexManager.cc                 |   6
-rw-r--r--  src/os/JournalingObjectStore.cc        |  15
-rw-r--r--  src/os/LFNIndex.cc                     | 120
-rw-r--r--  src/os/LFNIndex.h                      |  89
-rw-r--r--  src/os/ObjectStore.cc                  |  13
-rw-r--r--  src/os/ObjectStore.h                   |  21
-rw-r--r--  src/osd/OSD.cc                         | 389
-rw-r--r--  src/osd/OSD.h                          |  65
-rw-r--r--  src/osd/PG.cc                          | 203
-rw-r--r--  src/osd/PG.h                           |  16
-rw-r--r--  src/osd/ReplicatedPG.cc                |   9
-rw-r--r--  src/osd/ReplicatedPG.h                 |   1
-rw-r--r--  src/osd/osd_types.cc                   |  55
-rw-r--r--  src/osd/osd_types.h                    |  12
-rw-r--r--  src/test/cli/ceph-conf/env-vs-args.t   |   2
-rw-r--r--  src/test/filestore/store_test.cc       |  81
34 files changed, 1464 insertions(+), 139 deletions(-)
diff --git a/src/Makefile.am b/src/Makefile.am
index 6408af0adf0..7a9759f8fdf 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -764,7 +764,7 @@ check_PROGRAMS += unittest_heartbeatmap
unittest_formatter_SOURCES = test/formatter.cc rgw/rgw_formats.cc
unittest_formatter_LDFLAGS = $(PTHREAD_CFLAGS) ${AM_LDFLAGS}
unittest_formatter_LDADD = ${UNITTEST_LDADD} $(LIBGLOBAL_LDA)
-unittest_formatter_CXXFLAGS = ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
+unittest_formatter_CXXFLAGS = ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
check_PROGRAMS += unittest_formatter
unittest_libcephfs_config_SOURCES = test/libcephfs_config.cc
diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc
index e6928fb8c0f..36aa58bd579 100644
--- a/src/ceph_mds.cc
+++ b/src/ceph_mds.cc
@@ -288,11 +288,13 @@ int main(int argc, const char **argv)
mds->orig_argv = argv;
if (shadow)
- mds->init(shadow);
+ r = mds->init(shadow);
else
- mds->init();
+ r = mds->init();
- messenger->wait();
+ if (r >= 0) {
+ messenger->wait();
+ }
unregister_async_signal_handler(SIGHUP, sighup_handler);
unregister_async_signal_handler(SIGINT, handle_mds_signal);
diff --git a/src/common/ConfUtils.cc b/src/common/ConfUtils.cc
index a9e5d7a42f5..147cdc2fb60 100644
--- a/src/common/ConfUtils.cc
+++ b/src/common/ConfUtils.cc
@@ -101,9 +101,6 @@ parse_file(const std::string &fname, std::deque<std::string> *errors,
FILE *fp = fopen(fname.c_str(), "r");
if (!fp) {
ret = -errno;
- ostringstream oss;
- oss << "read_conf: failed to open '" << fname << "': " << cpp_strerror(ret);
- errors->push_back(oss.str());
return ret;
}
diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h
index 72ec3e913e2..9c27ddc3860 100644
--- a/src/common/PrioritizedQueue.h
+++ b/src/common/PrioritizedQueue.h
@@ -47,14 +47,25 @@ class PrioritizedQueue {
int64_t total_priority;
template <class F>
- static unsigned filter_list_pairs(list<pair<unsigned, T> > *l, F f) {
+ static unsigned filter_list_pairs(
+ list<pair<unsigned, T> > *l, F f,
+ list<T> *out) {
unsigned ret = 0;
+ if (out) {
+ for (typename list<pair<unsigned, T> >::reverse_iterator i = l->rbegin();
+ i != l->rend();
+ ++i) {
+ if (f(i->second)) {
+ out->push_front(i->second);
+ }
+ }
+ }
for (typename list<pair<unsigned, T> >::iterator i = l->begin();
i != l->end();
- ) {
+ ) {
if (f(i->second)) {
l->erase(i++);
- ret++;
+ ++ret;
} else {
++i;
}
@@ -119,11 +130,11 @@ class PrioritizedQueue {
return q.empty();
}
template <class F>
- void remove_by_filter(F f) {
+ void remove_by_filter(F f, list<T> *out) {
for (typename map<K, list<pair<unsigned, T> > >::iterator i = q.begin();
i != q.end();
) {
- size -= filter_list_pairs(&(i->second), f);
+ size -= filter_list_pairs(&(i->second), f, out);
if (i->second.empty()) {
if (cur == i)
++cur;
@@ -205,13 +216,13 @@ public:
}
template <class F>
- void remove_by_filter(F f) {
+ void remove_by_filter(F f, list<T> *removed = 0) {
for (typename map<unsigned, SubQueue>::iterator i = queue.begin();
i != queue.end();
) {
unsigned priority = i->first;
- i->second.remove_by_filter(f);
+ i->second.remove_by_filter(f, removed);
if (i->second.empty()) {
++i;
remove_queue(priority);
@@ -222,7 +233,7 @@ public:
for (typename map<unsigned, SubQueue>::iterator i = high_queue.begin();
i != high_queue.end();
) {
- i->second.remove_by_filter(f);
+ i->second.remove_by_filter(f, removed);
if (i->second.empty()) {
high_queue.erase(i++);
} else {
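
[editor's note: a minimal usage sketch of the new remove_by_filter out-parameter. The instantiation, filter functor, and function name below are hypothetical; the element/key types follow the T/K template parameters used above.]

    // Hypothetical instantiation: items are ints, clients keyed by unsigned.
    struct IsEven {
      bool operator()(int v) const { return v % 2 == 0; }
    };

    void drain_even(PrioritizedQueue<int, unsigned> &q) {
      std::list<int> removed;
      // matching items are erased from the queue and returned via 'removed'
      q.remove_by_filter(IsEven(), &removed);
      // the caller can now requeue or clean up the removed items
    }
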
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index d312ef28e0c..b04a28f3259 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -377,6 +377,10 @@ OPTION(osd_client_op_priority, OPT_INT, 63)
OPTION(osd_recovery_op_priority, OPT_INT, 10)
OPTION(filestore, OPT_BOOL, false)
+
+// Tests index failure paths
+OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0)
+
OPTION(filestore_debug_omap_check, OPT_BOOL, 0) // Expensive debugging check on sync
// Use omap for xattrs for attrs over
OPTION(filestore_xattr_use_omap, OPT_BOOL, false)
diff --git a/src/common/simple_cache.hpp b/src/common/simple_cache.hpp
index 7d062351888..60919fd9731 100644
--- a/src/common/simple_cache.hpp
+++ b/src/common/simple_cache.hpp
@@ -50,17 +50,16 @@ public:
pinned.insert(make_pair(key, val));
}
- void clear_pinned() {
+ void clear_pinned(K e) {
Mutex::Locker l(lock);
for (typename map<K, V>::iterator i = pinned.begin();
- i != pinned.end();
- ++i) {
+ i != pinned.end() && i->first <= e;
+ pinned.erase(i++)) {
if (!contents.count(i->first))
_add(i->first, i->second);
else
lru.splice(lru.begin(), lru, contents[i->first]);
}
- pinned.clear();
}
void set_size(size_t new_size) {
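
[editor's note: a small sketch of the revised semantics, with illustrative keys/values; 'cache' stands for an instance of the template in simple_cache.hpp keyed by epoch. clear_pinned(e) now releases only pinned entries with key <= e, folding them into the LRU, instead of dropping every pin.]

    cache.pin(100, bl_100);
    cache.pin(101, bl_101);
    cache.pin(102, bl_102);
    cache.clear_pinned(101);   // 100 and 101 move into the LRU; 102 stays pinned
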
diff --git a/src/global/global_init.cc b/src/global/global_init.cc
index 97549bfb9d6..43ce0909565 100644
--- a/src/global/global_init.cc
+++ b/src/global/global_init.cc
@@ -82,7 +82,10 @@ void global_init(std::vector < const char * > *alt_def_args, std::vector < const
else if (ret == -EINVAL) {
if (!(flags & CINIT_FLAG_NO_DEFAULT_CONFIG_FILE)) {
if (conf_file_list.length()) {
- dout_emergency("global_init: unable to open config file.\n");
+ ostringstream oss;
+ oss << "global_init: unable to open config file from search list "
+ << conf_file_list << "\n";
+ dout_emergency(oss.str());
_exit(1);
} else {
derr <<"did not load config file, using default settings." << dendl;
@@ -104,9 +107,7 @@ void global_init(std::vector < const char * > *alt_def_args, std::vector < const
g_lockdep = cct->_conf->lockdep;
// Now we're ready to complain about config file parse errors
- if (conf_file_list.length()) {
- complain_about_parse_errors(cct, &parse_errors);
- }
+ complain_about_parse_errors(cct, &parse_errors);
// signal stuff
int siglist[] = { SIGPIPE, 0 };
diff --git a/src/include/Context.h b/src/include/Context.h
index f3da98c53d5..a64bb2de5f6 100644
--- a/src/include/Context.h
+++ b/src/include/Context.h
@@ -124,6 +124,7 @@ public:
void finish(int r) {
finish_contexts(cct, contexts, r);
}
+ bool empty() { return contexts.empty(); }
};
diff --git a/src/include/atomic.h b/src/include/atomic.h
index 0c22b269c7c..3ecbd287376 100644
--- a/src/include/atomic.h
+++ b/src/include/atomic.h
@@ -20,6 +20,8 @@
# include "acconfig.h"
#endif
+#include <stdlib.h>
+
#ifndef NO_ATOMIC_OPS
// libatomic_ops implementation
diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc
index d71405643f4..cbcda25a8dd 100644
--- a/src/mds/MDS.cc
+++ b/src/mds/MDS.cc
@@ -21,6 +21,7 @@
#include "common/Clock.h"
#include "common/signal.h"
#include "common/ceph_argparse.h"
+#include "common/errno.h"
#include "msg/Messenger.h"
@@ -459,9 +460,17 @@ int MDS::init(int wanted_state)
// tell monc about log_client so it will know about mon session resets
monc->set_log_client(&clog);
- monc->authenticate();
- monc->wait_auth_rotating(30.0);
-
+ int r = monc->authenticate();
+ if (r < 0) {
+ derr << "ERROR: failed to authenticate: " << cpp_strerror(-r) << dendl;
+ mds_lock.Lock();
+ suicide();
+ mds_lock.Unlock();
+ return r;
+ }
+ while (monc->wait_auth_rotating(30.0) < 0) {
+ derr << "unable to obtain rotating service keys; retrying" << dendl;
+ }
objecter->init_unlocked();
mds_lock.Lock();
@@ -1562,7 +1571,8 @@ void MDS::suicide()
// shut down cache
mdcache->shutdown();
- objecter->shutdown_locked();
+ if (objecter->initialized)
+ objecter->shutdown_locked();
// shut down messenger
messenger->shutdown();
diff --git a/src/mds/events/EMetaBlob.h b/src/mds/events/EMetaBlob.h
index 9c281e9a048..116b70415c3 100644
--- a/src/mds/events/EMetaBlob.h
+++ b/src/mds/events/EMetaBlob.h
@@ -635,12 +635,12 @@ private:
dirty, complete, isnew);
}
dirlump& add_dir(dirfrag_t df, fnode_t *pf, version_t pv, bool dirty, bool complete=false, bool isnew=false) {
- if (lump_map.count(df) == 0) {
+ if (lump_map.count(df) == 0)
lump_order.push_back(df);
- lump_map[df].fnode = *pf;
- lump_map[df].fnode.version = pv;
- }
+
dirlump& l = lump_map[df];
+ l.fnode = *pf;
+ l.fnode.version = pv;
if (complete) l.mark_complete();
if (dirty) l.mark_dirty();
if (isnew) l.mark_new();
diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index 3186c4d6bc3..476e5138b30 100644
--- a/src/mon/OSDMonitor.cc
+++ b/src/mon/OSDMonitor.cc
@@ -2696,7 +2696,7 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
return true;
}
} else if (m->cmd[2] == "set") {
- if (m->cmd.size() != 6) {
+ if (m->cmd.size() < 6) {
err = -EINVAL;
ss << "usage: osd pool set <poolname> <field> <value>";
goto out;
@@ -2740,12 +2740,12 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
paxos->wait_for_commit(new Monitor::C_Command(mon, m, 0, rs, paxos->get_version()));
return true;
} else if (m->cmd[4] == "pg_num") {
- if (true) {
- // ** DISABLE THIS FOR NOW **
- ss << "pg_num adjustment currently disabled (broken implementation)";
- // ** DISABLE THIS FOR NOW **
- } else
- if (n <= p->get_pg_num()) {
+ if (m->cmd.size() < 6 ||
+ m->cmd[6] != "--allow-experimental-feature") {
+ ss << "increasing pg_num is currently experimental, add "
+ << "--allow-experimental-feature as the last argument "
+ << "to force";
+ } else if (n <= p->get_pg_num()) {
ss << "specified pg_num " << n << " <= current " << p->get_pg_num();
} else if (!mon->pgmon()->pg_map.creating_pgs.empty()) {
ss << "currently creating pgs, wait";
diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc
index b66ae895225..c679d415468 100644
--- a/src/mon/PGMonitor.cc
+++ b/src/mon/PGMonitor.cc
@@ -786,6 +786,10 @@ void PGMonitor::send_pg_creates()
if (pgid.preferred() >= 0)
continue;
+ // don't send creates for splits
+ if (s.parent_split_bits)
+ continue;
+
if (nrep) {
pg_map.creating_pgs_by_osd[acting[0]].insert(pgid);
} else {
diff --git a/src/os/CollectionIndex.h b/src/os/CollectionIndex.h
index d931a88b2d5..9b1ceae8c46 100644
--- a/src/os/CollectionIndex.h
+++ b/src/os/CollectionIndex.h
@@ -151,6 +151,20 @@ protected:
int *exist ///< [out] True if the object exists, else false
) = 0;
+ /**
+ * Moves objects matching <match> in the lsb <bits>
+ *
+ * dest and this must be the same subclass
+ *
+ * @return Error Code, 0 for success
+ */
+ virtual int split(
+ uint32_t match, //< [in] value to match
+ uint32_t bits, //< [in] bits to check
+ std::tr1::shared_ptr<CollectionIndex> dest //< [in] destination index
+ ) { assert(0); return 0; }
+
+
/// List contents of collection by hash
virtual int collection_list_partial(
const hobject_t &start, ///< [in] object at which to start
@@ -166,6 +180,9 @@ protected:
vector<hobject_t> *ls ///< [out] Listed Objects
) = 0;
+ /// Call prior to removing directory
+ virtual int prep_delete() { return 0; }
+
/// Virtual destructor
virtual ~CollectionIndex() {}
};
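
[editor's note: as a reading aid, the match/bits convention used by split() can be sketched as below; this is a hedged restatement of the mask arithmetic that appears in HashIndex::col_split_level later in this diff, and the helper name is illustrative.]

    // An object belongs in dest when the low `bits` bits of its hash equal `match`
    // (for 0 < bits < 32, mirroring the ~((~0)<<bits) masks used below).
    static bool belongs_to_dest(uint32_t hash, uint32_t bits, uint32_t match) {
      uint32_t mask = ~(~0u << bits);   // keep only the low `bits` bits
      return (hash & mask) == (match & mask);
    }
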
diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc
index 98ee811e586..8cb8720738e 100644
--- a/src/os/FileStore.cc
+++ b/src/os/FileStore.cc
@@ -2158,6 +2158,21 @@ unsigned FileStore::apply_transactions(list<Transaction*> &tls,
return r;
}
+void FileStore::_set_replay_guard(coll_t cid,
+ const SequencerPosition &spos,
+ bool in_progress=false)
+{
+ char fn[PATH_MAX];
+ get_cdir(cid, fn, sizeof(fn));
+ int fd = ::open(fn, O_RDONLY);
+ if (fd < 0) {
+ derr << "_set_replay_guard " << cid << " error " << fd << dendl;
+ assert(0 == "_set_replay_guard failed");
+ }
+ _set_replay_guard(fd, spos, 0, in_progress);
+ ::close(fd);
+}
+
void FileStore::_set_replay_guard(int fd,
const SequencerPosition& spos,
@@ -2199,6 +2214,20 @@ void FileStore::_set_replay_guard(int fd,
dout(10) << "_set_replay_guard " << spos << " done" << dendl;
}
+void FileStore::_close_replay_guard(coll_t cid,
+ const SequencerPosition &spos)
+{
+ char fn[PATH_MAX];
+ get_cdir(cid, fn, sizeof(fn));
+ int fd = ::open(fn, O_RDONLY);
+ if (fd < 0) {
+ derr << "_set_replay_guard " << cid << " error " << fd << dendl;
+ assert(0 == "_close_replay_guard failed");
+ }
+ _close_replay_guard(fd, spos);
+ ::close(fd);
+}
+
void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos)
{
if (btrfs_stable_commits)
@@ -2227,7 +2256,6 @@ void FileStore::_close_replay_guard(int fd, const SequencerPosition& spos)
dout(10) << "_close_replay_guard " << spos << " done" << dendl;
}
-
int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPosition& spos)
{
if (!replaying || btrfs_stable_commits)
@@ -2575,6 +2603,15 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
r = _omap_setheader(cid, oid, bl, spos);
}
break;
+ case Transaction::OP_SPLIT_COLLECTION:
+ {
+ coll_t cid(i.get_cid());
+ uint32_t bits(i.get_u32());
+ uint32_t rem(i.get_u32());
+ coll_t dest(i.get_cid());
+ r = _split_collection(cid, bits, rem, dest, spos);
+ }
+ break;
default:
derr << "bad op " << op << dendl;
@@ -4433,6 +4470,15 @@ int FileStore::_create_collection(coll_t c)
int FileStore::_destroy_collection(coll_t c)
{
+ {
+ Index from;
+ int r = get_index(c, &from);
+ if (r < 0)
+ return r;
+ r = from->prep_delete();
+ if (r < 0)
+ return r;
+ }
char fn[PATH_MAX];
get_cdir(c, fn, sizeof(fn));
dout(15) << "_destroy_collection " << fn << dendl;
@@ -4555,6 +4601,42 @@ int FileStore::_omap_setheader(coll_t cid, const hobject_t &hoid,
return object_map->set_header(hoid, bl, &spos);
}
+int FileStore::_split_collection(coll_t cid,
+ uint32_t bits,
+ uint32_t rem,
+ coll_t dest,
+ const SequencerPosition &spos)
+{
+ dout(15) << __func__ << " " << cid << " bits: " << bits << dendl;
+ int r = _create_collection(dest);
+ if (r < 0 && !(r == -EEXIST && replaying))
+ return r;
+
+ int dstcmp = _check_replay_guard(cid, spos);
+ if (dstcmp < 0)
+ return 0;
+
+ int srccmp = _check_replay_guard(dest, spos);
+ if (srccmp < 0)
+ return 0;
+
+ _set_replay_guard(cid, spos, true);
+ _set_replay_guard(dest, spos, true);
+
+ Index from;
+ r = get_index(cid, &from);
+
+ Index to;
+ if (!r)
+ r = get_index(dest, &to);
+
+ if (!r)
+ r = from->split(rem, bits, to);
+
+ _close_replay_guard(cid, spos);
+ _close_replay_guard(dest, spos);
+ return r;
+}
const char** FileStore::get_tracked_conf_keys() const
{
diff --git a/src/os/FileStore.h b/src/os/FileStore.h
index f18e1f88269..0281e94d634 100644
--- a/src/os/FileStore.h
+++ b/src/os/FileStore.h
@@ -330,9 +330,13 @@ public:
const SequencerPosition& spos,
const hobject_t *hoid=0,
bool in_progress=false);
+ void _set_replay_guard(coll_t cid,
+ const SequencerPosition& spos,
+ bool in_progress);
/// close a replay guard opened with in_progress=true
void _close_replay_guard(int fd, const SequencerPosition& spos);
+ void _close_replay_guard(coll_t cid, const SequencerPosition& spos);
/**
* check replay guard xattr on given file
@@ -466,6 +470,8 @@ private:
const SequencerPosition &spos);
int _omap_setheader(coll_t cid, const hobject_t &hoid, const bufferlist &bl,
const SequencerPosition &spos);
+ int _split_collection(coll_t cid, uint32_t bits, uint32_t rem, coll_t dest,
+ const SequencerPosition &spos);
virtual const char** get_tracked_conf_keys() const;
virtual void handle_conf_change(const struct md_config_t *conf,
diff --git a/src/os/HashIndex.cc b/src/os/HashIndex.cc
index 8948c506411..d0d155c8d18 100644
--- a/src/os/HashIndex.cc
+++ b/src/os/HashIndex.cc
@@ -42,10 +42,200 @@ int HashIndex::cleanup() {
return complete_split(in_progress.path, info);
else if (in_progress.is_merge())
return complete_merge(in_progress.path, info);
+ else if (in_progress.is_col_split()) {
+ for (vector<string>::iterator i = in_progress.path.begin();
+ i != in_progress.path.end();
+ ++i) {
+ vector<string> path(in_progress.path.begin(), i);
+ int r = reset_attr(path);
+ if (r < 0)
+ return r;
+ }
+ return 0;
+ }
else
return -EINVAL;
}
+int HashIndex::reset_attr(
+ const vector<string> &path)
+{
+ int exists = 0;
+ int r = path_exists(path, &exists);
+ if (r < 0)
+ return r;
+ if (!exists)
+ return 0;
+ map<string, hobject_t> objects;
+ set<string> subdirs;
+ r = list_objects(path, 0, 0, &objects);
+ if (r < 0)
+ return r;
+ r = list_subdirs(path, &subdirs);
+ if (r < 0)
+ return r;
+
+ subdir_info_s info;
+ info.hash_level = path.size();
+ info.objs = objects.size();
+ info.subdirs = subdirs.size();
+ return set_info(path, info);
+}
+
+int HashIndex::col_split_level(
+ HashIndex &from,
+ HashIndex &to,
+ const vector<string> &path,
+ uint32_t inbits,
+ uint32_t match,
+ unsigned *mkdirred)
+{
+ /* For each subdir, move, recurse, or ignore based on comparing the low order
+ * bits of the hash represented by the subdir path with inbits, match passed
+ * in.
+ */
+ set<string> subdirs;
+ int r = from.list_subdirs(path, &subdirs);
+ if (r < 0)
+ return r;
+ map<string, hobject_t> objects;
+ r = from.list_objects(path, 0, 0, &objects);
+ if (r < 0)
+ return r;
+
+ set<string> to_move;
+ for (set<string>::iterator i = subdirs.begin();
+ i != subdirs.end();
+ ++i) {
+ uint32_t bits = 0;
+ uint32_t hash = 0;
+ vector<string> sub_path(path.begin(), path.end());
+ sub_path.push_back(*i);
+ path_to_hobject_hash_prefix(sub_path, &bits, &hash);
+ if (bits < inbits) {
+ if ((match & ~((~0)<<bits)) == (hash & ~((~0)<<bits))) {
+ r = col_split_level(
+ from,
+ to,
+ sub_path,
+ inbits,
+ match,
+ mkdirred);
+ if (r < 0)
+ return r;
+ if (*mkdirred > path.size())
+ *mkdirred = path.size();
+ } // else, skip, doesn't need to be moved or recursed into
+ } else {
+ if ((match & ~((~0)<<inbits)) == (hash & ~((~0)<<inbits))) {
+ to_move.insert(*i);
+ }
+ } // else, skip, doesn't need to be moved or recursed into
+ }
+
+ /* Then, do the same for each object */
+ map<string, hobject_t> objs_to_move;
+ for (map<string, hobject_t>::iterator i = objects.begin();
+ i != objects.end();
+ ++i) {
+ if ((i->second.hash & ~((~0)<<inbits)) == match) {
+ objs_to_move.insert(*i);
+ }
+ }
+
+ if (objs_to_move.empty() && to_move.empty())
+ return 0;
+
+ // Make parent directories as needed
+ while (*mkdirred < path.size()) {
+ ++*mkdirred;
+ int exists = 0;
+ vector<string> creating_path(path.begin(), path.begin()+*mkdirred);
+ r = to.path_exists(creating_path, &exists);
+ if (r < 0)
+ return r;
+ if (exists)
+ continue;
+ subdir_info_s info;
+ info.objs = 0;
+ info.subdirs = 0;
+ info.hash_level = creating_path.size();
+ if (*mkdirred < path.size() - 1)
+ info.subdirs = 1;
+ r = to.start_col_split(creating_path);
+ if (r < 0)
+ return r;
+ r = to.create_path(creating_path);
+ if (r < 0)
+ return r;
+ r = to.set_info(creating_path, info);
+ if (r < 0)
+ return r;
+ r = to.end_split_or_merge(creating_path);
+ if (r < 0)
+ return r;
+ }
+
+ subdir_info_s from_info;
+ subdir_info_s to_info;
+ r = from.get_info(path, &from_info);
+ if (r < 0)
+ return r;
+ r = to.get_info(path, &to_info);
+ if (r < 0)
+ return r;
+
+ from.start_col_split(path);
+ to.start_col_split(path);
+
+ // Do subdir moves
+ for (set<string>::iterator i = to_move.begin();
+ i != to_move.end();
+ ++i) {
+ from_info.subdirs--;
+ to_info.subdirs++;
+ r = move_subdir(from, to, path, *i);
+ if (r < 0)
+ return r;
+ }
+
+ for (map<string, hobject_t>::iterator i = objs_to_move.begin();
+ i != objs_to_move.end();
+ ++i) {
+ from_info.objs--;
+ to_info.objs++;
+ r = move_object(from, to, path, *i);
+ if (r < 0)
+ return r;
+ }
+
+
+ r = to.set_info(path, to_info);
+ if (r < 0)
+ return r;
+ r = from.set_info(path, from_info);
+ if (r < 0)
+ return r;
+ from.end_split_or_merge(path);
+ to.end_split_or_merge(path);
+ return 0;
+}
+
+int HashIndex::_split(
+ uint32_t match,
+ uint32_t bits,
+ std::tr1::shared_ptr<CollectionIndex> dest) {
+ assert(collection_version() == dest->collection_version());
+ unsigned mkdirred = 0;
+ return col_split_level(
+ *this,
+ *static_cast<HashIndex*>(dest.get()),
+ vector<string>(),
+ bits,
+ match,
+ &mkdirred);
+}
+
int HashIndex::_init() {
subdir_info_s info;
vector<string> path;
@@ -143,6 +333,41 @@ int HashIndex::_collection_list_partial(const hobject_t &start,
return list_by_hash(path, min_count, max_count, seq, next, ls);
}
+int HashIndex::prep_delete() {
+ return recursive_remove(vector<string>());
+}
+
+int HashIndex::recursive_remove(const vector<string> &path) {
+ set<string> subdirs;
+ int r = list_subdirs(path, &subdirs);
+ if (r < 0)
+ return r;
+ map<string, hobject_t> objects;
+ r = list_objects(path, 0, 0, &objects);
+ if (r < 0)
+ return r;
+ if (objects.size())
+ return -ENOTEMPTY;
+ vector<string> subdir(path);
+ for (set<string>::iterator i = subdirs.begin();
+ i != subdirs.end();
+ ++i) {
+ subdir.push_back(*i);
+ r = recursive_remove(subdir);
+ if (r < 0)
+ return r;
+ subdir.pop_back();
+ }
+ return remove_path(path);
+}
+
+int HashIndex::start_col_split(const vector<string> &path) {
+ bufferlist bl;
+ InProgressOp op_tag(InProgressOp::COL_SPLIT, path);
+ op_tag.encode(bl);
+ return add_attr_path(vector<string>(), IN_PROGRESS_OP_TAG, bl);
+}
+
int HashIndex::start_split(const vector<string> &path) {
bufferlist bl;
InProgressOp op_tag(InProgressOp::SPLIT, path);
diff --git a/src/os/HashIndex.h b/src/os/HashIndex.h
index 41c4abef20c..fcabd9f7198 100644
--- a/src/os/HashIndex.h
+++ b/src/os/HashIndex.h
@@ -97,6 +97,7 @@ private:
struct InProgressOp {
static const int SPLIT = 0;
static const int MERGE = 1;
+ static const int COL_SPLIT = 2;
int op;
vector<string> path;
@@ -108,6 +109,7 @@ private:
}
bool is_split() const { return op == SPLIT; }
+ bool is_col_split() const { return op == COL_SPLIT; }
bool is_merge() const { return op == MERGE; }
void encode(bufferlist &bl) const {
@@ -134,8 +136,10 @@ public:
const char *base_path, ///< [in] Path to the index root.
int merge_at, ///< [in] Merge threshhold.
int split_multiple, ///< [in] Split threshhold.
- uint32_t index_version)///< [in] Index version
- : LFNIndex(collection, base_path, index_version), merge_threshold(merge_at),
+ uint32_t index_version,///< [in] Index version
+ double retry_probability=0) ///< [in] retry probability
+ : LFNIndex(collection, base_path, index_version, retry_probability),
+ merge_threshold(merge_at),
split_multiplier(split_multiple) {}
/// @see CollectionIndex
@@ -143,6 +147,16 @@ public:
/// @see CollectionIndex
int cleanup();
+
+ /// @see CollectionIndex
+ int prep_delete();
+
+ /// @see CollectionIndex
+ int _split(
+ uint32_t match,
+ uint32_t bits,
+ std::tr1::shared_ptr<CollectionIndex> dest
+ );
protected:
int _init();
@@ -175,6 +189,14 @@ protected:
hobject_t *next
);
private:
+ /// Recursively remove path and its subdirs
+ int recursive_remove(
+ const vector<string> &path ///< [in] path to remove
+ ); /// @return Error Code, 0 on success
+ /// Tag root directory at beginning of col_split
+ int start_col_split(
+ const vector<string> &path ///< [in] path to split
+ ); ///< @return Error Code, 0 on success
/// Tag root directory at beginning of split
int start_split(
const vector<string> &path ///< [in] path to split
@@ -221,6 +243,11 @@ private:
subdir_info_s info ///< [in] Info attached to path
); /// @return Error Code, 0 on success
+ /// Resets attr to match actual subdir contents
+ int reset_attr(
+ const vector<string> &path ///< [in] path to cleanup
+ );
+
/// Initiate Split
int initiate_split(
const vector<string> &path, ///< [in] Subdir to split
@@ -239,25 +266,55 @@ private:
vector<string> *path ///< [out] Path components for hoid.
);
+ /// do collection split for path
+ static int col_split_level(
+ HashIndex &from, ///< [in] from index
+ HashIndex &dest, ///< [in] to index
+ const vector<string> &path, ///< [in] path to split
+ uint32_t bits, ///< [in] num bits to match
+ uint32_t match, ///< [in] bits to match
+ unsigned *mkdirred ///< [in,out] path[:mkdirred] has been mkdirred
+ );
+
+
/**
* Get string representation of hobject_t/hash
*
* e.g: 0x01234567 -> "76543210"
*/
- string get_path_str(
+ static string get_path_str(
const hobject_t &hoid ///< [in] Object to get hash string for
); ///< @return Hash string for hoid.
/// Get string from hash, @see get_path_str
- string get_hash_str(
+ static string get_hash_str(
uint32_t hash ///< [in] Hash to convert to a string.
); ///< @return String representation of hash
/// Get hash from hash prefix string e.g. "FFFFAB" -> 0xFFFFAB00
- uint32_t hash_prefix_to_hash(
+ static uint32_t hash_prefix_to_hash(
string prefix ///< [in] string to convert
); ///< @return Hash
+ /// Get hash mod from path
+ static void path_to_hobject_hash_prefix(
+ const vector<string> &path,///< [in] path to convert
+ uint32_t *bits, ///< [out] bits
+ uint32_t *hash ///< [out] hash
+ ) {
+ string hash_str;
+ for (vector<string>::const_iterator i = path.begin();
+ i != path.end();
+ ++i) {
+ hash_str.push_back(*i->begin());
+ }
+ uint32_t rev_hash = hash_prefix_to_hash(hash_str);
+ if (hash)
+ *hash = rev_hash;
+ if (bits)
+ *bits = path.size() * 4;
+ }
+
/// Get path contents by hash
int get_path_contents_by_hash(
const vector<string> &path, /// [in] Path to list
diff --git a/src/os/IndexManager.cc b/src/os/IndexManager.cc
index 85281c4d926..11bf5c18172 100644
--- a/src/os/IndexManager.cc
+++ b/src/os/IndexManager.cc
@@ -75,7 +75,8 @@ int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
return r;
HashIndex index(c, path, g_conf->filestore_merge_threshold,
g_conf->filestore_split_multiple,
- CollectionIndex::HASH_INDEX_TAG_2);
+ CollectionIndex::HASH_INDEX_TAG_2,
+ g_conf->filestore_index_retry_probability);
return index.init();
}
@@ -110,7 +111,8 @@ int IndexManager::build_index(coll_t c, const char *path, Index *index) {
// No need to check
*index = Index(new HashIndex(c, path, g_conf->filestore_merge_threshold,
g_conf->filestore_split_multiple,
- CollectionIndex::HOBJECT_WITH_POOL),
+ CollectionIndex::HOBJECT_WITH_POOL,
+ g_conf->filestore_index_retry_probability),
RemoveOnDelete(c, this));
return 0;
}
diff --git a/src/os/JournalingObjectStore.cc b/src/os/JournalingObjectStore.cc
index b91b6dc207c..971fd15b824 100644
--- a/src/os/JournalingObjectStore.cc
+++ b/src/os/JournalingObjectStore.cc
@@ -109,6 +109,11 @@ int JournalingObjectStore::journal_replay(uint64_t fs_op_seq)
uint64_t JournalingObjectStore::ApplyManager::op_apply_start(uint64_t op)
{
Mutex::Locker l(apply_lock);
+ while (blocked) {
+ // note: this only happens during journal replay
+ dout(10) << "op_apply_start blocked, waiting" << dendl;
+ blocked_cond.Wait(apply_lock);
+ }
dout(10) << "op_apply_start " << op << " open_ops " << open_ops << " -> " << (open_ops+1) << dendl;
assert(!blocked);
assert(op > committed_seq);
@@ -126,6 +131,11 @@ void JournalingObjectStore::ApplyManager::op_apply_finish(uint64_t op)
--open_ops;
assert(open_ops >= 0);
+ // signal a blocked commit_start (only needed during journal replay)
+ if (blocked) {
+ blocked_cond.Signal();
+ }
+
// there can be multiple applies in flight; track the max value we
// note. note that we can't _read_ this value and learn anything
// meaningful unless/until we've quiesced all in-flight applies.
@@ -173,6 +183,10 @@ bool JournalingObjectStore::ApplyManager::commit_start()
<< ", open_ops " << open_ops
<< dendl;
blocked = true;
+ while (open_ops > 0) {
+ dout(10) << "commit_start waiting for " << open_ops << " open ops to drain" << dendl;
+ blocked_cond.Wait(apply_lock);
+ }
assert(open_ops == 0);
dout(10) << "commit_start blocked, all open_ops have completed" << dendl;
{
@@ -204,6 +218,7 @@ void JournalingObjectStore::ApplyManager::commit_started()
// allow new ops. (underlying fs should now be committing all prior ops)
dout(10) << "commit_started committing " << committing_seq << ", unblocking" << dendl;
blocked = false;
+ blocked_cond.Signal();
}
void JournalingObjectStore::ApplyManager::commit_finish()
diff --git a/src/os/LFNIndex.cc b/src/os/LFNIndex.cc
index fc4a0d223e6..5e505638d15 100644
--- a/src/os/LFNIndex.cc
+++ b/src/os/LFNIndex.cc
@@ -47,6 +47,18 @@ const string LFNIndex::FILENAME_COOKIE = "long";
const int LFNIndex::FILENAME_PREFIX_LEN = FILENAME_SHORT_LEN - FILENAME_HASH_LEN -
FILENAME_COOKIE.size() -
FILENAME_EXTRA;
+void LFNIndex::maybe_inject_failure() {
+ if (error_injection_enabled) {
+ if (current_failure > last_failure &&
+ (((double)(rand() % 10000))/((double)(10000))
+ < error_injection_probability)) {
+ last_failure = current_failure;
+ current_failure = 0;
+ throw RetryException();
+ }
+ ++current_failure;
+ }
+}
/* Public methods */
@@ -72,41 +84,47 @@ int LFNIndex::created(const hobject_t &hoid, const char *path) {
}
int LFNIndex::unlink(const hobject_t &hoid) {
+ WRAP_RETRY(
vector<string> path;
string short_name;
- int r;
r = _lookup(hoid, &path, &short_name, NULL);
- if (r < 0)
- return r;
+ if (r < 0) {
+ goto out;
+ }
r = _remove(path, hoid, short_name);
- if (r < 0)
- return r;
- return 0;
+ if (r < 0) {
+ goto out;
+ }
+ );
}
int LFNIndex::lookup(const hobject_t &hoid,
IndexedPath *out_path,
int *exist) {
+ WRAP_RETRY(
vector<string> path;
string short_name;
- int r;
r = _lookup(hoid, &path, &short_name, exist);
if (r < 0)
- return r;
+ goto out;
string full_path = get_full_path(path, short_name);
struct stat buf;
+ maybe_inject_failure();
r = ::stat(full_path.c_str(), &buf);
+ maybe_inject_failure();
if (r < 0) {
if (errno == ENOENT) {
*exist = 0;
} else {
- return -errno;
+ r = -errno;
+ goto out;
}
} else {
*exist = 1;
}
*out_path = IndexedPath(new Path(full_path, self_ref));
- return 0;
+ r = 0;
+ );
}
int LFNIndex::collection_list(vector<hobject_t> *ls) {
@@ -126,11 +144,14 @@ int LFNIndex::collection_list_partial(const hobject_t &start,
/* Derived class utility methods */
int LFNIndex::fsync_dir(const vector<string> &path) {
+ maybe_inject_failure();
int fd = ::open(get_full_path_subdir(path).c_str(), O_RDONLY);
if (fd < 0)
return -errno;
+ maybe_inject_failure();
int r = ::fsync(fd);
TEMP_FAILURE_RETRY(::close(fd));
+ maybe_inject_failure();
if (r < 0)
return -errno;
else
@@ -144,10 +165,13 @@ int LFNIndex::link_object(const vector<string> &from,
int r;
string from_path = get_full_path(from, from_short_name);
string to_path;
+ maybe_inject_failure();
r = lfn_get_name(to, hoid, 0, &to_path, 0);
if (r < 0)
return r;
+ maybe_inject_failure();
r = ::link(from_path.c_str(), to_path.c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
else
@@ -162,7 +186,9 @@ int LFNIndex::remove_objects(const vector<string> &dir,
to_clean != to_remove.end();
++to_clean) {
if (!lfn_is_hashed_filename(to_clean->first)) {
+ maybe_inject_failure();
int r = ::unlink(get_full_path(dir, to_clean->first).c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
continue;
@@ -189,14 +215,18 @@ int LFNIndex::remove_objects(const vector<string> &dir,
if (candidate == chain.rend() || *i > candidate->first) {
string remove_path_name =
get_full_path(dir, lfn_get_short_name(to_clean->second, *i));
+ maybe_inject_failure();
int r = ::unlink(remove_path_name.c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
continue;
}
string from = get_full_path(dir, candidate->second.first);
string to = get_full_path(dir, lfn_get_short_name(candidate->second.second, *i));
+ maybe_inject_failure();
int r = ::rename(from.c_str(), to.c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
remaining->erase(candidate->second.first);
@@ -226,10 +256,13 @@ int LFNIndex::move_objects(const vector<string> &from,
r = lfn_get_name(to, i->second, &to_name, &to_path, 0);
if (r < 0)
return r;
+ maybe_inject_failure();
r = ::link(from_path.c_str(), to_path.c_str());
if (r < 0 && errno != EEXIST)
return -errno;
+ maybe_inject_failure();
r = lfn_created(to, i->second, to_name);
+ maybe_inject_failure();
if (r < 0)
return r;
}
@@ -239,7 +272,9 @@ int LFNIndex::move_objects(const vector<string> &from,
for (map<string,hobject_t>::iterator i = to_move.begin();
i != to_move.end();
++i) {
+ maybe_inject_failure();
r = ::unlink(get_full_path(from, i->first).c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
}
@@ -250,7 +285,9 @@ int LFNIndex::remove_object(const vector<string> &from,
const hobject_t &hoid) {
string short_name;
int r, exist;
+ maybe_inject_failure();
r = get_mangled_name(from, hoid, &short_name, &exist);
+ maybe_inject_failure();
if (r < 0)
return r;
return lfn_unlink(from, hoid, short_name);
@@ -262,6 +299,53 @@ int LFNIndex::get_mangled_name(const vector<string> &from,
return lfn_get_name(from, hoid, mangled_name, 0, exists);
}
+int LFNIndex::move_subdir(
+ LFNIndex &from,
+ LFNIndex &dest,
+ const vector<string> &path,
+ string dir
+ ) {
+ vector<string> sub_path(path.begin(), path.end());
+ sub_path.push_back(dir);
+ string from_path(from.get_full_path_subdir(sub_path));
+ string to_path(dest.get_full_path_subdir(sub_path));
+ int r = ::rename(from_path.c_str(), to_path.c_str());
+ if (r < 0)
+ return -errno;
+ return 0;
+}
+
+int LFNIndex::move_object(
+ LFNIndex &from,
+ LFNIndex &dest,
+ const vector<string> &path,
+ const pair<string, hobject_t> &obj
+ ) {
+ string from_path(from.get_full_path(path, obj.first));
+ string to_path;
+ string to_name;
+ int exists;
+ int r = dest.lfn_get_name(path, obj.second, &to_name, &to_path, &exists);
+ if (r < 0)
+ return r;
+ if (!exists) {
+ r = ::link(from_path.c_str(), to_path.c_str());
+ if (r < 0)
+ return r;
+ }
+ r = dest.lfn_created(path, obj.second, to_name);
+ if (r < 0)
+ return r;
+ r = dest.fsync_dir(path);
+ if (r < 0)
+ return r;
+ r = from.remove_object(path, obj.second);
+ if (r < 0)
+ return r;
+ return from.fsync_dir(path);
+}
+
+
static int get_hobject_from_oinfo(const char *dir, const char *file,
hobject_t *o) {
char path[PATH_MAX];
@@ -365,7 +449,9 @@ int LFNIndex::list_subdirs(const vector<string> &to_list,
}
int LFNIndex::create_path(const vector<string> &to_create) {
+ maybe_inject_failure();
int r = ::mkdir(get_full_path_subdir(to_create).c_str(), 0777);
+ maybe_inject_failure();
if (r < 0)
return -errno;
else
@@ -373,7 +459,9 @@ int LFNIndex::create_path(const vector<string> &to_create) {
}
int LFNIndex::remove_path(const vector<string> &to_remove) {
+ maybe_inject_failure();
int r = ::rmdir(get_full_path_subdir(to_remove).c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
else
@@ -401,6 +489,7 @@ int LFNIndex::add_attr_path(const vector<string> &path,
const string &attr_name,
bufferlist &attr_value) {
string full_path = get_full_path_subdir(path);
+ maybe_inject_failure();
return chain_setxattr(full_path.c_str(), mangle_attr_name(attr_name).c_str(),
reinterpret_cast<void *>(attr_value.c_str()),
attr_value.length());
@@ -436,6 +525,7 @@ int LFNIndex::remove_attr_path(const vector<string> &path,
const string &attr_name) {
string full_path = get_full_path_subdir(path);
string mangled_attr_name = mangle_attr_name(attr_name);
+ maybe_inject_failure();
return chain_removexattr(full_path.c_str(), mangled_attr_name.c_str());
}
@@ -590,6 +680,7 @@ int LFNIndex::lfn_get_name(const vector<string> &path,
if (exists) {
struct stat buf;
string full_path = get_full_path(path, full_name);
+ maybe_inject_failure();
r = ::stat(full_path.c_str(), &buf);
if (r < 0) {
if (errno == ENOENT)
@@ -616,7 +707,9 @@ int LFNIndex::lfn_get_name(const vector<string> &path,
return -errno;
if (errno == ENODATA) {
// Left over from incomplete transaction, it'll be replayed
+ maybe_inject_failure();
r = ::unlink(candidate_path.c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
}
@@ -651,6 +744,7 @@ int LFNIndex::lfn_created(const vector<string> &path,
return 0;
string full_path = get_full_path(path, mangled_name);
string full_name = lfn_generate_object_name(hoid);
+ maybe_inject_failure();
return chain_setxattr(full_path.c_str(), get_lfn_attr().c_str(),
full_name.c_str(), full_name.size());
}
@@ -660,7 +754,9 @@ int LFNIndex::lfn_unlink(const vector<string> &path,
const string &mangled_name) {
if (!lfn_is_hashed_filename(mangled_name)) {
string full_path = get_full_path(path, mangled_name);
+ maybe_inject_failure();
int r = ::unlink(full_path.c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
return 0;
@@ -691,7 +787,9 @@ int LFNIndex::lfn_unlink(const vector<string> &path,
}
if (i == removed_index + 1) {
string full_path = get_full_path(path, mangled_name);
+ maybe_inject_failure();
int r = ::unlink(full_path.c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
else
@@ -699,7 +797,9 @@ int LFNIndex::lfn_unlink(const vector<string> &path,
} else {
string rename_to = get_full_path(path, mangled_name);
string rename_from = get_full_path(path, lfn_get_short_name(hoid, i - 1));
+ maybe_inject_failure();
int r = ::rename(rename_from.c_str(), rename_to.c_str());
+ maybe_inject_failure();
if (r < 0)
return -errno;
else
diff --git a/src/os/LFNIndex.h b/src/os/LFNIndex.h
index f703544cb13..b3c05358822 100644
--- a/src/os/LFNIndex.h
+++ b/src/os/LFNIndex.h
@@ -21,6 +21,7 @@
#include <set>
#include <vector>
#include <tr1/memory>
+#include <exception>
#include "osd/osd_types.h"
#include "include/object.h"
@@ -48,6 +49,31 @@
* Unless otherwise noted, methods which return an int return 0 on sucess
* and a negative error code on failure.
*/
+#define WRAP_RETRY(x) { \
+ bool failed = false; \
+ int r = 0; \
+ init_inject_failure(); \
+ while (1) { \
+ try { \
+ if (failed) { \
+ r = cleanup(); \
+ assert(r == 0); \
+ } \
+ { x } \
+ out: \
+ complete_inject_failure(); \
+ return r; \
+ } catch (RetryException) { \
+ failed = true; \
+ } catch (...) { \
+ assert(0); \
+ } \
+ } \
+ return -1; \
+ } \
+
+
+
class LFNIndex : public CollectionIndex {
/// Hash digest output size.
static const int FILENAME_LFN_DIGEST_SIZE = CEPH_CRYPTO_SHA1_DIGESTSIZE;
@@ -78,6 +104,24 @@ class LFNIndex : public CollectionIndex {
protected:
const uint32_t index_version;
+ /// true if retry injection is enabled
+ struct RetryException : public exception {};
+ bool error_injection_enabled;
+ bool error_injection_on;
+ double error_injection_probability;
+ uint64_t last_failure;
+ uint64_t current_failure;
+ void init_inject_failure() {
+ if (error_injection_on) {
+ error_injection_enabled = true;
+ last_failure = current_failure = 0;
+ }
+ }
+ void maybe_inject_failure();
+ void complete_inject_failure() {
+ error_injection_enabled = false;
+ }
+
private:
string lfn_attribute;
coll_t collection;
@@ -87,8 +131,14 @@ public:
LFNIndex(
coll_t collection,
const char *base_path, ///< [in] path to Index root
- uint32_t index_version)
- : base_path(base_path), index_version(index_version),
+ uint32_t index_version,
+ double _error_injection_probability=0)
+ : base_path(base_path),
+ index_version(index_version),
+ error_injection_enabled(false),
+ error_injection_on(_error_injection_probability != 0),
+ error_injection_probability(_error_injection_probability),
+ last_failure(0), current_failure(0),
collection(collection) {
if (index_version == HASH_INDEX_TAG) {
lfn_attribute = LFN_ATTR;
@@ -146,6 +196,25 @@ public:
hobject_t *next
);
+ virtual int _split(
+ uint32_t match, //< [in] value to match
+ uint32_t bits, //< [in] bits to check
+ std::tr1::shared_ptr<CollectionIndex> dest //< [in] destination index
+ ) = 0;
+
+ /// @see CollectionIndex
+ int split(
+ uint32_t match,
+ uint32_t bits,
+ std::tr1::shared_ptr<CollectionIndex> dest
+ ) {
+ WRAP_RETRY(
+ r = _split(match, bits, dest);
+ goto out;
+ );
+ }
+
+
protected:
virtual int _init() = 0;
@@ -270,6 +339,22 @@ protected:
int *exists ///< [out] 1 if the file exists, else 0
);
+ /// do move subdir from from to dest
+ static int move_subdir(
+ LFNIndex &from, ///< [in] from index
+ LFNIndex &dest, ///< [in] to index
+ const vector<string> &path, ///< [in] path containing dir
+ string dir ///< [in] dir to move
+ );
+
+ /// do move object from from to dest
+ static int move_object(
+ LFNIndex &from, ///< [in] from index
+ LFNIndex &dest, ///< [in] to index
+ const vector<string> &path, ///< [in] path to split
+ const pair<string, hobject_t> &obj ///< [in] obj to move
+ );
+
/**
* Lists objects in to_list.
*
diff --git a/src/os/ObjectStore.cc b/src/os/ObjectStore.cc
index 214f7b2bc6b..dacc5440308 100644
--- a/src/os/ObjectStore.cc
+++ b/src/os/ObjectStore.cc
@@ -355,6 +355,19 @@ void ObjectStore::Transaction::dump(ceph::Formatter *f)
}
break;
+ case Transaction::OP_SPLIT_COLLECTION:
+ {
+ coll_t cid(i.get_cid());
+ uint32_t bits(i.get_u32());
+ uint32_t rem(i.get_u32());
+ coll_t dest(i.get_cid());
+ f->dump_string("op_name", "op_split_collection");
+ f->dump_stream("collection") << cid;
+ f->dump_stream("bits") << bits;
+ f->dump_stream("rem") << rem;
+ f->dump_stream("dest") << dest;
+ }
+
default:
f->dump_string("op_name", "unknown");
f->dump_unsigned("op_code", op);
diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h
index 439897f273a..a1cb21c0326 100644
--- a/src/os/ObjectStore.h
+++ b/src/os/ObjectStore.h
@@ -152,6 +152,7 @@ public:
OP_OMAP_SETKEYS = 32, // cid, attrset
OP_OMAP_RMKEYS = 33, // cid, keyset
OP_OMAP_SETHEADER = 34, // cid, header
+ OP_SPLIT_COLLECTION = 35, // cid, bits, destination
};
private:
@@ -295,6 +296,11 @@ public:
void get_keyset(set<string> &keys) {
::decode(keys, p);
}
+ uint32_t get_u32() {
+ uint32_t bits;
+ ::decode(bits, p);
+ return bits;
+ }
};
iterator begin() {
@@ -543,6 +549,21 @@ public:
ops++;
}
+ /// Split collection based on given prefixes
+ void split_collection(
+ coll_t cid,
+ uint32_t bits,
+ uint32_t rem,
+ coll_t destination) {
+ __u32 op = OP_SPLIT_COLLECTION;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(bits, tbl);
+ ::encode(rem, tbl);
+ ::encode(destination, tbl);
+ ++ops;
+ }
+
// etc.
Transaction() :
ops(0), pad_unused_bytes(0), largest_data_len(0), largest_data_off(0), largest_data_off_in_tbl(0),
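
[editor's note: a minimal sketch of queuing the new OP_SPLIT_COLLECTION op; variable names are illustrative, and the argument order mirrors the call made from OSD::split_pgs later in this diff.]

    ObjectStore::Transaction t;
    t.split_collection(coll_t(parent_pgid),   // source collection
                       split_bits,            // number of low hash bits to compare
                       child_pgid.m_seed,     // remainder the child's objects match
                       coll_t(child_pgid));   // destination collection
    int r = store->apply_transaction(t);      // or queue it like any other transaction
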
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index e7a1022d7ab..92092969ea0 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -175,9 +175,57 @@ OSDService::OSDService(OSD *osd) :
map_cache_lock("OSDService::map_lock"),
map_cache(g_conf->osd_map_cache_size),
map_bl_cache(g_conf->osd_map_cache_size),
- map_bl_inc_cache(g_conf->osd_map_cache_size)
+ map_bl_inc_cache(g_conf->osd_map_cache_size),
+ in_progress_split_lock("OSDService::in_progress_split_lock")
{}
+void OSDService::_start_split(const set<pg_t> &pgs)
+{
+ for (set<pg_t>::const_iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ assert(!in_progress_splits.count(*i));
+ in_progress_splits.insert(*i);
+ }
+}
+
+void OSDService::expand_pg_num(OSDMapRef old_map,
+ OSDMapRef new_map)
+{
+ Mutex::Locker l(in_progress_split_lock);
+ set<pg_t> children;
+ for (set<pg_t>::iterator i = in_progress_splits.begin();
+ i != in_progress_splits.end();
+ ) {
+ assert(old_map->have_pg_pool(i->pool()));
+ if (!new_map->have_pg_pool(i->pool())) {
+ in_progress_splits.erase(i++);
+ } else {
+ i->is_split(old_map->get_pg_num(i->pool()),
+ new_map->get_pg_num(i->pool()), &children);
+ ++i;
+ }
+ }
+ _start_split(children);
+}
+
+bool OSDService::splitting(pg_t pgid)
+{
+ Mutex::Locker l(in_progress_split_lock);
+ return in_progress_splits.count(pgid);
+}
+
+void OSDService::complete_split(const set<pg_t> &pgs)
+{
+ Mutex::Locker l(in_progress_split_lock);
+ for (set<pg_t>::const_iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ assert(in_progress_splits.count(*i));
+ in_progress_splits.erase(*i);
+ }
+}
+
void OSDService::need_heartbeat_peer_update()
{
osd->need_heartbeat_peer_update();
@@ -1271,6 +1319,22 @@ PG *OSD::_open_lock_pg(
{
assert(osd_lock.is_locked());
+ PG* pg = _make_pg(createmap, pgid);
+
+ pg_map[pgid] = pg;
+
+ if (hold_map_lock)
+ pg->lock_with_map_lock_held(no_lockdep_check);
+ else
+ pg->lock(no_lockdep_check);
+ pg->get(); // because it's in pg_map
+ return pg;
+}
+
+PG* OSD::_make_pg(
+ OSDMapRef createmap,
+ pg_t pgid)
+{
dout(10) << "_open_lock_pg " << pgid << dendl;
PGPool pool = _get_pool(pgid.pool(), createmap);
@@ -1283,17 +1347,41 @@ PG *OSD::_open_lock_pg(
else
assert(0);
- assert(pg_map.count(pgid) == 0);
- pg_map[pgid] = pg;
-
- if (hold_map_lock)
- pg->lock_with_map_lock_held(no_lockdep_check);
- else
- pg->lock(no_lockdep_check);
- pg->get(); // because it's in pg_map
return pg;
}
+
+void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
+{
+ epoch_t e(service.get_osdmap()->get_epoch());
+ pg->get(); // For pg_map
+ pg_map[pg->info.pgid] = pg;
+ dout(10) << "Adding newly split pg " << *pg << dendl;
+ vector<int> up, acting;
+ pg->get_osdmap()->pg_to_up_acting_osds(pg->info.pgid, up, acting);
+ int role = pg->get_osdmap()->calc_pg_role(service.whoami, acting);
+ pg->set_role(role);
+ service.reg_last_pg_scrub(pg->info.pgid,
+ pg->info.history.last_scrub_stamp);
+ pg->handle_loaded(rctx);
+ pg->write_if_dirty(*(rctx->transaction));
+ pg->queue_null(e, e);
+ map<pg_t, list<PG::CephPeeringEvtRef> >::iterator to_wake =
+ peering_wait_for_split.find(pg->info.pgid);
+ if (to_wake != peering_wait_for_split.end()) {
+ for (list<PG::CephPeeringEvtRef>::iterator i =
+ to_wake->second.begin();
+ i != to_wake->second.end();
+ ++i) {
+ pg->queue_peering_event(*i);
+ }
+ peering_wait_for_split.erase(to_wake);
+ }
+ wake_pg_waiters(pg->info.pgid);
+ if (!service.get_osdmap()->have_pg_pool(pg->info.pgid.pool()))
+ _remove_pg(pg);
+}
+
PG *OSD::_create_lock_pg(
OSDMapRef createmap,
pg_t pgid, bool newly_created, bool hold_map_lock,
@@ -1424,10 +1512,21 @@ void OSD::load_pgs()
continue;
}
- PG *pg = _open_lock_pg(osdmap, pgid);
+ bufferlist bl;
+ epoch_t map_epoch = PG::peek_map_epoch(store, *it, &bl);
+
+ PG *pg = _open_lock_pg(map_epoch == 0 ? osdmap : service.get_map(map_epoch), pgid);
// read pg state, log
- pg->read_state(store);
+ pg->read_state(store, bl);
+
+ set<pg_t> split_pgs;
+ if (osdmap->have_pg_pool(pg->info.pgid.pool()) &&
+ pg->info.pgid.is_split(pg->get_osdmap()->get_pg_num(pg->info.pgid.pool()),
+ osdmap->get_pg_num(pg->info.pgid.pool()),
+ &split_pgs)) {
+ service.start_split(split_pgs);
+ }
service.reg_last_pg_scrub(pg->info.pgid, pg->info.history.last_scrub_stamp);
@@ -1532,6 +1631,7 @@ void OSD::build_past_intervals_parallel()
pg->info.history.last_epoch_clean,
cur_map, last_map,
pg->info.pgid.pool(),
+ pg->info.pgid,
&pg->past_intervals,
&debug);
if (new_interval) {
@@ -1566,13 +1666,13 @@ void OSD::build_past_intervals_parallel()
store->apply_transaction(t);
}
-
/*
* look up a pg. if we have it, great. if not, consider creating it IF the pg mapping
* hasn't changed since the given epoch and we are the primary.
*/
-PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
- epoch_t epoch, int from, int& created, bool primary)
+PG *OSD::get_or_create_pg(
+ const pg_info_t& info, pg_interval_map_t& pi,
+ epoch_t epoch, int from, int& created, bool primary)
{
PG *pg;
@@ -1593,6 +1693,10 @@ PG *OSD::get_or_create_pg(const pg_info_t& info, pg_interval_map_t& pi,
return NULL;
}
+ if (service.splitting(info.pgid)) {
+ assert(0);
+ }
+
bool create = false;
if (primary) {
assert(role == 0); // otherwise, probably bug in project_pg_history.
@@ -1702,6 +1806,7 @@ void OSD::project_pg_history(pg_t pgid, pg_history_t& h, epoch_t from,
e--) {
// verify during intermediate epoch (e-1)
OSDMapRef oldmap = get_map(e-1);
+ assert(oldmap->have_pg_pool(pgid.pool()));
vector<int> up, acting;
oldmap->pg_to_up_acting_osds(pgid, up, acting);
@@ -1714,6 +1819,12 @@ void OSD::project_pg_history(pg_t pgid, pg_history_t& h, epoch_t from,
<< dendl;
h.same_interval_since = e;
}
+ // split?
+ if (pgid.is_split(oldmap->get_pg_num(pgid.pool()),
+ osdmap->get_pg_num(pgid.pool()),
+ 0)) {
+ h.same_interval_since = e;
+ }
// up set change?
if (up != currentup && e > h.same_up_since) {
dout(15) << "project_pg_history " << pgid << " up changed in " << e
@@ -3546,6 +3657,21 @@ void OSD::note_up_osd(int peer)
forget_peer_epoch(peer, osdmap->get_epoch() - 1);
}
+struct C_OnMapApply : public Context {
+ OSDService *service;
+ boost::scoped_ptr<ObjectStore::Transaction> t;
+ list<OSDMapRef> pinned_maps;
+ epoch_t e;
+ C_OnMapApply(OSDService *service,
+ ObjectStore::Transaction *t,
+ const list<OSDMapRef> &pinned_maps,
+ epoch_t e)
+ : service(service), t(t), pinned_maps(pinned_maps), e(e) {}
+ void finish(int r) {
+ service->clear_map_bl_cache_pins(e);
+ }
+};
+
void OSD::handle_osd_map(MOSDMap *m)
{
assert(osd_lock.is_locked());
@@ -3600,7 +3726,8 @@ void OSD::handle_osd_map(MOSDMap *m)
skip_maps = true;
}
- ObjectStore::Transaction t;
+ ObjectStore::Transaction *_t = new ObjectStore::Transaction;
+ ObjectStore::Transaction &t = *_t;
// store new maps: queue for disk and put in the osdmap cache
epoch_t start = MAX(osdmap->get_epoch() + 1, first);
@@ -3778,17 +3905,13 @@ void OSD::handle_osd_map(MOSDMap *m)
// superblock and commit
write_superblock(t);
- int r = store->apply_transaction(t, fin);
- if (r) {
- map_lock.put_write();
- derr << "error writing map: " << cpp_strerror(-r) << dendl;
- m->put();
- shutdown();
- return;
- }
+ store->queue_transaction(
+ 0,
+ _t,
+ new C_OnMapApply(&service, _t, pinned_maps, osdmap->get_epoch()),
+ 0, fin);
service.publish_superblock(superblock);
- clear_map_bl_cache_pins();
map_lock.put_write();
check_osdmap_features();
@@ -3852,7 +3975,9 @@ void OSD::check_osdmap_features()
}
}
-void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx)
+void OSD::advance_pg(
+ epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx,
+ set<boost::intrusive_ptr<PG> > *new_pgs)
{
assert(pg->is_locked());
epoch_t next_epoch = pg->get_osdmap()->get_epoch() + 1;
@@ -3866,9 +3991,22 @@ void OSD::advance_pg(epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx)
next_epoch <= osd_epoch;
++next_epoch) {
OSDMapRef nextmap = get_map(next_epoch);
+
vector<int> newup, newacting;
nextmap->pg_to_up_acting_osds(pg->info.pgid, newup, newacting);
pg->handle_advance_map(nextmap, lastmap, newup, newacting, rctx);
+
+ // Check for split!
+ set<pg_t> children;
+ if (pg->info.pgid.is_split(
+ lastmap->get_pg_num(pg->pool.id),
+ nextmap->get_pg_num(pg->pool.id),
+ &children)) {
+ split_pgs(
+ pg, children, new_pgs, lastmap, nextmap,
+ rctx);
+ }
+
lastmap = nextmap;
}
pg->handle_activate_map(rctx);
@@ -3940,6 +4078,22 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin)
waiting_for_pg.erase(p++);
}
}
+ map<pg_t, list<PG::CephPeeringEvtRef> >::iterator q =
+ peering_wait_for_split.begin();
+ while (q != peering_wait_for_split.end()) {
+ pg_t pgid = q->first;
+
+ // am i still primary?
+ vector<int> acting;
+ int nrep = osdmap->pg_to_acting_osds(pgid, acting);
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
+ if (role >= 0) {
+ ++q; // still me
+ } else {
+ dout(10) << " discarding waiting ops for " << pgid << dendl;
+ peering_wait_for_split.erase(q++);
+ }
+ }
}
void OSD::activate_map()
@@ -3958,6 +4112,9 @@ void OSD::activate_map()
list<PG*> to_remove;
+ service.expand_pg_num(service.get_osdmap(),
+ osdmap);
+
// scan pg's
for (hash_map<pg_t,PG*>::iterator it = pg_map.begin();
it != pg_map.end();
@@ -3974,11 +4131,18 @@ void OSD::activate_map()
if (pg->is_primary() && pg->info.history.last_epoch_clean < oldest_last_clean)
oldest_last_clean = pg->info.history.last_epoch_clean;
+ set<pg_t> split_pgs;
if (!osdmap->have_pg_pool(pg->info.pgid.pool())) {
//pool is deleted!
pg->get();
to_remove.push_back(pg);
+ } else if (it->first.is_split(
+ service.get_osdmap()->get_pg_num(it->first.pool()),
+ osdmap->get_pg_num(it->first.pool()),
+ &split_pgs)) {
+ service.start_split(split_pgs);
}
+
pg->unlock();
}
@@ -4183,11 +4347,11 @@ void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
map_bl_cache.pin(e, bl);
}
-void OSDService::clear_map_bl_cache_pins()
+void OSDService::clear_map_bl_cache_pins(epoch_t e)
{
Mutex::Locker l(map_cache_lock);
- map_bl_inc_cache.clear_pinned();
- map_bl_cache.clear_pinned();
+ map_bl_inc_cache.clear_pinned(e);
+ map_bl_cache.clear_pinned(e);
}
OSDMapRef OSDService::_add_map(OSDMap *o)
@@ -4313,15 +4477,64 @@ bool OSD::can_create_pg(pg_t pgid)
return false;
}
- if (creating_pgs[pgid].split_bits) {
- dout(10) << "can_create_pg " << pgid << " - split" << dendl;
- return false;
- }
-
dout(10) << "can_create_pg " << pgid << " - can create now" << dendl;
return true;
}
+void OSD::split_pgs(
+ PG *parent,
+ const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
+ OSDMapRef curmap,
+ OSDMapRef nextmap,
+ PG::RecoveryCtx *rctx)
+{
+ for (set<pg_t>::const_iterator i = childpgids.begin();
+ i != childpgids.end();
+ ++i) {
+ dout(10) << "Splitting " << *parent << " into " << *i << dendl;
+ assert(service.splitting(*i));
+ PG* child = _make_pg(nextmap, *i);
+ child->lock(true);
+ out_pgs->insert(child);
+
+ unsigned pg_num = nextmap->get_pg_num(
+ parent->pool.id);
+
+ unsigned split_bits = i->get_split_bits(pg_num);
+ dout(10) << "pg_num is " << pg_num << dendl;
+ dout(10) << "m_seed " << i->ps() << dendl;
+ dout(10) << "split_bits is " << split_bits << dendl;
+
+ rctx->transaction->split_collection(
+ coll_t(parent->info.pgid),
+ split_bits,
+ i->m_seed,
+ coll_t(*i));
+ for (interval_set<snapid_t>::iterator k = parent->snap_collections.begin();
+ k != parent->snap_collections.end();
+ ++k) {
+ for (snapid_t j = k.get_start(); j < k.get_start() + k.get_len();
+ ++j) {
+ rctx->transaction->split_collection(
+ coll_t(parent->info.pgid, j),
+ split_bits,
+ i->m_seed,
+ coll_t(*i, j));
+ }
+ }
+ child->snap_collections = parent->snap_collections;
+ parent->split_into(
+ *i,
+ child,
+ split_bits);
+
+ child->write_if_dirty(*(rctx->transaction));
+ child->unlock();
+ }
+ parent->write_if_dirty(*(rctx->transaction));
+}
+
+
void OSD::do_split(PG *parent, set<pg_t>& childpgids, ObjectStore::Transaction& t,
C_Contexts *tfin)
{
@@ -4522,7 +4735,8 @@ void OSD::handle_pg_create(OpRequestRef op)
pg_t pgid = p->first;
epoch_t created = p->second.created;
pg_t parent = p->second.parent;
- int split_bits = p->second.split_bits;
+ if (p->second.split_bits) // Skip split pgs
+ continue;
pg_t on = pgid;
if (pgid.preferred() >= 0) {
@@ -4530,13 +4744,7 @@ void OSD::handle_pg_create(OpRequestRef op)
continue;
}
- if (split_bits) {
- on = parent;
- dout(20) << "mkpg " << pgid << " e" << created << " from parent " << parent
- << " split by " << split_bits << " bits" << dendl;
- } else {
- dout(20) << "mkpg " << pgid << " e" << created << dendl;
- }
+ dout(20) << "mkpg " << pgid << " e" << created << dendl;
// is it still ours?
vector<int> up, acting;
@@ -4560,12 +4768,6 @@ void OSD::handle_pg_create(OpRequestRef op)
continue;
}
- // does parent exist?
- if (split_bits && !_have_pg(parent)) {
- dout(10) << "mkpg " << pgid << " missing parent " << parent << ", skipping" << dendl;
- continue;
- }
-
// figure history
pg_history_t history;
history.epoch_created = created;
@@ -4575,7 +4777,6 @@ void OSD::handle_pg_create(OpRequestRef op)
// register.
creating_pgs[pgid].history = history;
creating_pgs[pgid].parent = parent;
- creating_pgs[pgid].split_bits = split_bits;
creating_pgs[pgid].acting.swap(acting);
calc_priors_during(pgid, created, history.same_interval_since,
creating_pgs[pgid].prior);
@@ -4656,7 +4857,9 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap)
delete ctx.query_map;
do_infos(*ctx.info_map, curmap);
delete ctx.info_map;
- if (ctx.transaction->empty() || !pg) {
+ if ((ctx.on_applied->empty() &&
+ ctx.on_safe->empty() &&
+ ctx.transaction->empty()) || !pg) {
delete ctx.transaction;
delete ctx.on_applied;
delete ctx.on_safe;
@@ -4822,8 +5025,17 @@ void OSD::handle_pg_notify(OpRequestRef op)
}
int created = 0;
+ if (service.splitting(it->first.info.pgid)) {
+ peering_wait_for_split[it->first.info.pgid].push_back(
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ it->first.epoch_sent, it->first.query_epoch,
+ PG::MNotifyRec(from, it->first))));
+ continue;
+ }
+
pg = get_or_create_pg(it->first.info, it->second,
- it->first.query_epoch, from, created, true);
+ it->first.query_epoch, from, created, true);
if (!pg)
continue;
pg->queue_notify(it->first.epoch_sent, it->first.query_epoch, from, it->first);
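The pattern introduced here repeats in handle_pg_log, handle_pg_info,
handle_pg_query and handle_sub_op below: while service.splitting(pgid) is
true, the incoming peering event is parked in peering_wait_for_split and only
replayed once the child PG has been registered. A simplified standalone
sketch of that park-and-replay shape, with PgId, Event and SplitTracker as
made-up stand-ins for pg_t, PG::CephPeeringEvtRef and the OSDService
bookkeeping:

    #include <cstdint>
    #include <list>
    #include <map>
    #include <set>
    #include <string>
    #include <utility>

    using PgId = uint64_t;        // stand-in for pg_t
    using Event = std::string;    // stand-in for PG::CephPeeringEvtRef

    struct SplitTracker {
      std::set<PgId> in_progress;                        // like in_progress_splits
      std::map<PgId, std::list<Event>> wait_for_split;   // like peering_wait_for_split

      bool splitting(PgId pgid) const { return in_progress.count(pgid) != 0; }

      // Message handlers defer instead of touching a half-created child PG.
      bool maybe_defer(PgId pgid, Event e) {
        if (!splitting(pgid))
          return false;                       // caller proceeds normally
        wait_for_split[pgid].push_back(std::move(e));
        return true;
      }

      // Analogue of complete_split(): the child exists, replay what was parked.
      template <typename Handler>
      void complete(PgId pgid, Handler handle_event) {
        in_progress.erase(pgid);
        auto it = wait_for_split.find(pgid);
        if (it == wait_for_split.end())
          return;
        for (Event &e : it->second)
          handle_event(pgid, e);              // replay in arrival order
        wait_for_split.erase(it);
      }
    };

    int main() {
      SplitTracker t;
      t.in_progress.insert(42);
      t.maybe_defer(42, "notify from osd.3"); // parked while the split is in flight
      t.complete(42, [](PgId, Event &) { /* queue the event into the child PG */ });
      return 0;
    }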
@@ -4848,9 +5060,18 @@ void OSD::handle_pg_log(OpRequestRef op)
return;
}
+ if (service.splitting(m->info.pgid)) {
+ peering_wait_for_split[m->info.pgid].push_back(
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ m->get_epoch(), m->get_query_epoch(),
+ PG::MLogRec(from, m))));
+ return;
+ }
+
int created = 0;
PG *pg = get_or_create_pg(m->info, m->past_intervals, m->get_epoch(),
- from, created, false);
+ from, created, false);
if (!pg)
return;
op->mark_started();
@@ -4882,8 +5103,16 @@ void OSD::handle_pg_info(OpRequestRef op)
continue;
}
+ if (service.splitting(p->first.info.pgid)) {
+ peering_wait_for_split[p->first.info.pgid].push_back(
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ p->first.epoch_sent, p->first.query_epoch,
+ PG::MInfoRec(from, p->first.info, p->first.epoch_sent))));
+ continue;
+ }
PG *pg = get_or_create_pg(p->first.info, p->second, p->first.epoch_sent,
- from, created, false);
+ from, created, false);
if (!pg)
continue;
pg->queue_info(p->first.epoch_sent, p->first.query_epoch, from,
@@ -5123,6 +5352,15 @@ void OSD::handle_pg_query(OpRequestRef op)
continue;
}
+ if (service.splitting(pgid)) {
+ peering_wait_for_split[pgid].push_back(
+ PG::CephPeeringEvtRef(
+ new PG::CephPeeringEvt(
+ it->second.epoch_sent, it->second.epoch_sent,
+ PG::MQuery(from, it->second, it->second.epoch_sent))));
+ continue;
+ }
+
PG *pg = 0;
if (pg_map.count(pgid)) {
@@ -5655,6 +5893,11 @@ void OSD::handle_sub_op(OpRequestRef op)
_share_map_incoming(m->get_source_inst(), m->map_epoch,
(Session*)m->get_connection()->get_priv());
+ if (service.splitting(pgid)) {
+ waiting_for_pg[pgid].push_back(op);
+ return;
+ }
+
PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
if (!pg) {
return;
@@ -5771,7 +6014,10 @@ void OSD::OpWQ::_process(PGRef pg)
OpRequestRef op;
{
Mutex::Locker l(qlock);
- assert(pg_for_processing.count(&*pg));
+ if (!pg_for_processing.count(&*pg)) {
+ pg->unlock();
+ return;
+ }
assert(pg_for_processing[&*pg].size());
op = pg_for_processing[&*pg].front();
pg_for_processing[&*pg].pop_front();
@@ -5782,6 +6028,12 @@ void OSD::OpWQ::_process(PGRef pg)
pg->unlock();
}
+
+void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+{
+ osd->op_wq.dequeue(pg, dequeued);
+}
+
/*
* NOTE: dequeue called in worker thread, with pg lock
*/
@@ -5806,6 +6058,30 @@ void OSDService::queue_for_peering(PG *pg)
peering_wq.queue(pg);
}
+struct C_CompleteSplits : public Context {
+ OSD *osd;
+ set<boost::intrusive_ptr<PG> > pgs;
+ C_CompleteSplits(OSD *osd, const set<boost::intrusive_ptr<PG> > &in)
+ : osd(osd), pgs(in) {}
+ void finish(int r) {
+ Mutex::Locker l(osd->osd_lock);
+ PG::RecoveryCtx rctx = osd->create_context();
+ set<pg_t> to_complete;
+ for (set<boost::intrusive_ptr<PG> >::iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ (*i)->lock();
+ osd->add_newly_split_pg(&**i, &rctx);
+ osd->dispatch_context_transaction(rctx, &**i);
+ if (!((*i)->deleting))
+ to_complete.insert((*i)->info.pgid);
+ (*i)->unlock();
+ }
+ osd->service.complete_split(to_complete);
+ osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
+ }
+};
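C_CompleteSplits is an on_applied callback: the children are only registered
and the splitting markers cleared once the split transaction has actually
been applied. The shape is the usual one-shot Context completion; below is a
stripped-down standalone sketch of that pattern (this Context and OnApplied
are simplified stand-ins, not the real include/Context.h or C_Contexts):

    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    // One-shot completion callback, as in Ceph's Context idiom.
    struct Context {
      virtual ~Context() {}
      virtual void finish(int r) = 0;
      void complete(int r) { finish(r); delete this; }
    };

    // Analogue of rctx.on_applied: callbacks fired once a transaction applies.
    struct OnApplied {
      std::vector<Context*> cbs;
      void add(Context *c) { cbs.push_back(c); }
      void fire(int r) {
        for (Context *c : cbs)
          c->complete(r);
        cbs.clear();
      }
    };

    struct C_PrintDone : Context {
      std::string what;
      explicit C_PrintDone(std::string w) : what(std::move(w)) {}
      void finish(int) override { std::cout << what << " applied\n"; }
    };

    int main() {
      OnApplied on_applied;
      on_applied.add(new C_PrintDone("split of pg 3.1a"));
      // ... transaction is queued, journaled and applied ...
      on_applied.fire(0);   // only now is it safe to finish the split bookkeeping
      return 0;
    }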
+
void OSD::process_peering_events(const list<PG*> &pgs)
{
bool need_up_thru = false;
@@ -5815,6 +6091,7 @@ void OSD::process_peering_events(const list<PG*> &pgs)
for (list<PG*>::const_iterator i = pgs.begin();
i != pgs.end();
++i) {
+ set<boost::intrusive_ptr<PG> > split_pgs;
PG *pg = *i;
pg->lock();
curmap = service.get_osdmap();
@@ -5822,7 +6099,7 @@ void OSD::process_peering_events(const list<PG*> &pgs)
pg->unlock();
continue;
}
- advance_pg(curmap->get_epoch(), pg, &rctx);
+ advance_pg(curmap->get_epoch(), pg, &rctx, &split_pgs);
if (!pg->peering_queue.empty()) {
PG::CephPeeringEvtRef evt = pg->peering_queue.front();
pg->peering_queue.pop_front();
@@ -5832,6 +6109,10 @@ void OSD::process_peering_events(const list<PG*> &pgs)
same_interval_since = MAX(pg->info.history.same_interval_since,
same_interval_since);
pg->write_if_dirty(*rctx.transaction);
+ if (split_pgs.size()) {
+ rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
+ split_pgs.clear();
+ }
dispatch_context_transaction(rctx, pg);
pg->unlock();
}
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 12d50c6f9c4..ce387391180 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -133,6 +133,7 @@ class AuthAuthorizeHandlerRegistry;
class OpsFlightSocketHook;
class HistoricOpsSocketHook;
+struct C_CompleteSplits;
extern const coll_t meta_coll;
@@ -183,6 +184,8 @@ public:
ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
ClassHandler *&class_handler;
+ void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued);
+
// -- superblock --
Mutex publish_lock, pre_publish_lock;
OSDSuperblock superblock;
@@ -355,7 +358,7 @@ public:
void _add_map_inc_bl(epoch_t e, bufferlist& bl);
bool get_inc_map_bl(epoch_t e, bufferlist& bl);
- void clear_map_bl_cache_pins();
+ void clear_map_bl_cache_pins(epoch_t e);
void need_heartbeat_peer_update();
@@ -365,6 +368,19 @@ public:
void init();
void shutdown();
+ // split
+ Mutex in_progress_split_lock;
+ set<pg_t> in_progress_splits;
+ void _start_split(const set<pg_t> &pgs);
+ void start_split(const set<pg_t> &pgs) {
+ Mutex::Locker l(in_progress_split_lock);
+ return _start_split(pgs);
+ }
+ void complete_split(const set<pg_t> &pgs);
+ bool splitting(pg_t pgid);
+ void expand_pg_num(OSDMapRef old_map,
+ OSDMapRef new_map);
+
OSDService(OSD *osd);
};
class OSD : public Dispatcher {
@@ -601,6 +617,7 @@ private:
}
friend class OpsFlightSocketHook;
friend class HistoricOpsSocketHook;
+ friend class C_CompleteSplits;
OpsFlightSocketHook *admin_ops_hook;
HistoricOpsSocketHook *historic_ops_hook;
@@ -629,9 +646,26 @@ private:
return op.first == pg;
}
};
- void dequeue(PG *pg) {
+ void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
lock();
- pqueue.remove_by_filter(Pred(pg));
+ if (!dequeued) {
+ pqueue.remove_by_filter(Pred(pg));
+ pg_for_processing.erase(pg);
+ } else {
+ list<pair<PGRef, OpRequestRef> > _dequeued;
+ pqueue.remove_by_filter(Pred(pg), &_dequeued);
+ for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
+ i != _dequeued.end();
+ ++i) {
+ dequeued->push_back(i->second);
+ }
+ if (pg_for_processing.count(pg)) {
+ dequeued->splice(
+ dequeued->begin(),
+ pg_for_processing[pg]);
+ pg_for_processing.erase(pg);
+ }
+ }
unlock();
}
bool _empty() {
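The new two-argument dequeue() above gathers both the ops still sitting in
the priority queue and the ones already handed to a worker but not yet run
(pg_for_processing), splicing the latter onto the front so the combined list
keeps the original dispatch order. A tiny standalone illustration of that
splice, with plain strings in place of OpRequestRef:

    #include <cassert>
    #include <list>
    #include <string>

    int main() {
      // Ops pulled off the queue but not yet executed (pg_for_processing[pg]).
      std::list<std::string> pending = {"op1", "op2"};
      // Ops removed from the priority queue by remove_by_filter().
      std::list<std::string> dequeued = {"op3", "op4"};

      // Same move as dequeue(): in-flight ops go to the front.
      dequeued.splice(dequeued.begin(), pending);

      assert(pending.empty());
      assert((dequeued == std::list<std::string>{"op1", "op2", "op3", "op4"}));
      return 0;
    }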
@@ -742,7 +776,9 @@ private:
void note_down_osd(int osd);
void note_up_osd(int osd);
- void advance_pg(epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx);
+ void advance_pg(
+ epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx,
+ set<boost::intrusive_ptr<PG> > *split_pgs);
void advance_map(ObjectStore::Transaction& t, C_Contexts *tfin);
void activate_map();
@@ -771,9 +807,6 @@ private:
bool get_inc_map_bl(epoch_t e, bufferlist& bl) {
return service.get_inc_map_bl(e, bl);
}
- void clear_map_bl_cache_pins() {
- service.clear_map_bl_cache_pins();
- }
MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to);
void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false);
@@ -785,6 +818,7 @@ protected:
// -- placement groups --
hash_map<pg_t, PG*> pg_map;
map<pg_t, list<OpRequestRef> > waiting_for_pg;
+ map<pg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
PGRecoveryStats pg_recovery_stats;
PGPool _get_pool(int id, OSDMapRef createmap);
@@ -806,11 +840,14 @@ protected:
PG *_lookup_qlock_pg(pg_t pgid);
PG *lookup_lock_raw_pg(pg_t pgid);
+ PG* _make_pg(OSDMapRef createmap, pg_t pgid);
+ void add_newly_split_pg(PG *pg,
+ PG::RecoveryCtx *rctx);
PG *get_or_create_pg(const pg_info_t& info,
- pg_interval_map_t& pi,
- epoch_t epoch, int from, int& pcreated,
- bool primary);
+ pg_interval_map_t& pi,
+ epoch_t epoch, int from, int& pcreated,
+ bool primary);
void load_pgs();
void build_past_intervals_parallel();
@@ -840,7 +877,6 @@ protected:
vector<int> acting;
set<int> prior;
pg_t parent;
- int split_bits;
};
hash_map<pg_t, create_pg_info> creating_pgs;
double debug_drop_pg_create_probability;
@@ -852,7 +888,12 @@ protected:
void do_split(PG *parent, set<pg_t>& children, ObjectStore::Transaction &t, C_Contexts *tfin);
void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t);
-
+ void split_pgs(
+ PG *parent,
+ const set<pg_t> &childpgids, set<boost::intrusive_ptr<PG> > *out_pgs,
+ OSDMapRef curmap,
+ OSDMapRef nextmap,
+ PG::RecoveryCtx *rctx);
// == monitor interaction ==
utime_t last_mon_report;
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 8c4c29ba7e7..49d12ea35ef 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -869,6 +869,7 @@ void PG::generate_past_intervals()
cur_map,
last_map,
info.pgid.pool(),
+ info.pgid,
&past_intervals,
&debug);
if (new_interval) {
@@ -1906,6 +1907,149 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
osd->osd->finish_recovery_op(this, soid, dequeue);
}
+void PG::IndexedLog::split_into(
+ pg_t child_pgid,
+ unsigned split_bits,
+ PG::IndexedLog *olog)
+{
+ list<pg_log_entry_t> oldlog;
+ oldlog.swap(log);
+
+ eversion_t old_tail;
+ olog->head = head;
+ olog->tail = tail;
+ unsigned mask = ~((~0)<<split_bits);
+ for (list<pg_log_entry_t>::iterator i = oldlog.begin();
+ i != oldlog.end();
+ ) {
+ if ((i->soid.hash & mask) == child_pgid.m_seed) {
+ olog->log.push_back(*i);
+ if (log.empty())
+ tail = i->version;
+ } else {
+ log.push_back(*i);
+ if (olog->empty())
+ olog->tail = i->version;
+ }
+ oldlog.erase(i++);
+ }
+
+ if (log.empty())
+ tail = head;
+ else
+ head = log.rbegin()->version;
+
+ if (olog->empty())
+ olog->tail = olog->head;
+ else
+ olog->head = olog->log.rbegin()->version;
+
+ olog->index();
+ index();
+}
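IndexedLog::split_into partitions the entries with the same low-bits test as
the collection split, then re-derives each log's bounds: an empty log
collapses its tail onto head, otherwise head becomes the version of the
newest surviving entry, and both logs are re-indexed. A simplified standalone
sketch of that partition-and-rebound step (Entry, Log and split_log are
made-up types; the real code also adjusts tails per entry and rebuilds the
object index):

    #include <cassert>
    #include <cstdint>
    #include <initializer_list>
    #include <list>

    struct Entry { uint32_t hash; uint64_t version; };  // stand-in for pg_log_entry_t

    struct Log {
      std::list<Entry> entries;
      uint64_t tail = 0, head = 0;    // (tail, head] bounds the retained entries
    };

    void split_log(Log &parent, Log &child, unsigned bits, uint32_t seed) {
      uint32_t mask = ~(~0u << bits);
      child.tail = parent.tail;
      child.head = parent.head;
      for (auto i = parent.entries.begin(); i != parent.entries.end(); ) {
        if ((i->hash & mask) == seed) {
          child.entries.push_back(*i);
          i = parent.entries.erase(i);
        } else {
          ++i;
        }
      }
      for (Log *l : {&parent, &child}) {
        if (l->entries.empty())
          l->tail = l->head;                    // nothing left: collapse to a point
        else
          l->head = l->entries.back().version;  // newest surviving entry
      }
    }

    int main() {
      Log p, c;
      p.entries = {{0x10, 5}, {0x11, 6}, {0x12, 7}};
      p.tail = 4;
      p.head = 7;
      split_log(p, c, 1, 1);          // one split bit, child seed 1: odd hashes move
      assert(p.entries.size() == 2 && c.entries.size() == 1);
      assert(p.head == 7 && c.head == 6);
      return 0;
    }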
+
+static void split_list(
+ list<OpRequestRef> *from,
+ list<OpRequestRef> *to,
+ unsigned match,
+ unsigned bits)
+{
+ for (list<OpRequestRef>::iterator i = from->begin();
+ i != from->end();
+ ) {
+ if (PG::split_request(*i, match, bits)) {
+ to->push_back(*i);
+ from->erase(i++);
+ } else {
+ ++i;
+ }
+ }
+}
+
+static void split_replay_queue(
+ map<eversion_t, OpRequestRef> *from,
+ map<eversion_t, OpRequestRef> *to,
+ unsigned match,
+ unsigned bits)
+{
+ for (map<eversion_t, OpRequestRef>::iterator i = from->begin();
+ i != from->end();
+ ) {
+ if (PG::split_request(i->second, match, bits)) {
+ to->insert(*i);
+ from->erase(i++);
+ } else {
+ ++i;
+ }
+ }
+}
+
+void PG::split_ops(PG *child, unsigned split_bits) {
+ unsigned match = child->info.pgid.m_seed;
+ assert(waiting_for_map.empty());
+ assert(waiting_for_all_missing.empty());
+ assert(waiting_for_missing_object.empty());
+ assert(waiting_for_degraded_object.empty());
+ assert(waiting_for_ack.empty());
+ assert(waiting_for_ondisk.empty());
+ split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits);
+
+ osd->dequeue_pg(this, &waiting_for_active);
+ split_list(&waiting_for_active, &(child->waiting_for_active), match, split_bits);
+}
+
+void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
+{
+ child->osdmap_ref = osdmap_ref;
+
+ child->pool = pool;
+
+ // Log
+ log.split_into(child_pgid, split_bits, &(child->log));
+ child->info.last_complete = info.last_complete;
+
+ info.last_update = log.head;
+ child->info.last_update = child->log.head;
+
+ info.log_tail = log.tail;
+ child->info.log_tail = child->log.tail;
+
+ if (info.last_complete < log.tail)
+ info.last_complete = log.tail;
+ if (child->info.last_complete < child->log.tail)
+ child->info.last_complete = child->log.tail;
+
+ // Missing
+ missing.split_into(child_pgid, split_bits, &(child->missing));
+
+ // Info
+ child->info.history = info.history;
+ child->info.purged_snaps = info.purged_snaps;
+ child->info.last_backfill = info.last_backfill;
+
+ child->info.stats = info.stats;
+ info.stats.stats_invalid = true;
+ child->info.stats.stats_invalid = true;
+ child->info.last_epoch_started = info.last_epoch_started;
+
+ child->snap_trimq = snap_trimq;
+
+ get_osdmap()->pg_to_up_acting_osds(child->info.pgid, child->up, child->acting);
+ child->role = get_osdmap()->calc_pg_role(osd->whoami, child->acting);
+ if (get_primary() != child->get_primary())
+ child->info.history.same_primary_since = get_osdmap()->get_epoch();
+
+ // History
+ child->past_intervals = past_intervals;
+
+ split_ops(child, split_bits);
+ _split_into(child_pgid, child, split_bits);
+
+ child->dirty_info = true;
+ child->dirty_log = true;
+ dirty_info = true;
+ dirty_log = true;
+}
void PG::defer_recovery()
{
@@ -2150,8 +2294,9 @@ void PG::write_info(ObjectStore::Transaction& t)
{
// pg state
bufferlist infobl;
- __u8 struct_v = 4;
+ __u8 struct_v = 5;
::encode(struct_v, infobl);
+ ::encode(get_osdmap()->get_epoch(), infobl);
t.collection_setattr(coll, "info", infobl);
// potentially big stuff
@@ -2166,6 +2311,20 @@ void PG::write_info(ObjectStore::Transaction& t)
dirty_info = false;
}
+epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, bufferlist *bl)
+{
+ assert(bl);
+ store->collection_getattr(coll, "info", *bl);
+ bufferlist::iterator bp = bl->begin();
+ __u8 struct_v = 0;
+ ::decode(struct_v, bp);
+ if (struct_v < 5)
+ return 0;
+ epoch_t cur_epoch = 0;
+ ::decode(cur_epoch, bp);
+ return cur_epoch;
+}
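write_info() now bumps struct_v to 5 and encodes the current OSDMap epoch
immediately after it, so peek_map_epoch() can read just that prefix of the
"info" attribute without decoding the whole pg_info_t; anything written with
struct_v < 5 simply reports epoch 0. A minimal standalone sketch of the
prefix-peek idea over a raw byte buffer (hand-rolled little-endian helpers,
not Ceph's bufferlist encoding):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    static void put_u8(std::vector<uint8_t> &b, uint8_t v) { b.push_back(v); }
    static void put_u32(std::vector<uint8_t> &b, uint32_t v) {
      for (int i = 0; i < 4; ++i)
        b.push_back((v >> (8 * i)) & 0xff);
    }

    // write_info() analogue: version byte first, then the epoch, then the rest.
    static std::vector<uint8_t> encode_info(uint32_t epoch) {
      std::vector<uint8_t> b;
      put_u8(b, 5);        // struct_v = 5
      put_u32(b, epoch);   // epoch sits right after the version byte
      // ... the remainder of the info payload would follow here ...
      return b;
    }

    // peek_map_epoch() analogue: decode only the prefix.
    static uint32_t peek_epoch(const std::vector<uint8_t> &b) {
      uint8_t struct_v = b.at(0);
      if (struct_v < 5)
        return 0;          // older format: the epoch is not stored up front
      uint32_t e = 0;
      for (int i = 0; i < 4; ++i)
        e |= uint32_t(b.at(1 + i)) << (8 * i);
      return e;
    }

    int main() {
      assert(peek_epoch(encode_info(8123)) == 8123);
      return 0;
    }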
+
void PG::write_log(ObjectStore::Transaction& t)
{
dout(10) << "write_log" << dendl;
@@ -2612,15 +2771,12 @@ std::string PG::get_corrupt_pg_log_name() const
return buf;
}
-void PG::read_state(ObjectStore *store)
+void PG::read_state(ObjectStore *store, bufferlist &bl)
{
- bufferlist bl;
- bufferlist::iterator p;
+ bufferlist::iterator p = bl.begin();
__u8 struct_v;
// info
- store->collection_getattr(coll, "info", bl);
- p = bl.begin();
::decode(struct_v, p);
if (struct_v < 4)
::decode(info, p);
@@ -4318,6 +4474,13 @@ bool PG::may_need_replay(const OSDMapRef osdmap) const
return crashed;
}
+bool PG::is_split(OSDMapRef lastmap, OSDMapRef nextmap)
+{
+ return info.pgid.is_split(
+ lastmap->get_pg_num(pool.id),
+ nextmap->get_pg_num(pool.id),
+ 0);
+}
bool PG::acting_up_affected(const vector<int>& newup, const vector<int>& newacting)
{
@@ -4426,14 +4589,14 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
info.history.same_interval_since,
info.history.last_epoch_clean,
osdmap,
- lastmap, info.pgid.pool(), &past_intervals);
+ lastmap, info.pgid.pool(), info.pgid, &past_intervals);
if (new_interval) {
dout(10) << " noting past " << past_intervals.rbegin()->second << dendl;
dirty_info = true;
}
}
- if (oldacting != acting || oldup != up) {
+ if (oldacting != acting || oldup != up || is_split(lastmap, osdmap)) {
info.history.same_interval_since = osdmap->get_epoch();
}
if (oldup != up) {
@@ -4706,6 +4869,24 @@ bool PG::can_discard_request(OpRequestRef op)
return true;
}
+bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits)
+{
+ unsigned mask = ~((~0)<<bits);
+ switch (op->request->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match;
+ case MSG_OSD_SUBOP:
+ return false;
+ case MSG_OSD_SUBOPREPLY:
+ return false;
+ case MSG_OSD_PG_SCAN:
+ return false;
+ case MSG_OSD_PG_BACKFILL:
+ return false;
+ }
+ return false;
+}
+
bool PG::must_delay_request(OpRequestRef op)
{
switch (op->request->get_type()) {
@@ -4968,7 +5149,8 @@ boost::statechart::result PG::RecoveryState::Started::react(const AdvMap& advmap
{
dout(10) << "Started advmap" << dendl;
PG *pg = context< RecoveryMachine >().pg;
- if (pg->acting_up_affected(advmap.newup, advmap.newacting)) {
+ if (pg->acting_up_affected(advmap.newup, advmap.newacting) ||
+ pg->is_split(advmap.lastmap, advmap.osdmap)) {
dout(10) << "up or acting affected, transitioning to Reset" << dendl;
post_event(advmap);
return transit< Reset >();
@@ -5020,7 +5202,8 @@ boost::statechart::result PG::RecoveryState::Reset::react(const AdvMap& advmap)
pg->generate_past_intervals();
pg->remove_down_peer_info(advmap.osdmap);
- if (pg->acting_up_affected(advmap.newup, advmap.newacting)) {
+ if (pg->acting_up_affected(advmap.newup, advmap.newacting) ||
+ pg->is_split(advmap.lastmap, advmap.osdmap)) {
dout(10) << "up or acting affected, calling start_peering_interval again"
<< dendl;
pg->start_peering_interval(advmap.lastmap, advmap.newup, advmap.newacting);
diff --git a/src/osd/PG.h b/src/osd/PG.h
index b9693fb072a..2cf1173203d 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -180,6 +180,11 @@ public:
index();
}
+ void split_into(
+ pg_t child_pgid,
+ unsigned split_bits,
+ IndexedLog *olog);
+
void zero() {
unindex();
pg_log_t::clear();
@@ -653,6 +658,7 @@ protected:
waiting_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
map<eversion_t,OpRequestRef> replay_queue;
+ void split_ops(PG *child, unsigned split_bits);
void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
void requeue_ops(list<OpRequestRef> &l);
@@ -789,6 +795,9 @@ public:
void start_recovery_op(const hobject_t& soid);
void finish_recovery_op(const hobject_t& soid, bool dequeue=false);
+ void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
+ virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0;
+
loff_t get_log_write_pos() {
return 0;
}
@@ -1700,7 +1709,9 @@ public:
void trim_peers();
std::string get_corrupt_pg_log_name() const;
- void read_state(ObjectStore *store);
+ void read_state(ObjectStore *store, bufferlist &bl);
+ static epoch_t peek_map_epoch(ObjectStore *store,
+ coll_t coll, bufferlist *bl);
coll_t make_snap_collection(ObjectStore::Transaction& t, snapid_t sn);
void update_snap_collections(vector<pg_log_entry_t> &log_entries,
ObjectStore::Transaction& t);
@@ -1729,6 +1740,7 @@ public:
void fulfill_info(int from, const pg_query_t &query,
pair<int, pg_info_t> &notify_info);
void fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch);
+ bool is_split(OSDMapRef lastmap, OSDMapRef nextmap);
bool acting_up_affected(const vector<int>& newup, const vector<int>& newacting);
// OpRequest queueing
@@ -1740,6 +1752,8 @@ public:
bool must_delay_request(OpRequestRef op);
+ static bool split_request(OpRequestRef op, unsigned match, unsigned bits);
+
bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
bool old_peering_evt(CephPeeringEvtRef evt) {
return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 76ad5089493..cf1b1f14683 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -6063,6 +6063,10 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
unlock();
}
+void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits)
+{
+ assert(repop_queue.empty());
+}
/*
* pg status change notification
@@ -7085,6 +7089,11 @@ void ReplicatedPG::_scrub_finish()
bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
+ if (info.stats.stats_invalid) {
+ info.stats.stats = scrub_cstat;
+ info.stats.stats_invalid = false;
+ }
+
dout(10) << mode << " got "
<< scrub_cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
<< scrub_cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 5abc8e53657..fbc1b65571c 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -930,6 +930,7 @@ protected:
virtual void _scrub_finish();
object_stat_collection_t scrub_cstat;
+ virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits);
void apply_and_flush_repops(bool requeue);
void calc_trim_to();
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index e8db13e50db..6fdac54aa5e 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -221,6 +221,27 @@ bool pg_t::is_split(unsigned old_pg_num, unsigned new_pg_num, set<pg_t> *childre
return split;
}
+unsigned pg_t::get_split_bits(unsigned pg_num) const {
+ assert(pg_num > 1);
+
+ // Find unique p such that pg_num \in [2^(p-1), 2^p)
+ unsigned p = pg_pool_t::calc_bits_of(pg_num);
+
+ if ((m_seed % (1<<(p-1))) < (pg_num % (1<<(p-1))))
+ return p;
+ else
+ return p - 1;
+}
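get_split_bits() covers the case where pg_num is not a power of two. With
p = calc_bits_of(pg_num), so that pg_num lies in [2^(p-1), 2^p), a seed whose
value modulo 2^(p-1) falls below pg_num % 2^(p-1) already has a sibling in
the "extra" range above 2^(p-1) and therefore needs p bits; every other seed
is still addressed by p-1 bits. A worked standalone example; calc_bits_of is
re-implemented here under the assumption that, like pg_pool_t::calc_bits_of,
it returns the bit length of its argument:

    #include <cassert>

    // Assumed to mirror pg_pool_t::calc_bits_of(): the unique p with
    // n in [2^(p-1), 2^p).
    static unsigned calc_bits_of(unsigned n) {
      unsigned bits = 0;
      while (n) { ++bits; n >>= 1; }
      return bits;
    }

    static unsigned split_bits(unsigned m_seed, unsigned pg_num) {
      assert(pg_num > 1);
      unsigned p = calc_bits_of(pg_num);
      if ((m_seed % (1 << (p - 1))) < (pg_num % (1 << (p - 1))))
        return p;        // this seed's range has already been split once more
      else
        return p - 1;
    }

    int main() {
      // pg_num = 12 lies in [8,16), so p = 4 and pg_num % 8 = 4:
      // seeds congruent to 0..3 (mod 8) need 4 bits, the rest only 3.
      assert(split_bits(2, 12) == 4);
      assert(split_bits(6, 12) == 3);
      assert(split_bits(10, 12) == 4);
      return 0;
    }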
+
+pg_t pg_t::get_parent() const
+{
+ unsigned bits = pg_pool_t::calc_bits_of(m_seed);
+ assert(bits);
+ pg_t retval = *this;
+ retval.m_seed &= ~((~0)<<(bits - 1));
+ return retval;
+}
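get_parent() is the inverse of a single split step: clearing the most
significant set bit of m_seed yields the seed of the pg this one was split
from. A tiny standalone check of that relation (parent_seed is a made-up
helper name):

    #include <cassert>

    static unsigned parent_seed(unsigned seed) {
      unsigned bits = 0, n = seed;
      while (n) { ++bits; n >>= 1; }         // bit length of the seed
      assert(bits);                          // seed 0 has no parent
      return seed & ~(~0u << (bits - 1));    // drop bit (bits - 1)
    }

    int main() {
      assert(parent_seed(5) == 1);   // 0b101 -> 0b001
      assert(parent_seed(6) == 2);   // 0b110 -> 0b010
      assert(parent_seed(1) == 0);   // 0b1   -> 0b0
      return 0;
    }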
+
void pg_t::dump(Formatter *f) const
{
f->dump_unsigned("pool", m_pool);
@@ -998,6 +1019,7 @@ void pg_stat_t::dump(Formatter *f) const
f->dump_stream("last_deep_scrub_stamp") << last_deep_scrub_stamp;
f->dump_unsigned("log_size", log_size);
f->dump_unsigned("ondisk_log_size", ondisk_log_size);
+ f->dump_stream("stats_invalid") << stats_invalid;
stats.dump(f);
f->open_array_section("up");
for (vector<int>::const_iterator p = up.begin(); p != up.end(); ++p)
@@ -1011,7 +1033,7 @@ void pg_stat_t::dump(Formatter *f) const
void pg_stat_t::encode(bufferlist &bl) const
{
- ENCODE_START(10, 8, bl);
+ ENCODE_START(11, 8, bl);
::encode(version, bl);
::encode(reported, bl);
::encode(state, bl);
@@ -1036,6 +1058,7 @@ void pg_stat_t::encode(bufferlist &bl) const
::encode(mapping_epoch, bl);
::encode(last_deep_scrub, bl);
::encode(last_deep_scrub_stamp, bl);
+ ::encode(stats_invalid, bl);
ENCODE_FINISH(bl);
}
@@ -1105,6 +1128,11 @@ void pg_stat_t::decode(bufferlist::iterator &bl)
}
}
}
+ if (struct_v < 11) {
+ stats_invalid = false;
+ } else {
+ ::decode(stats_invalid, bl);
+ }
DECODE_FINISH(bl);
}
@@ -1487,14 +1515,17 @@ bool pg_interval_t::check_new_interval(
OSDMapRef osdmap,
OSDMapRef lastmap,
int64_t pool_id,
+ pg_t pgid,
map<epoch_t, pg_interval_t> *past_intervals,
std::ostream *out)
{
// remember past interval
if (new_acting != old_acting || new_up != old_up ||
(!(lastmap->get_pools().count(pool_id))) ||
- lastmap->get_pools().find(pool_id)->second.min_size !=
- osdmap->get_pools().find(pool_id)->second.min_size) {
+ (lastmap->get_pools().find(pool_id)->second.min_size !=
+ osdmap->get_pools().find(pool_id)->second.min_size) ||
+ pgid.is_split(lastmap->get_pg_num(pgid.pool()),
+ osdmap->get_pg_num(pgid.pool()), 0)) {
pg_interval_t& i = (*past_intervals)[same_interval_since];
i.first = same_interval_since;
i.last = osdmap->get_epoch() - 1;
@@ -2029,6 +2060,24 @@ void pg_missing_t::got(const std::map<hobject_t, pg_missing_t::item>::iterator &
missing.erase(m);
}
+void pg_missing_t::split_into(
+ pg_t child_pgid,
+ unsigned split_bits,
+ pg_missing_t *omissing)
+{
+ unsigned mask = ~((~0)<<split_bits);
+ for (map<hobject_t, item>::iterator i = missing.begin();
+ i != missing.end();
+ ) {
+ if ((i->first.hash & mask) == child_pgid.m_seed) {
+ omissing->add(i->first, i->second.need, i->second.have);
+ rm(i++);
+ } else {
+ ++i;
+ }
+ }
+}
+
// -- pg_create_t --
void pg_create_t::encode(bufferlist &bl) const
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index c1e5aa26585..81f1aa2962d 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -235,11 +235,19 @@ struct pg_t {
m_preferred = osd;
}
+ pg_t get_parent() const;
+
int print(char *o, int maxlen) const;
bool parse(const char *s);
bool is_split(unsigned old_pg_num, unsigned new_pg_num, set<pg_t> *pchildren) const;
+ /**
+ * Returns the number of bits b such that an object o belongs to the
+ * pg for *this iff (o.hash & ~((~0)<<b)) == m_seed
+ */
+ unsigned get_split_bits(unsigned pg_num) const;
+
void encode(bufferlist& bl) const {
__u8 v = 1;
::encode(v, bl);
@@ -879,6 +887,7 @@ struct pg_stat_t {
utime_t last_deep_scrub_stamp;
object_stat_collection_t stats;
+ bool stats_invalid;
int64_t log_size;
int64_t ondisk_log_size; // >= active_log_size
@@ -890,6 +899,7 @@ struct pg_stat_t {
: state(0),
created(0), last_epoch_clean(0),
parent_split_bits(0),
+ stats_invalid(false),
log_size(0), ondisk_log_size(0),
mapping_epoch(0)
{ }
@@ -1144,6 +1154,7 @@ struct pg_interval_t {
std::tr1::shared_ptr<const OSDMap> osdmap, ///< [in] current map
std::tr1::shared_ptr<const OSDMap> lastmap, ///< [in] last map
int64_t poolid, ///< [in] pool for pg
+ pg_t pgid, ///< [in] pgid for pg
map<epoch_t, pg_interval_t> *past_intervals,///< [out] intervals
ostream *out = 0 ///< [out] debug ostream
);
@@ -1449,6 +1460,7 @@ struct pg_missing_t {
void rm(const std::map<hobject_t, pg_missing_t::item>::iterator &m);
void got(const hobject_t& oid, eversion_t v);
void got(const std::map<hobject_t, pg_missing_t::item>::iterator &m);
+ void split_into(pg_t child_pgid, unsigned split_bits, pg_missing_t *omissing);
void clear() {
missing.clear();
diff --git a/src/test/cli/ceph-conf/env-vs-args.t b/src/test/cli/ceph-conf/env-vs-args.t
index 7eefd98b793..76b2dec9cfb 100644
--- a/src/test/cli/ceph-conf/env-vs-args.t
+++ b/src/test/cli/ceph-conf/env-vs-args.t
@@ -5,6 +5,6 @@
# command-line arguments should override environment
$ env -u CEPH_CONF ceph-conf -c from-args
- global_init: unable to open config file. (re)
+ global_init: unable to open config file from search list from-args
[1]
diff --git a/src/test/filestore/store_test.cc b/src/test/filestore/store_test.cc
index af13c4493e6..c98ffb047ac 100644
--- a/src/test/filestore/store_test.cc
+++ b/src/test/filestore/store_test.cc
@@ -745,6 +745,84 @@ TEST_F(StoreTest, XattrTest) {
ASSERT_TRUE(bl2 == attrs["attr3"]);
}
+void colsplittest(
+ ObjectStore *store,
+ unsigned num_objects,
+ unsigned common_suffix_size
+ ) {
+ coll_t cid("from");
+ coll_t tid("to");
+ int r = 0;
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(cid);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ {
+ ObjectStore::Transaction t;
+ for (uint32_t i = 0; i < 2*num_objects; ++i) {
+ stringstream objname;
+ objname << "obj" << i;
+ t.touch(cid, hobject_t(
+ objname.str(),
+ "",
+ CEPH_NOSNAP,
+ i<<common_suffix_size,
+ 0));
+ }
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+ {
+ ObjectStore::Transaction t;
+ t.split_collection(cid, common_suffix_size+1, 0, tid);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+ }
+
+ ObjectStore::Transaction t;
+ vector<hobject_t> objects;
+ r = store->collection_list(cid, objects);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ(objects.size(), num_objects);
+ for (vector<hobject_t>::iterator i = objects.begin();
+ i != objects.end();
+ ++i) {
+ ASSERT_EQ(!(i->hash & (1<<common_suffix_size)), 0u);
+ t.remove(cid, *i);
+ }
+
+ objects.clear();
+ r = store->collection_list(tid, objects);
+ ASSERT_EQ(r, 0);
+ ASSERT_EQ(objects.size(), num_objects);
+ for (vector<hobject_t>::iterator i = objects.begin();
+ i != objects.end();
+ ++i) {
+ ASSERT_EQ(i->hash & (1<<common_suffix_size), 0u);
+ t.remove(tid, *i);
+ }
+
+ t.remove_collection(cid);
+ t.remove_collection(tid);
+ r = store->apply_transaction(t);
+ ASSERT_EQ(r, 0);
+}
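For reference on what colsplittest() expects: objects are created with hash
i << common_suffix_size for i in [0, 2*num_objects), and the split uses
common_suffix_size + 1 bits with match 0, so exactly the even-i objects (the
ones with bit common_suffix_size clear) move to the new collection and each
collection ends up with num_objects entries. A small standalone check of
that count:

    #include <cassert>
    #include <cstdint>

    int main() {
      const unsigned num_objects = 100, common_suffix_size = 7;
      const uint32_t mask = ~(~0u << (common_suffix_size + 1));
      unsigned moved = 0, kept = 0;
      for (uint32_t i = 0; i < 2 * num_objects; ++i) {
        uint32_t hash = i << common_suffix_size;   // how colsplittest builds hashes
        if ((hash & mask) == 0)                    // split_collection(match = 0)
          ++moved;                                 // even i: bit common_suffix_size clear
        else
          ++kept;                                  // odd i: that bit is set
      }
      assert(moved == num_objects && kept == num_objects);
      return 0;
    }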
+
+TEST_F(StoreTest, ColSplitTest1) {
+ colsplittest(store.get(), 10000, 11);
+}
+TEST_F(StoreTest, ColSplitTest2) {
+ colsplittest(store.get(), 100, 7);
+}
+
+#if 0
+TEST_F(StoreTest, ColSplitTest3) {
+ colsplittest(store.get(), 100000, 25);
+}
+#endif
+
int main(int argc, char **argv) {
vector<const char*> args;
argv_to_vec(argc, (const char **)argv, args);
@@ -752,6 +830,9 @@ int main(int argc, char **argv) {
global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
common_init_finish(g_ceph_context);
g_ceph_context->_conf->set_val("osd_journal_size", "400");
+ g_ceph_context->_conf->set_val("filestore_index_retry_probability", "1");
+ g_ceph_context->_conf->set_val("filestore_op_thread_timeout", "1000");
+ g_ceph_context->_conf->set_val("filestore_op_thread_suicide_timeout", "10000");
g_ceph_context->_conf->apply_changes(NULL);
::testing::InitGoogleTest(&argc, argv);