diff options
Diffstat (limited to 'src/mongo/s/distlock_test.cpp')
-rw-r--r-- | src/mongo/s/distlock_test.cpp | 639 |
1 files changed, 318 insertions, 321 deletions
diff --git a/src/mongo/s/distlock_test.cpp b/src/mongo/s/distlock_test.cpp index 9b1dd3f8ab3..d0fea0b3448 100644 --- a/src/mongo/s/distlock_test.cpp +++ b/src/mongo/s/distlock_test.cpp @@ -73,375 +73,372 @@ // TODO: Make a method in BSONObj if useful, don't modify for now -#define string_field(obj, name, def) ( obj.hasField(name) ? obj[name].String() : def ) -#define number_field(obj, name, def) ( obj.hasField(name) ? obj[name].Number() : def ) +#define string_field(obj, name, def) (obj.hasField(name) ? obj[name].String() : def) +#define number_field(obj, name, def) (obj.hasField(name) ? obj[name].Number() : def) namespace mongo { - using std::shared_ptr; - using std::endl; - using std::string; - using std::stringstream; - using std::vector; - - /** - * Stress test distributed lock by running multiple threads to contend with a single lock. - * Also has an option to make some thread terminate while holding the lock and have some - * other thread take over it after takeoverMS has elapsed. Note that this test does not check - * whether the lock was eventually overtaken and this is only valid if the LockPinger frequency - * is faster than takeoverMS. - * - * { - * _testDistLockWithSkew: 1, - * - * lockName: <string for distributed lock>, - * host: <connection string for config server>, - * seed: <numeric seed for random generator>, - * numThreads: <num of threads to spawn and grab the lock>, - * - * takeoverMS: <duration of missed ping in milliSeconds before a lock can be overtaken>, - * wait: <time in milliseconds before stopping the test threads>, - * skewHosts: <Array<Numeric>, numbers to be used when calling _skewClockCommand - * against each config server>, - * threadWait: <upper bound wait in milliSeconds while holding a lock>, - * - * hangThreads: <integer n, where 1 out of n threads will abort after acquiring lock>, - * threadSleep: <upper bound sleep duration in mSecs between each round of lock operation>, - * skewRange: <maximum skew variance in milliSeconds for a thread's clock, delta will never - * be greater than skewRange/2> - * } - */ - class TestDistLockWithSkew: public Command { - public: - - static const int logLvl = 1; +using std::shared_ptr; +using std::endl; +using std::string; +using std::stringstream; +using std::vector; + +/** + * Stress test distributed lock by running multiple threads to contend with a single lock. + * Also has an option to make some thread terminate while holding the lock and have some + * other thread take over it after takeoverMS has elapsed. Note that this test does not check + * whether the lock was eventually overtaken and this is only valid if the LockPinger frequency + * is faster than takeoverMS. + * + * { + * _testDistLockWithSkew: 1, + * + * lockName: <string for distributed lock>, + * host: <connection string for config server>, + * seed: <numeric seed for random generator>, + * numThreads: <num of threads to spawn and grab the lock>, + * + * takeoverMS: <duration of missed ping in milliSeconds before a lock can be overtaken>, + * wait: <time in milliseconds before stopping the test threads>, + * skewHosts: <Array<Numeric>, numbers to be used when calling _skewClockCommand + * against each config server>, + * threadWait: <upper bound wait in milliSeconds while holding a lock>, + * + * hangThreads: <integer n, where 1 out of n threads will abort after acquiring lock>, + * threadSleep: <upper bound sleep duration in mSecs between each round of lock operation>, + * skewRange: <maximum skew variance in milliSeconds for a thread's clock, delta will never + * be greater than skewRange/2> + * } + */ +class TestDistLockWithSkew : public Command { +public: + static const int logLvl = 1; - TestDistLockWithSkew() : - Command("_testDistLockWithSkew") { - } - virtual void help(stringstream& help) const { - help << "should not be calling this directly" << endl; - } + TestDistLockWithSkew() : Command("_testDistLockWithSkew") {} + virtual void help(stringstream& help) const { + help << "should not be calling this directly" << endl; + } - virtual bool slaveOk() const { - return false; - } - virtual bool adminOnly() const { - return true; + virtual bool slaveOk() const { + return false; + } + virtual bool adminOnly() const { + return true; + } + virtual bool isWriteCommandForConfigServer() const { + return false; + } + // No auth needed because it only works when enabled via command line. + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) {} + + void runThread(ConnectionString& hostConn, + unsigned threadId, + unsigned seed, + BSONObj& cmdObj, + BSONObjBuilder& result) { + stringstream ss; + ss << "thread-" << threadId; + setThreadName(ss.str().c_str()); + + // Lock name + string lockName = string_field(cmdObj, "lockName", this->name + "_lock"); + + // Range of clock skew in diff threads + int skewRange = (int)number_field(cmdObj, "skewRange", 1); + + // How long to wait with the lock + int threadWait = (int)number_field(cmdObj, "threadWait", 30); + if (threadWait <= 0) + threadWait = 1; + + // Max amount of time (ms) a thread waits before checking the lock again + int threadSleep = (int)number_field(cmdObj, "threadSleep", 30); + if (threadSleep <= 0) + threadSleep = 1; + + // How long until the lock is forced in ms, only compared locally + unsigned long long takeoverMS = (unsigned long long)number_field(cmdObj, "takeoverMS", 0); + + // Whether or not we should hang some threads + int hangThreads = (int)number_field(cmdObj, "hangThreads", 0); + + + boost::mt19937 gen((boost::mt19937::result_type)seed); + + boost::variate_generator<boost::mt19937&, boost::uniform_int<>> randomSkew( + gen, boost::uniform_int<>(0, skewRange)); + boost::variate_generator<boost::mt19937&, boost::uniform_int<>> randomWait( + gen, boost::uniform_int<>(1, threadWait)); + boost::variate_generator<boost::mt19937&, boost::uniform_int<>> randomSleep( + gen, boost::uniform_int<>(1, threadSleep)); + boost::variate_generator<boost::mt19937&, boost::uniform_int<>> randomNewLock( + gen, boost::uniform_int<>(0, 3)); + + + int skew = 0; + if (!lock.get()) { + // Pick a skew, but the first two threads skew the whole range + if (threadId == 0) + skew = -skewRange / 2; + else if (threadId == 1) + skew = skewRange / 2; + else + skew = randomSkew() - (skewRange / 2); + + // Skew this thread + jsTimeVirtualThreadSkew(skew); + + log() << "Initializing lock with skew of " << skew << " for thread " << threadId + << endl; + + lock.reset(new DistributedLock(hostConn, lockName, takeoverMS, true)); + + log() << "Skewed time " << jsTime() << " for thread " << threadId << endl + << " max wait (with lock: " << threadWait << ", after lock: " << threadSleep + << ")" << endl + << " takeover in " << takeoverMS << "(ms remote)" << endl; } - virtual bool isWriteCommandForConfigServer() const { return false; } - // No auth needed because it only works when enabled via command line. - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) {} - - void runThread(ConnectionString& hostConn, unsigned threadId, unsigned seed, - BSONObj& cmdObj, BSONObjBuilder& result) { - - stringstream ss; - ss << "thread-" << threadId; - setThreadName(ss.str().c_str()); - - // Lock name - string lockName = string_field(cmdObj, "lockName", this->name + "_lock"); - - // Range of clock skew in diff threads - int skewRange = (int) number_field(cmdObj, "skewRange", 1); - - // How long to wait with the lock - int threadWait = (int) number_field(cmdObj, "threadWait", 30); - if(threadWait <= 0) threadWait = 1; - - // Max amount of time (ms) a thread waits before checking the lock again - int threadSleep = (int) number_field(cmdObj, "threadSleep", 30); - if(threadSleep <= 0) threadSleep = 1; - - // How long until the lock is forced in ms, only compared locally - unsigned long long takeoverMS = (unsigned long long) number_field(cmdObj, "takeoverMS", 0); - // Whether or not we should hang some threads - int hangThreads = (int) number_field(cmdObj, "hangThreads", 0); + DistributedLock* myLock = lock.get(); + bool errors = false; + BSONObj lockObj; + while (keepGoing.loadRelaxed()) { + Status pingStatus = _pinger.startPing( + *myLock, stdx::chrono::milliseconds(takeoverMS / LOCK_SKEW_FACTOR)); - boost::mt19937 gen((boost::mt19937::result_type) seed); - - boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSkew(gen, boost::uniform_int<>(0, skewRange)); - boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomWait(gen, boost::uniform_int<>(1, threadWait)); - boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSleep(gen, boost::uniform_int<>(1, threadSleep)); - boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomNewLock(gen, boost::uniform_int<>(0, 3)); - - - int skew = 0; - if (!lock.get()) { - - // Pick a skew, but the first two threads skew the whole range - if(threadId == 0) - skew = -skewRange / 2; - else if(threadId == 1) - skew = skewRange / 2; - else skew = randomSkew() - (skewRange / 2); - - // Skew this thread - jsTimeVirtualThreadSkew( skew ); - - log() << "Initializing lock with skew of " << skew << " for thread " << threadId << endl; - - lock.reset(new DistributedLock(hostConn, lockName, takeoverMS, true )); - - log() << "Skewed time " << jsTime() << " for thread " << threadId << endl - << " max wait (with lock: " << threadWait << ", after lock: " << threadSleep << ")" << endl - << " takeover in " << takeoverMS << "(ms remote)" << endl; - + if (!pingStatus.isOK()) { + log() << "**** Not good for pinging: " << pingStatus; + break; } - DistributedLock* myLock = lock.get(); - - bool errors = false; - BSONObj lockObj; - while (keepGoing.loadRelaxed()) { - Status pingStatus = _pinger.startPing(*myLock, - stdx::chrono::milliseconds(takeoverMS / LOCK_SKEW_FACTOR)); + try { + if (myLock->lock_try("Testing distributed lock with skew.", &lockObj)) { + log() << "**** Locked for thread " << threadId << " with ts " << lockObj["ts"] + << endl; + + if (count.loadRelaxed() % 3 == 1 && + myLock->lock_try("Testing lock non-re-entry.")) { + errors = true; + log() << "**** !Invalid lock re-entry" << endl; + break; + } - if (!pingStatus.isOK()) { - log() << "**** Not good for pinging: " << pingStatus; - break; - } + int before = count.addAndFetch(1); + int sleep = randomWait(); + sleepmillis(sleep); + int after = count.loadRelaxed(); - try { - - if (myLock->lock_try("Testing distributed lock with skew.", &lockObj)) { - - log() << "**** Locked for thread " << threadId << " with ts " << lockObj["ts"] << endl; - - if (count.loadRelaxed() % 3 == 1 && - myLock->lock_try( "Testing lock non-re-entry.")) { - errors = true; - log() << "**** !Invalid lock re-entry" << endl; - break; - } - - int before = count.addAndFetch(1); - int sleep = randomWait(); - sleepmillis(sleep); - int after = count.loadRelaxed(); - - if(after != before) { - errors = true; - log() << "**** !Bad increment while sleeping with lock for: " << sleep << "ms" << endl; - break; - } - - // Unlock only half the time... - if(hangThreads == 0 || threadId % hangThreads != 0) { - log() << "**** Unlocking for thread " << threadId << " with ts " << lockObj["ts"] << endl; - myLock->unlock(lockObj["ts"].OID()); - } - else { - log() << "**** Not unlocking for thread " << threadId << endl; - _pinger.stopPing(myLock->getRemoteConnection(), myLock->getProcessId()); - // We're simulating a crashed process... - break; - } + if (after != before) { + errors = true; + log() << "**** !Bad increment while sleeping with lock for: " << sleep + << "ms" << endl; + break; } - } - catch( const DBException& ex ) { - log() << "*** !Could not try distributed lock." << causedBy( ex ) << endl; - break; - } - - // Create a new lock 1/3 of the time - if( randomNewLock() > 1 ){ - lock.reset(new DistributedLock( hostConn, lockName, takeoverMS, true )); - myLock = lock.get(); + // Unlock only half the time... + if (hangThreads == 0 || threadId % hangThreads != 0) { + log() << "**** Unlocking for thread " << threadId << " with ts " + << lockObj["ts"] << endl; + myLock->unlock(lockObj["ts"].OID()); + } else { + log() << "**** Not unlocking for thread " << threadId << endl; + _pinger.stopPing(myLock->getRemoteConnection(), myLock->getProcessId()); + // We're simulating a crashed process... + break; + } } - sleepmillis(randomSleep()); + } catch (const DBException& ex) { + log() << "*** !Could not try distributed lock." << causedBy(ex) << endl; + break; } - result << "errors" << errors - << "skew" << skew - << "takeover" << (long long) takeoverMS - << "localTimeout" << (takeoverMS > 0); - - } + // Create a new lock 1/3 of the time + if (randomNewLock() > 1) { + lock.reset(new DistributedLock(hostConn, lockName, takeoverMS, true)); + myLock = lock.get(); + } - void test(ConnectionString& hostConn, string& lockName, unsigned seed) { - return; + sleepmillis(randomSleep()); } - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { - - Timer t; - - ConnectionString hostConn(cmdObj["host"].String(), - ConnectionString::SYNC); - - unsigned seed = (unsigned) number_field(cmdObj, "seed", 0); - int numThreads = (int) number_field(cmdObj, "numThreads", 4); - int wait = (int) number_field(cmdObj, "wait", 10000); - - log() << "Starting " << this->name << " with -" << endl - << " seed: " << seed << endl - << " numThreads: " << numThreads << endl - << " total wait: " << wait << endl << endl; - - // Skew host clocks if needed - try { - skewClocks( hostConn, cmdObj ); - } - catch( DBException e ) { - errmsg = str::stream() << "Clocks could not be skewed." << causedBy( e ); - return false; - } - - count.store(0); - keepGoing.store(true); - - vector<shared_ptr<stdx::thread> > threads; - vector<shared_ptr<BSONObjBuilder> > results; - for (int i = 0; i < numThreads; i++) { - results.push_back(shared_ptr<BSONObjBuilder> (new BSONObjBuilder())); - threads.push_back(shared_ptr<stdx::thread> (new stdx::thread( - stdx::bind(&TestDistLockWithSkew::runThread, this, - hostConn, (unsigned) i, seed + i, boost::ref(cmdObj), - boost::ref(*(results[i].get())))))); - } + result << "errors" << errors << "skew" << skew << "takeover" << (long long)takeoverMS + << "localTimeout" << (takeoverMS > 0); + } - sleepsecs(wait / 1000); - keepGoing.store(false); + void test(ConnectionString& hostConn, string& lockName, unsigned seed) { + return; + } - bool errors = false; - for (unsigned i = 0; i < threads.size(); i++) { - threads[i]->join(); - errors = errors || results[i].get()->obj()["errors"].Bool(); - } + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + Timer t; + + ConnectionString hostConn(cmdObj["host"].String(), ConnectionString::SYNC); + + unsigned seed = (unsigned)number_field(cmdObj, "seed", 0); + int numThreads = (int)number_field(cmdObj, "numThreads", 4); + int wait = (int)number_field(cmdObj, "wait", 10000); + + log() << "Starting " << this->name << " with -" << endl + << " seed: " << seed << endl + << " numThreads: " << numThreads << endl + << " total wait: " << wait << endl + << endl; + + // Skew host clocks if needed + try { + skewClocks(hostConn, cmdObj); + } catch (DBException e) { + errmsg = str::stream() << "Clocks could not be skewed." << causedBy(e); + return false; + } - result.append("count", count.loadRelaxed()); - result.append("errors", errors); - result.append("timeMS", t.millis()); + count.store(0); + keepGoing.store(true); + + vector<shared_ptr<stdx::thread>> threads; + vector<shared_ptr<BSONObjBuilder>> results; + for (int i = 0; i < numThreads; i++) { + results.push_back(shared_ptr<BSONObjBuilder>(new BSONObjBuilder())); + threads.push_back(shared_ptr<stdx::thread>( + new stdx::thread(stdx::bind(&TestDistLockWithSkew::runThread, + this, + hostConn, + (unsigned)i, + seed + i, + boost::ref(cmdObj), + boost::ref(*(results[i].get())))))); + } - return !errors; + sleepsecs(wait / 1000); + keepGoing.store(false); + bool errors = false; + for (unsigned i = 0; i < threads.size(); i++) { + threads[i]->join(); + errors = errors || results[i].get()->obj()["errors"].Bool(); } - /** - * Skews the clocks of a remote cluster by a particular amount, specified by - * the "skewHosts" element in a BSONObj. - */ - static void skewClocks( ConnectionString& cluster, BSONObj& cmdObj ) { - - vector<long long> skew; - if(cmdObj.hasField("skewHosts")) { - bsonArrToNumVector<long long>(cmdObj["skewHosts"], skew); - } - else { - LOG( logLvl ) << "No host clocks to skew." << endl; - return; - } + result.append("count", count.loadRelaxed()); + result.append("errors", errors); + result.append("timeMS", t.millis()); - LOG( logLvl ) << "Skewing clocks of hosts " << cluster << endl; + return !errors; + } - unsigned s = 0; - for(vector<long long>::iterator i = skew.begin(); i != skew.end(); ++i,s++) { + /** + * Skews the clocks of a remote cluster by a particular amount, specified by + * the "skewHosts" element in a BSONObj. + */ + static void skewClocks(ConnectionString& cluster, BSONObj& cmdObj) { + vector<long long> skew; + if (cmdObj.hasField("skewHosts")) { + bsonArrToNumVector<long long>(cmdObj["skewHosts"], skew); + } else { + LOG(logLvl) << "No host clocks to skew." << endl; + return; + } - ConnectionString server( cluster.getServers()[s] ); - ScopedDbConnection conn(server.toString()); + LOG(logLvl) << "Skewing clocks of hosts " << cluster << endl; - BSONObj result; - try { - bool success = conn->runCommand( string("admin"), - BSON( "_skewClockCommand" << 1 - << "skew" << *i ), - result ); + unsigned s = 0; + for (vector<long long>::iterator i = skew.begin(); i != skew.end(); ++i, s++) { + ConnectionString server(cluster.getServers()[s]); + ScopedDbConnection conn(server.toString()); - uassert(13678, str::stream() << "Could not communicate with server " << server.toString() << " in cluster " << cluster.toString() << " to change skew by " << *i, success ); + BSONObj result; + try { + bool success = conn->runCommand( + string("admin"), BSON("_skewClockCommand" << 1 << "skew" << *i), result); - LOG( logLvl + 1 ) << " Skewed host " << server << " clock by " << *i << endl; - } - catch(...) { - conn.done(); - throw; - } + uassert(13678, + str::stream() << "Could not communicate with server " << server.toString() + << " in cluster " << cluster.toString() + << " to change skew by " << *i, + success); + LOG(logLvl + 1) << " Skewed host " << server << " clock by " << *i << endl; + } catch (...) { conn.done(); - + throw; } + conn.done(); } - - // variables for test - boost::thread_specific_ptr<DistributedLock> lock; - AtomicUInt32 count; - AtomicWord<bool> keepGoing; - - private: - LegacyDistLockPinger _pinger; - }; - MONGO_INITIALIZER(RegisterDistLockWithSkewCmd)(InitializerContext* context) { - if (Command::testCommandsEnabled) { - // Leaked intentionally: a Command registers itself when constructed. - new TestDistLockWithSkew(); - } - return Status::OK(); } - /** - * Utility command to virtually skew the clock of a mongo server a particular amount. - * This skews the clock globally, per-thread skew is also possible. - */ - class SkewClockCommand: public Command { - public: - SkewClockCommand() : - Command("_skewClockCommand") { - } - virtual void help(stringstream& help) const { - help << "should not be calling this directly" << endl; - } - - virtual bool slaveOk() const { - return false; - } - virtual bool adminOnly() const { - return true; - } - virtual bool isWriteCommandForConfigServer() const { return false; } - // No auth needed because it only works when enabled via command line. - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) {} - - bool run(OperationContext* txn, - const string&, - BSONObj& cmdObj, - int, - string& errmsg, - BSONObjBuilder& result) { + // variables for test + boost::thread_specific_ptr<DistributedLock> lock; + AtomicUInt32 count; + AtomicWord<bool> keepGoing; + +private: + LegacyDistLockPinger _pinger; +}; +MONGO_INITIALIZER(RegisterDistLockWithSkewCmd)(InitializerContext* context) { + if (Command::testCommandsEnabled) { + // Leaked intentionally: a Command registers itself when constructed. + new TestDistLockWithSkew(); + } + return Status::OK(); +} - long long skew = (long long) number_field(cmdObj, "skew", 0); +/** + * Utility command to virtually skew the clock of a mongo server a particular amount. + * This skews the clock globally, per-thread skew is also possible. + */ +class SkewClockCommand : public Command { +public: + SkewClockCommand() : Command("_skewClockCommand") {} + virtual void help(stringstream& help) const { + help << "should not be calling this directly" << endl; + } - log() << "Adjusting jsTime() clock skew to " << skew << endl; + virtual bool slaveOk() const { + return false; + } + virtual bool adminOnly() const { + return true; + } + virtual bool isWriteCommandForConfigServer() const { + return false; + } + // No auth needed because it only works when enabled via command line. + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) {} - jsTimeVirtualSkew( skew ); + bool run(OperationContext* txn, + const string&, + BSONObj& cmdObj, + int, + string& errmsg, + BSONObjBuilder& result) { + long long skew = (long long)number_field(cmdObj, "skew", 0); - log() << "JSTime adjusted, now is " << jsTime() << endl; + log() << "Adjusting jsTime() clock skew to " << skew << endl; - return true; + jsTimeVirtualSkew(skew); - } + log() << "JSTime adjusted, now is " << jsTime() << endl; - }; - MONGO_INITIALIZER(RegisterSkewClockCmd)(InitializerContext* context) { - if (Command::testCommandsEnabled) { - // Leaked intentionally: a Command registers itself when constructed. - new SkewClockCommand(); - } - return Status::OK(); + return true; } +}; +MONGO_INITIALIZER(RegisterSkewClockCmd)(InitializerContext* context) { + if (Command::testCommandsEnabled) { + // Leaked intentionally: a Command registers itself when constructed. + new SkewClockCommand(); + } + return Status::OK(); +} } - |