diff options
Diffstat (limited to 'src/mongo/db/repl/oplog.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 1329 |
1 files changed, 631 insertions, 698 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 26708ee8de7..2afa1b53c52 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -87,742 +87,679 @@ namespace mongo { - using std::endl; - using std::string; - using std::stringstream; +using std::endl; +using std::string; +using std::stringstream; namespace repl { - std::string rsOplogName = "local.oplog.rs"; - std::string masterSlaveOplogName = "local.oplog.$main"; - int OPLOG_VERSION = 2; +std::string rsOplogName = "local.oplog.rs"; +std::string masterSlaveOplogName = "local.oplog.$main"; +int OPLOG_VERSION = 2; namespace { - // cached copies of these...so don't rename them, drop them, etc.!!! - Database* _localDB = nullptr; - Collection* _localOplogCollection = nullptr; - - // Synchronizes the section where a new Timestamp is generated and when it actually - // appears in the oplog. - stdx::mutex newOpMutex; - stdx::condition_variable newTimestampNotifier; - - static std::string _oplogCollectionName; - - // so we can fail the same way - void checkOplogInsert( StatusWith<RecordId> result ) { - massert( 17322, - str::stream() << "write to oplog failed: " << result.getStatus().toString(), - result.isOK() ); - } +// cached copies of these...so don't rename them, drop them, etc.!!! +Database* _localDB = nullptr; +Collection* _localOplogCollection = nullptr; - /** - * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to - * reflect that new optime. Returns the new optime and the correct value of the "h" field for - * the new oplog entry. - * - * NOTE: From the time this function returns to the time that the new oplog entry is written - * to the storage system, all errors must be considered fatal. This is because the this - * function registers the new optime with the storage system and the replication coordinator, - * and provides no facility to revert those registrations on rollback. - */ - std::pair<OpTime, long long> getNextOpTime(OperationContext* txn, - Collection* oplog, - const char* ns, - ReplicationCoordinator* replCoord, - const char* opstr) { - stdx::lock_guard<stdx::mutex> lk(newOpMutex); - Timestamp ts = getNextGlobalTimestamp(); - newTimestampNotifier.notify_all(); - - fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts)); - - long long hashNew = 0; - long long term = 0; - - // Set hash and term if we're in replset mode, otherwise they remain 0 in master/slave. - if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet) { - // Current term. If we're not a replset of pv=1, it could be the default value (0) or - // the last valid term before downgrade. - term = ReplClientInfo::forClient(txn->getClient()).getTerm(); - - hashNew = BackgroundSync::get()->getLastAppliedHash(); - - // Check to make sure logOp() is legal at this point. - if (*opstr == 'n') { - // 'n' operations are always logged - invariant(*ns == '\0'); - // 'n' operations do not advance the hash, since they are not rolled back - } - else { - // Advance the hash - hashNew = (hashNew * 131 + ts.asLL()) * 17 + replCoord->getMyId(); +// Synchronizes the section where a new Timestamp is generated and when it actually +// appears in the oplog. +stdx::mutex newOpMutex; +stdx::condition_variable newTimestampNotifier; - BackgroundSync::get()->setLastAppliedHash(hashNew); - } - } +static std::string _oplogCollectionName; - OpTime opTime(ts, term); - replCoord->setMyLastOptime(opTime); - return std::pair<OpTime,long long>(opTime, hashNew); - } +// so we can fail the same way +void checkOplogInsert(StatusWith<RecordId> result) { + massert(17322, + str::stream() << "write to oplog failed: " << result.getStatus().toString(), + result.isOK()); +} - /** - * This allows us to stream the oplog entry directly into data region - * main goal is to avoid copying the o portion - * which can be very large - * TODO: can have this build the entire doc - */ - class OplogDocWriter : public DocWriter { - public: - OplogDocWriter( const BSONObj& frame, const BSONObj& oField ) - : _frame( frame ), _oField( oField ) { +/** + * Allocates an optime for a new entry in the oplog, and updates the replication coordinator to + * reflect that new optime. Returns the new optime and the correct value of the "h" field for + * the new oplog entry. + * + * NOTE: From the time this function returns to the time that the new oplog entry is written + * to the storage system, all errors must be considered fatal. This is because the this + * function registers the new optime with the storage system and the replication coordinator, + * and provides no facility to revert those registrations on rollback. + */ +std::pair<OpTime, long long> getNextOpTime(OperationContext* txn, + Collection* oplog, + const char* ns, + ReplicationCoordinator* replCoord, + const char* opstr) { + stdx::lock_guard<stdx::mutex> lk(newOpMutex); + Timestamp ts = getNextGlobalTimestamp(); + newTimestampNotifier.notify_all(); + + fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts)); + + long long hashNew = 0; + long long term = 0; + + // Set hash and term if we're in replset mode, otherwise they remain 0 in master/slave. + if (replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet) { + // Current term. If we're not a replset of pv=1, it could be the default value (0) or + // the last valid term before downgrade. + term = ReplClientInfo::forClient(txn->getClient()).getTerm(); + + hashNew = BackgroundSync::get()->getLastAppliedHash(); + + // Check to make sure logOp() is legal at this point. + if (*opstr == 'n') { + // 'n' operations are always logged + invariant(*ns == '\0'); + // 'n' operations do not advance the hash, since they are not rolled back + } else { + // Advance the hash + hashNew = (hashNew * 131 + ts.asLL()) * 17 + replCoord->getMyId(); + + BackgroundSync::get()->setLastAppliedHash(hashNew); } + } - ~OplogDocWriter(){} - - void writeDocument( char* start ) const { - char* buf = start; - - memcpy( buf, _frame.objdata(), _frame.objsize() - 1 ); // don't copy final EOO + OpTime opTime(ts, term); + replCoord->setMyLastOptime(opTime); + return std::pair<OpTime, long long>(opTime, hashNew); +} - reinterpret_cast<int*>( buf )[0] = documentSize(); +/** + * This allows us to stream the oplog entry directly into data region + * main goal is to avoid copying the o portion + * which can be very large + * TODO: can have this build the entire doc + */ +class OplogDocWriter : public DocWriter { +public: + OplogDocWriter(const BSONObj& frame, const BSONObj& oField) : _frame(frame), _oField(oField) {} - buf += ( _frame.objsize() - 1 ); - buf[0] = (char)Object; - buf[1] = 'o'; - buf[2] = 0; - memcpy( buf+3, _oField.objdata(), _oField.objsize() ); - buf += 3 + _oField.objsize(); - buf[0] = EOO; + ~OplogDocWriter() {} - verify( static_cast<size_t>( ( buf + 1 ) - start ) == documentSize() ); // DEV? - } + void writeDocument(char* start) const { + char* buf = start; - size_t documentSize() const { - return _frame.objsize() + _oField.objsize() + 1 /* type */ + 2 /* "o" */; - } + memcpy(buf, _frame.objdata(), _frame.objsize() - 1); // don't copy final EOO - private: - BSONObj _frame; - BSONObj _oField; - }; + reinterpret_cast<int*>(buf)[0] = documentSize(); -} // namespace + buf += (_frame.objsize() - 1); + buf[0] = (char)Object; + buf[1] = 'o'; + buf[2] = 0; + memcpy(buf + 3, _oField.objdata(), _oField.objsize()); + buf += 3 + _oField.objsize(); + buf[0] = EOO; - void setOplogCollectionName() { - if (getGlobalReplicationCoordinator()->getReplicationMode() == - ReplicationCoordinator::modeReplSet) { - _oplogCollectionName = rsOplogName; - } - else { - _oplogCollectionName = masterSlaveOplogName; - } + verify(static_cast<size_t>((buf + 1) - start) == documentSize()); // DEV? } - /* we write to local.oplog.rs: - { ts : ..., h: ..., v: ..., op: ..., etc } - ts: an OpTime timestamp - h: hash - v: version - op: - "i" insert - "u" update - "d" delete - "c" db cmd - "db" declares presence of a database (ns is set to the db name + '.') - "n" no op - - bb param: - if not null, specifies a boolean to pass along to the other side as b: param. - used for "justOne" or "upsert" flags on 'd', 'u' - - */ + size_t documentSize() const { + return _frame.objsize() + _oField.objsize() + 1 /* type */ + 2 /* "o" */; + } - void _logOp(OperationContext* txn, - const char *opstr, - const char *ns, - const BSONObj& obj, - BSONObj *o2, - bool fromMigrate) { - NamespaceString nss(ns); - if (nss.db() == "local") { - return; - } +private: + BSONObj _frame; + BSONObj _oField; +}; - if (nss.isSystemDotProfile()) { - return; - } +} // namespace - if (!getGlobalReplicationCoordinator()->isReplEnabled()) { - return; - } +void setOplogCollectionName() { + if (getGlobalReplicationCoordinator()->getReplicationMode() == + ReplicationCoordinator::modeReplSet) { + _oplogCollectionName = rsOplogName; + } else { + _oplogCollectionName = masterSlaveOplogName; + } +} + +/* we write to local.oplog.rs: + { ts : ..., h: ..., v: ..., op: ..., etc } + ts: an OpTime timestamp + h: hash + v: version + op: + "i" insert + "u" update + "d" delete + "c" db cmd + "db" declares presence of a database (ns is set to the db name + '.') + "n" no op + + bb param: + if not null, specifies a boolean to pass along to the other side as b: param. + used for "justOne" or "upsert" flags on 'd', 'u' - if (!txn->writesAreReplicated()) { - return; - } +*/ - fassert(28626, txn->recoveryUnit()); +void _logOp(OperationContext* txn, + const char* opstr, + const char* ns, + const BSONObj& obj, + BSONObj* o2, + bool fromMigrate) { + NamespaceString nss(ns); + if (nss.db() == "local") { + return; + } - Lock::DBLock lk(txn->lockState(), "local", MODE_IX); + if (nss.isSystemDotProfile()) { + return; + } - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + if (!getGlobalReplicationCoordinator()->isReplEnabled()) { + return; + } - if (ns[0] && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && - !replCoord->canAcceptWritesFor(nss)) { - severe() << "logOp() but can't accept write to collection " << ns; - fassertFailed(17405); - } - Lock::CollectionLock lk2(txn->lockState(), _oplogCollectionName, MODE_IX); + if (!txn->writesAreReplicated()) { + return; + } + fassert(28626, txn->recoveryUnit()); - if (_localOplogCollection == nullptr) { - OldClientContext ctx(txn, _oplogCollectionName); - _localDB = ctx.db(); - invariant(_localDB); - _localOplogCollection = _localDB->getCollection(_oplogCollectionName); - massert(13347, - "the oplog collection " + _oplogCollectionName + - " missing. did you drop it? if so, restart the server", - _localOplogCollection); - } + Lock::DBLock lk(txn->lockState(), "local", MODE_IX); - std::pair<OpTime, long long> slot = getNextOpTime(txn, - _localOplogCollection, - ns, - replCoord, - opstr); - - /* we jump through a bunch of hoops here to avoid copying the obj buffer twice -- - instead we do a single copy to the destination position in the memory mapped file. - */ - - BSONObjBuilder b(256); - b.append("ts", slot.first.getTimestamp()); - b.append("t", slot.first.getTerm()); - b.append("h", slot.second); - b.append("v", OPLOG_VERSION); - b.append("op", opstr); - b.append("ns", ns); - if (fromMigrate) { - b.appendBool("fromMigrate", true); - } + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - if ( o2 ) { - b.append("o2", *o2); - } - BSONObj partial = b.done(); + if (ns[0] && replCoord->getReplicationMode() == ReplicationCoordinator::modeReplSet && + !replCoord->canAcceptWritesFor(nss)) { + severe() << "logOp() but can't accept write to collection " << ns; + fassertFailed(17405); + } + Lock::CollectionLock lk2(txn->lockState(), _oplogCollectionName, MODE_IX); - OplogDocWriter writer( partial, obj ); - checkOplogInsert( _localOplogCollection->insertDocument( txn, &writer, false ) ); - ReplClientInfo::forClient(txn->getClient()).setLastOp( slot.first ); + if (_localOplogCollection == nullptr) { + OldClientContext ctx(txn, _oplogCollectionName); + _localDB = ctx.db(); + invariant(_localDB); + _localOplogCollection = _localDB->getCollection(_oplogCollectionName); + massert(13347, + "the oplog collection " + _oplogCollectionName + + " missing. did you drop it? if so, restart the server", + _localOplogCollection); } - OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - - OpTime lastOptime; - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - lastOptime = replCoord->getMyLastOptime(); - invariant(!ops.empty()); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); - - if ( _localOplogCollection == 0 ) { - OldClientContext ctx(txn, rsOplogName); - - _localDB = ctx.db(); - verify( _localDB ); - _localOplogCollection = _localDB->getCollection(rsOplogName); - massert(13389, - "local.oplog.rs missing. did you drop it? if so restart server", - _localOplogCollection); - } + std::pair<OpTime, long long> slot = + getNextOpTime(txn, _localOplogCollection, ns, replCoord, opstr); - OldClientContext ctx(txn, rsOplogName, _localDB); - WriteUnitOfWork wunit(txn); + /* we jump through a bunch of hoops here to avoid copying the obj buffer twice -- + instead we do a single copy to the destination position in the memory mapped file. + */ - for (std::deque<BSONObj>::const_iterator it = ops.begin(); - it != ops.end(); - ++it) { - const BSONObj& op = *it; - const OpTime optime = extractOpTime(op); + BSONObjBuilder b(256); + b.append("ts", slot.first.getTimestamp()); + b.append("t", slot.first.getTerm()); + b.append("h", slot.second); + b.append("v", OPLOG_VERSION); + b.append("op", opstr); + b.append("ns", ns); + if (fromMigrate) { + b.appendBool("fromMigrate", true); + } - checkOplogInsert(_localOplogCollection->insertDocument(txn, op, false)); + if (o2) { + b.append("o2", *o2); + } + BSONObj partial = b.done(); - if (!(lastOptime < optime)) { - severe() << "replication oplog stream went back in time. " - "previous timestamp: " << lastOptime << " newest timestamp: " << optime - << ". Op being applied: " << op; - fassertFailedNoTrace(18905); - } - lastOptime = optime; - } - wunit.commit(); - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns()); + OplogDocWriter writer(partial, obj); + checkOplogInsert(_localOplogCollection->insertDocument(txn, &writer, false)); - BackgroundSync* bgsync = BackgroundSync::get(); - // Keep this up-to-date, in case we step up to primary. - long long hash = ops.back()["h"].numberLong(); - bgsync->setLastAppliedHash(hash); + ReplClientInfo::forClient(txn->getClient()).setLastOp(slot.first); +} - return lastOptime; - } +OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - void createOplog(OperationContext* txn) { - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lk(txn->lockState()); + OpTime lastOptime; + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + lastOptime = replCoord->getMyLastOptime(); + invariant(!ops.empty()); + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), "local", MODE_X); - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); - bool rs = !replSettings.replSet.empty(); + if (_localOplogCollection == 0) { + OldClientContext ctx(txn, rsOplogName); - OldClientContext ctx(txn, _oplogCollectionName); - Collection* collection = ctx.db()->getCollection( _oplogCollectionName ); + _localDB = ctx.db(); + verify(_localDB); + _localOplogCollection = _localDB->getCollection(rsOplogName); + massert(13389, + "local.oplog.rs missing. did you drop it? if so restart server", + _localOplogCollection); + } - if ( collection ) { + OldClientContext ctx(txn, rsOplogName, _localDB); + WriteUnitOfWork wunit(txn); - if (replSettings.oplogSize != 0) { - const CollectionOptions oplogOpts = - collection->getCatalogEntry()->getCollectionOptions(txn); + for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { + const BSONObj& op = *it; + const OpTime optime = extractOpTime(op); - int o = (int)(oplogOpts.cappedSize / ( 1024 * 1024 ) ); - int n = (int)(replSettings.oplogSize / (1024 * 1024)); - if ( n != o ) { - stringstream ss; - ss << "cmdline oplogsize (" << n << ") different than existing (" << o << ") see: http://dochub.mongodb.org/core/increase-oplog"; - log() << ss.str() << endl; - throw UserException( 13257 , ss.str() ); - } - } + checkOplogInsert(_localOplogCollection->insertDocument(txn, op, false)); - if ( !rs ) - initTimestampFromOplog(txn, _oplogCollectionName); - return; + if (!(lastOptime < optime)) { + severe() << "replication oplog stream went back in time. " + "previous timestamp: " << lastOptime << " newest timestamp: " << optime + << ". Op being applied: " << op; + fassertFailedNoTrace(18905); + } + lastOptime = optime; } - - /* create an oplog collection, if it doesn't yet exist. */ - long long sz = 0; - if ( replSettings.oplogSize != 0 ) { - sz = replSettings.oplogSize; + wunit.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns()); + + BackgroundSync* bgsync = BackgroundSync::get(); + // Keep this up-to-date, in case we step up to primary. + long long hash = ops.back()["h"].numberLong(); + bgsync->setLastAppliedHash(hash); + + return lastOptime; +} + +void createOplog(OperationContext* txn) { + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); + + const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + bool rs = !replSettings.replSet.empty(); + + OldClientContext ctx(txn, _oplogCollectionName); + Collection* collection = ctx.db()->getCollection(_oplogCollectionName); + + if (collection) { + if (replSettings.oplogSize != 0) { + const CollectionOptions oplogOpts = + collection->getCatalogEntry()->getCollectionOptions(txn); + + int o = (int)(oplogOpts.cappedSize / (1024 * 1024)); + int n = (int)(replSettings.oplogSize / (1024 * 1024)); + if (n != o) { + stringstream ss; + ss << "cmdline oplogsize (" << n << ") different than existing (" << o + << ") see: http://dochub.mongodb.org/core/increase-oplog"; + log() << ss.str() << endl; + throw UserException(13257, ss.str()); + } } - else { - /* not specified. pick a default size */ - sz = 50LL * 1024LL * 1024LL; - if ( sizeof(int *) >= 8 ) { + + if (!rs) + initTimestampFromOplog(txn, _oplogCollectionName); + return; + } + + /* create an oplog collection, if it doesn't yet exist. */ + long long sz = 0; + if (replSettings.oplogSize != 0) { + sz = replSettings.oplogSize; + } else { + /* not specified. pick a default size */ + sz = 50LL * 1024LL * 1024LL; + if (sizeof(int*) >= 8) { #if defined(__APPLE__) - // typically these are desktops (dev machines), so keep it smallish - sz = (256-64) * 1024 * 1024; + // typically these are desktops (dev machines), so keep it smallish + sz = (256 - 64) * 1024 * 1024; #else - sz = 990LL * 1024 * 1024; - double free = - File::freeSpace(storageGlobalParams.dbpath); //-1 if call not supported. - long long fivePct = static_cast<long long>( free * 0.05 ); - if ( fivePct > sz ) - sz = fivePct; - // we use 5% of free space up to 50GB (1TB free) - static long long upperBound = 50LL * 1024 * 1024 * 1024; - if (fivePct > upperBound) - sz = upperBound; + sz = 990LL * 1024 * 1024; + double free = File::freeSpace(storageGlobalParams.dbpath); //-1 if call not supported. + long long fivePct = static_cast<long long>(free * 0.05); + if (fivePct > sz) + sz = fivePct; + // we use 5% of free space up to 50GB (1TB free) + static long long upperBound = 50LL * 1024 * 1024 * 1024; + if (fivePct > upperBound) + sz = upperBound; #endif - } } - - log() << "******" << endl; - log() << "creating replication oplog of size: " << (int)( sz / ( 1024 * 1024 ) ) << "MB..." << endl; - - CollectionOptions options; - options.capped = true; - options.cappedSize = sz; - options.autoIndexId = CollectionOptions::NO; - - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork uow( txn ); - invariant(ctx.db()->createCollection(txn, _oplogCollectionName, options)); - if( !rs ) - getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSONObj()); - uow.commit(); - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", _oplogCollectionName); - - /* sync here so we don't get any surprising lag later when we try to sync */ - StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); - storageEngine->flushAllFiles(true); - log() << "******" << endl; } - // ------------------------------------- + log() << "******" << endl; + log() << "creating replication oplog of size: " << (int)(sz / (1024 * 1024)) << "MB..." << endl; -namespace { - NamespaceString parseNs(const string& ns, const BSONObj& cmdObj) { - BSONElement first = cmdObj.firstElement(); - uassert(28635, - "no collection name specified", - first.canonicalType() == canonicalizeBSONType(mongo::String) - && first.valuestrsize() > 0); - std::string coll = first.valuestr(); - return NamespaceString(NamespaceString(ns).db().toString(), coll); + CollectionOptions options; + options.capped = true; + options.cappedSize = sz; + options.autoIndexId = CollectionOptions::NO; + + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork uow(txn); + invariant(ctx.db()->createCollection(txn, _oplogCollectionName, options)); + if (!rs) + getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSONObj()); + uow.commit(); } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", _oplogCollectionName); - using OpApplyFn = stdx::function<Status (OperationContext*, const char*, BSONObj&)>; + /* sync here so we don't get any surprising lag later when we try to sync */ + StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); + storageEngine->flushAllFiles(true); + log() << "******" << endl; +} - struct ApplyOpMetadata { - OpApplyFn applyFunc; - std::set<ErrorCodes::Error> acceptableErrors; +// ------------------------------------- - ApplyOpMetadata(OpApplyFn fun) { - applyFunc = fun; - } +namespace { +NamespaceString parseNs(const string& ns, const BSONObj& cmdObj) { + BSONElement first = cmdObj.firstElement(); + uassert(28635, + "no collection name specified", + first.canonicalType() == canonicalizeBSONType(mongo::String) && + first.valuestrsize() > 0); + std::string coll = first.valuestr(); + return NamespaceString(NamespaceString(ns).db().toString(), coll); +} + +using OpApplyFn = stdx::function<Status(OperationContext*, const char*, BSONObj&)>; + +struct ApplyOpMetadata { + OpApplyFn applyFunc; + std::set<ErrorCodes::Error> acceptableErrors; + + ApplyOpMetadata(OpApplyFn fun) { + applyFunc = fun; + } - ApplyOpMetadata(OpApplyFn fun, std::set<ErrorCodes::Error> theAcceptableErrors) { - applyFunc = fun; - acceptableErrors = theAcceptableErrors; + ApplyOpMetadata(OpApplyFn fun, std::set<ErrorCodes::Error> theAcceptableErrors) { + applyFunc = fun; + acceptableErrors = theAcceptableErrors; + } +}; + +std::map<std::string, ApplyOpMetadata> opsMap = { + {"create", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) + -> Status { return createCollection(txn, NamespaceString(ns).db().toString(), cmd); }, + {ErrorCodes::NamespaceExists}}}, + {"collMod", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + BSONObjBuilder resultWeDontCareAbout; + return collMod(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + }}}, + {"dropDatabase", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) + -> Status { return dropDatabase(txn, NamespaceString(ns).db().toString()); }, + {ErrorCodes::DatabaseNotFound}}}, + {"drop", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + BSONObjBuilder resultWeDontCareAbout; + return dropCollection(txn, parseNs(ns, cmd), resultWeDontCareAbout); + }, + // IllegalOperation is necessary because in 3.0 we replicate drops of system.profile + // TODO(dannenberg) remove IllegalOperation once we no longer need 3.0 compatibility + {ErrorCodes::NamespaceNotFound, ErrorCodes::IllegalOperation}}}, + // deleteIndex(es) is deprecated but still works as of April 10, 2015 + {"deleteIndex", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + BSONObjBuilder resultWeDontCareAbout; + return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + }, + {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, + {"deleteIndexes", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + BSONObjBuilder resultWeDontCareAbout; + return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + }, + {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, + {"dropIndex", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + BSONObjBuilder resultWeDontCareAbout; + return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + }, + {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, + {"dropIndexes", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + BSONObjBuilder resultWeDontCareAbout; + return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); + }, + {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound}}}, + {"renameCollection", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + return renameCollection(txn, + NamespaceString(cmd.firstElement().valuestrsafe()), + NamespaceString(cmd["to"].valuestrsafe()), + cmd["stayTemp"].trueValue(), + cmd["dropTarget"].trueValue()); + }, + {ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists}}}, + {"applyOps", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { + BSONObjBuilder resultWeDontCareAbout; + return applyOps(txn, nsToDatabase(ns), cmd, &resultWeDontCareAbout); + }, + {ErrorCodes::UnknownError}}}, + {"convertToCapped", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) + -> Status { return convertToCapped(txn, parseNs(ns, cmd), cmd["size"].number()); }}}, + {"emptycapped", + {[](OperationContext* txn, const char* ns, BSONObj& cmd) + -> Status { return emptyCapped(txn, parseNs(ns, cmd)); }}}, +}; + +} // namespace + +// @return failure status if an update should have happened and the document DNE. +// See replset initial sync code. +Status applyOperation_inlock(OperationContext* txn, + Database* db, + const BSONObj& op, + bool convertUpdateToUpsert) { + LOG(3) << "applying op: " << op << endl; + + OpCounters* opCounters = txn->writesAreReplicated() ? &globalOpCounters : &replOpCounters; + + const char* names[] = {"o", "ns", "op", "b", "o2"}; + BSONElement fields[5]; + op.getFields(5, names, fields); + BSONElement& fieldO = fields[0]; + BSONElement& fieldNs = fields[1]; + BSONElement& fieldOp = fields[2]; + BSONElement& fieldB = fields[3]; + BSONElement& fieldO2 = fields[4]; + + BSONObj o; + if (fieldO.isABSONObj()) + o = fieldO.embeddedObject(); + + const char* ns = fieldNs.valuestrsafe(); + + BSONObj o2; + if (fieldO2.isABSONObj()) + o2 = fieldO2.Obj(); + + bool valueB = fieldB.booleanSafe(); + + if (nsIsFull(ns)) { + if (supportsDocLocking()) { + // WiredTiger, and others requires MODE_IX since the applier threads driving + // this allow writes to the same collection on any thread. + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_IX)); + } else { + // mmapV1 ensures that all operations to the same collection are executed from + // the same worker thread, so it takes an exclusive lock (MODE_X) + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); } - }; - - std::map<std::string, ApplyOpMetadata> opsMap = { - {"create", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - return createCollection(txn, NamespaceString(ns).db().toString(), cmd); - }, - {ErrorCodes::NamespaceExists} - } - }, - {"collMod", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - BSONObjBuilder resultWeDontCareAbout; - return collMod(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); - } - } - }, - {"dropDatabase", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - return dropDatabase(txn, NamespaceString(ns).db().toString()); - }, - {ErrorCodes::DatabaseNotFound} - } - }, - {"drop", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - BSONObjBuilder resultWeDontCareAbout; - return dropCollection(txn, parseNs(ns, cmd), resultWeDontCareAbout); - }, - // IllegalOperation is necessary because in 3.0 we replicate drops of system.profile - // TODO(dannenberg) remove IllegalOperation once we no longer need 3.0 compatibility - {ErrorCodes::NamespaceNotFound, ErrorCodes::IllegalOperation} - } - }, - // deleteIndex(es) is deprecated but still works as of April 10, 2015 - {"deleteIndex", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); - }, - {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound} - } - }, - {"deleteIndexes", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); - }, - {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound} - } - }, - {"dropIndex", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); - }, - {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound} - } - }, - {"dropIndexes", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - BSONObjBuilder resultWeDontCareAbout; - return dropIndexes(txn, parseNs(ns, cmd), cmd, &resultWeDontCareAbout); - }, - {ErrorCodes::NamespaceNotFound, ErrorCodes::IndexNotFound} - } - }, - {"renameCollection", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - return renameCollection(txn, - NamespaceString(cmd.firstElement().valuestrsafe()), - NamespaceString(cmd["to"].valuestrsafe()), - cmd["stayTemp"].trueValue(), - cmd["dropTarget"].trueValue()); - }, - {ErrorCodes::NamespaceNotFound, ErrorCodes::NamespaceExists} - } - }, - {"applyOps", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - BSONObjBuilder resultWeDontCareAbout; - return applyOps(txn, nsToDatabase(ns), cmd, &resultWeDontCareAbout); - }, - {ErrorCodes::UnknownError} - } - }, - {"convertToCapped", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - return convertToCapped(txn, - parseNs(ns, cmd), - cmd["size"].number()); - } - } - }, - {"emptycapped", - { - [](OperationContext* txn, const char* ns, BSONObj& cmd) -> Status { - return emptyCapped(txn, parseNs(ns, cmd)); - } - } - }, - }; - -} // namespace - - // @return failure status if an update should have happened and the document DNE. - // See replset initial sync code. - Status applyOperation_inlock(OperationContext* txn, - Database* db, - const BSONObj& op, - bool convertUpdateToUpsert) { - LOG(3) << "applying op: " << op << endl; - - OpCounters * opCounters = txn->writesAreReplicated() ? &globalOpCounters : &replOpCounters; - - const char *names[] = { "o", "ns", "op", "b", "o2" }; - BSONElement fields[5]; - op.getFields(5, names, fields); - BSONElement& fieldO = fields[0]; - BSONElement& fieldNs = fields[1]; - BSONElement& fieldOp = fields[2]; - BSONElement& fieldB = fields[3]; - BSONElement& fieldO2 = fields[4]; - - BSONObj o; - if( fieldO.isABSONObj() ) - o = fieldO.embeddedObject(); - - const char *ns = fieldNs.valuestrsafe(); - - BSONObj o2; - if (fieldO2.isABSONObj()) - o2 = fieldO2.Obj(); - - bool valueB = fieldB.booleanSafe(); - - if (nsIsFull(ns)) { - if (supportsDocLocking()) { - // WiredTiger, and others requires MODE_IX since the applier threads driving - // this allow writes to the same collection on any thread. - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_IX)); + } + Collection* collection = db->getCollection(ns); + IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog(); + + // operation type -- see logOp() comments for types + const char* opType = fieldOp.valuestrsafe(); + invariant(*opType != 'c'); // commands are processed in applyCommand_inlock() + + if (*opType == 'i') { + opCounters->gotInsert(); + + const char* p = strchr(ns, '.'); + if (p && nsToCollectionSubstring(p) == "system.indexes") { + if (o["background"].trueValue()) { + IndexBuilder* builder = new IndexBuilder(o); + // This spawns a new thread and returns immediately. + builder->go(); + // Wait for thread to start and register itself + Lock::TempRelease release(txn->lockState()); + IndexBuilder::waitForBgIndexStarting(); } else { - // mmapV1 ensures that all operations to the same collection are executed from - // the same worker thread, so it takes an exclusive lock (MODE_X) - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + IndexBuilder builder(o); + Status status = builder.buildInForeground(txn, db); + uassertStatusOK(status); } - } - Collection* collection = db->getCollection( ns ); - IndexCatalog* indexCatalog = collection == nullptr ? nullptr : collection->getIndexCatalog(); - - // operation type -- see logOp() comments for types - const char *opType = fieldOp.valuestrsafe(); - invariant(*opType != 'c'); // commands are processed in applyCommand_inlock() - - if ( *opType == 'i' ) { - opCounters->gotInsert(); - - const char *p = strchr(ns, '.'); - if ( p && nsToCollectionSubstring( p ) == "system.indexes" ) { - if (o["background"].trueValue()) { - IndexBuilder* builder = new IndexBuilder(o); - // This spawns a new thread and returns immediately. - builder->go(); - // Wait for thread to start and register itself - Lock::TempRelease release(txn->lockState()); - IndexBuilder::waitForBgIndexStarting(); - } - else { - IndexBuilder builder(o); - Status status = builder.buildInForeground(txn, db); - uassertStatusOK(status); - } - } - else { - // do upserts for inserts as we might get replayed more than once - OpDebug debug; - - uassert(ErrorCodes::NamespaceNotFound, str::stream() << - "Failed to apply insert due to missing collection: " << op.toString(), - collection); - - // No _id. - // This indicates an issue with the upstream server: - // The oplog entry is corrupted; or - // The version of the upstream server is obsolete. - uassert(ErrorCodes::NoSuchKey, str::stream() << - "Failed to apply insert due to missing _id: " << op.toString(), - o.hasField("_id")); - - // TODO: It may be better to do an insert here, and then catch the duplicate - // key exception and do update then. Very few upserts will not be inserts... - BSONObjBuilder b; - b.append(o.getField("_id")); - - const NamespaceString requestNs(ns); - UpdateRequest request(requestNs); - - request.setQuery(b.done()); - request.setUpdates(o); - request.setUpsert(); - UpdateLifecycleImpl updateLifecycle(true, requestNs); - request.setLifecycle(&updateLifecycle); - - update(txn, db, request, &debug); - } - } - else if ( *opType == 'u' ) { - opCounters->gotUpdate(); - + } else { + // do upserts for inserts as we might get replayed more than once OpDebug debug; - BSONObj updateCriteria = o2; - const bool upsert = valueB || convertUpdateToUpsert; - uassert(ErrorCodes::NoSuchKey, str::stream() << - "Failed to apply update due to missing _id: " << op.toString(), - updateCriteria.hasField("_id")); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Failed to apply insert due to missing collection: " + << op.toString(), + collection); + + // No _id. + // This indicates an issue with the upstream server: + // The oplog entry is corrupted; or + // The version of the upstream server is obsolete. + uassert(ErrorCodes::NoSuchKey, + str::stream() << "Failed to apply insert due to missing _id: " << op.toString(), + o.hasField("_id")); + + // TODO: It may be better to do an insert here, and then catch the duplicate + // key exception and do update then. Very few upserts will not be inserts... + BSONObjBuilder b; + b.append(o.getField("_id")); const NamespaceString requestNs(ns); UpdateRequest request(requestNs); - request.setQuery(updateCriteria); + request.setQuery(b.done()); request.setUpdates(o); - request.setUpsert(upsert); + request.setUpsert(); UpdateLifecycleImpl updateLifecycle(true, requestNs); request.setLifecycle(&updateLifecycle); - UpdateResult ur = update(txn, db, request, &debug); - - if( ur.numMatched == 0 ) { - if( ur.modifiers ) { - if( updateCriteria.nFields() == 1 ) { - // was a simple { _id : ... } update criteria - string msg = str::stream() << "failed to apply update: " << op.toString(); - error() << msg; - return Status(ErrorCodes::OperationFailed, msg); - } - // Need to check to see if it isn't present so we can exit early with a - // failure. Note that adds some overhead for this extra check in some cases, - // such as an updateCriteria - // of the form - // { _id:..., { x : {$size:...} } - // thus this is not ideal. - if (collection == NULL || - (indexCatalog->haveIdIndex(txn) && - Helpers::findById(txn, collection, updateCriteria).isNull()) || - // capped collections won't have an _id index - (!indexCatalog->haveIdIndex(txn) && - Helpers::findOne(txn, collection, updateCriteria, false).isNull())) { - string msg = str::stream() << "couldn't find doc: " << op.toString(); - error() << msg; - return Status(ErrorCodes::OperationFailed, msg); - } - - // Otherwise, it's present; zero objects were updated because of additional specifiers - // in the query for idempotence + update(txn, db, request, &debug); + } + } else if (*opType == 'u') { + opCounters->gotUpdate(); + + OpDebug debug; + BSONObj updateCriteria = o2; + const bool upsert = valueB || convertUpdateToUpsert; + + uassert(ErrorCodes::NoSuchKey, + str::stream() << "Failed to apply update due to missing _id: " << op.toString(), + updateCriteria.hasField("_id")); + + const NamespaceString requestNs(ns); + UpdateRequest request(requestNs); + + request.setQuery(updateCriteria); + request.setUpdates(o); + request.setUpsert(upsert); + UpdateLifecycleImpl updateLifecycle(true, requestNs); + request.setLifecycle(&updateLifecycle); + + UpdateResult ur = update(txn, db, request, &debug); + + if (ur.numMatched == 0) { + if (ur.modifiers) { + if (updateCriteria.nFields() == 1) { + // was a simple { _id : ... } update criteria + string msg = str::stream() << "failed to apply update: " << op.toString(); + error() << msg; + return Status(ErrorCodes::OperationFailed, msg); } - else { - // this could happen benignly on an oplog duplicate replay of an upsert - // (because we are idempotent), - // if an regular non-mod update fails the item is (presumably) missing. - if( !upsert ) { - string msg = str::stream() << "update of non-mod failed: " << op.toString(); - error() << msg; - return Status(ErrorCodes::OperationFailed, msg); - } + // Need to check to see if it isn't present so we can exit early with a + // failure. Note that adds some overhead for this extra check in some cases, + // such as an updateCriteria + // of the form + // { _id:..., { x : {$size:...} } + // thus this is not ideal. + if (collection == NULL || + (indexCatalog->haveIdIndex(txn) && + Helpers::findById(txn, collection, updateCriteria).isNull()) || + // capped collections won't have an _id index + (!indexCatalog->haveIdIndex(txn) && + Helpers::findOne(txn, collection, updateCriteria, false).isNull())) { + string msg = str::stream() << "couldn't find doc: " << op.toString(); + error() << msg; + return Status(ErrorCodes::OperationFailed, msg); } - } - } - else if ( *opType == 'd' ) { - opCounters->gotDelete(); - - uassert(ErrorCodes::NoSuchKey, str::stream() << - "Failed to apply delete due to missing _id: " << op.toString(), - o.hasField("_id")); - if (opType[1] == 0) { - deleteObjects(txn, db, ns, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB); + // Otherwise, it's present; zero objects were updated because of additional specifiers + // in the query for idempotence + } else { + // this could happen benignly on an oplog duplicate replay of an upsert + // (because we are idempotent), + // if an regular non-mod update fails the item is (presumably) missing. + if (!upsert) { + string msg = str::stream() << "update of non-mod failed: " << op.toString(); + error() << msg; + return Status(ErrorCodes::OperationFailed, msg); + } } - else - verify( opType[1] == 'b' ); // "db" advertisement - } - else if ( *opType == 'n' ) { - // no op } - else { - throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) ); - } - - // AuthorizationManager's logOp method registers a RecoveryUnit::Change - // and to do so we need to have begun a UnitOfWork - WriteUnitOfWork wuow(txn); - getGlobalAuthorizationManager()->logOp( - txn, - opType, - ns, - o, - fieldO2.isABSONObj() ? &o2 : NULL); - wuow.commit(); - - return Status::OK(); + } else if (*opType == 'd') { + opCounters->gotDelete(); + + uassert(ErrorCodes::NoSuchKey, + str::stream() << "Failed to apply delete due to missing _id: " << op.toString(), + o.hasField("_id")); + + if (opType[1] == 0) { + deleteObjects(txn, db, ns, o, PlanExecutor::YIELD_MANUAL, /*justOne*/ valueB); + } else + verify(opType[1] == 'b'); // "db" advertisement + } else if (*opType == 'n') { + // no op + } else { + throw MsgAssertionException(14825, + ErrorMsg("error in applyOperation : unknown opType ", *opType)); } - Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) { - const char *names[] = { "o", "ns", "op" }; - BSONElement fields[3]; - op.getFields(3, names, fields); - BSONElement& fieldO = fields[0]; - BSONElement& fieldNs = fields[1]; - BSONElement& fieldOp = fields[2]; - - const char* opType = fieldOp.valuestrsafe(); - invariant(*opType == 'c'); // only commands are processed here + // AuthorizationManager's logOp method registers a RecoveryUnit::Change + // and to do so we need to have begun a UnitOfWork + WriteUnitOfWork wuow(txn); + getGlobalAuthorizationManager()->logOp(txn, opType, ns, o, fieldO2.isABSONObj() ? &o2 : NULL); + wuow.commit(); + + return Status::OK(); +} + +Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) { + const char* names[] = {"o", "ns", "op"}; + BSONElement fields[3]; + op.getFields(3, names, fields); + BSONElement& fieldO = fields[0]; + BSONElement& fieldNs = fields[1]; + BSONElement& fieldOp = fields[2]; + + const char* opType = fieldOp.valuestrsafe(); + invariant(*opType == 'c'); // only commands are processed here + + BSONObj o; + if (fieldO.isABSONObj()) { + o = fieldO.embeddedObject(); + } - BSONObj o; - if (fieldO.isABSONObj()) { - o = fieldO.embeddedObject(); - } + const char* ns = fieldNs.valuestrsafe(); - const char* ns = fieldNs.valuestrsafe(); + // Applying commands in repl is done under Global W-lock, so it is safe to not + // perform the current DB checks after reacquiring the lock. + invariant(txn->lockState()->isW()); - // Applying commands in repl is done under Global W-lock, so it is safe to not - // perform the current DB checks after reacquiring the lock. - invariant(txn->lockState()->isW()); - - bool done = false; + bool done = false; - while (!done) { - ApplyOpMetadata curOpToApply = opsMap.find(o.firstElementFieldName())->second; - Status status = Status::OK(); - try { - status = curOpToApply.applyFunc(txn, ns, o); - } - catch (...) { - status = exceptionToStatus(); - } - switch (status.code()) { + while (!done) { + ApplyOpMetadata curOpToApply = opsMap.find(o.firstElementFieldName())->second; + Status status = Status::OK(); + try { + status = curOpToApply.applyFunc(txn, ns, o); + } catch (...) { + status = exceptionToStatus(); + } + switch (status.code()) { case ErrorCodes::WriteConflict: { // Need to throw this up to a higher level where it will be caught and the // operation retried. @@ -848,69 +785,65 @@ namespace { if (_oplogCollectionName == masterSlaveOplogName) { error() << "Failed command " << o << " on " << nsToDatabaseSubstring(ns) << " with status " << status << " during oplog application"; - } - else if (curOpToApply.acceptableErrors.find(status.code()) - == curOpToApply.acceptableErrors.end()) { + } else if (curOpToApply.acceptableErrors.find(status.code()) == + curOpToApply.acceptableErrors.end()) { error() << "Failed command " << o << " on " << nsToDatabaseSubstring(ns) << " with status " << status << " during oplog application"; return status; } - // fallthrough + // fallthrough case ErrorCodes::OK: done = true; break; - } } - - // AuthorizationManager's logOp method registers a RecoveryUnit::Change - // and to do so we need to have begun a UnitOfWork - WriteUnitOfWork wuow(txn); - getGlobalAuthorizationManager()->logOp(txn, opType, ns, o, nullptr); - wuow.commit(); - - return Status::OK(); } - void waitUpToOneSecondForTimestampChange(const Timestamp& referenceTime) { - stdx::unique_lock<stdx::mutex> lk(newOpMutex); + // AuthorizationManager's logOp method registers a RecoveryUnit::Change + // and to do so we need to have begun a UnitOfWork + WriteUnitOfWork wuow(txn); + getGlobalAuthorizationManager()->logOp(txn, opType, ns, o, nullptr); + wuow.commit(); - while (referenceTime == getLastSetTimestamp()) { - if (!newTimestampNotifier.timed_wait(lk, boost::posix_time::seconds(1))) - return; - } - } + return Status::OK(); +} - void setNewTimestamp(const Timestamp& newTime) { - stdx::lock_guard<stdx::mutex> lk(newOpMutex); - setGlobalTimestamp(newTime); - newTimestampNotifier.notify_all(); - } +void waitUpToOneSecondForTimestampChange(const Timestamp& referenceTime) { + stdx::unique_lock<stdx::mutex> lk(newOpMutex); - OpTime extractOpTime(const BSONObj& op) { - const Timestamp ts = op["ts"].timestamp(); - const long long term = op["t"].numberLong(); // Default to 0 if it's absent - return OpTime(ts, term); + while (referenceTime == getLastSetTimestamp()) { + if (!newTimestampNotifier.timed_wait(lk, boost::posix_time::seconds(1))) + return; } - - void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) { - DBDirectClient c(txn); - BSONObj lastOp = c.findOne(oplogNS, - Query().sort(reverseNaturalObj), - NULL, - QueryOption_SlaveOk); - - if (!lastOp.isEmpty()) { - LOG(1) << "replSet setting last Timestamp"; - setNewTimestamp(lastOp[ "ts" ].timestamp()); - } +} + +void setNewTimestamp(const Timestamp& newTime) { + stdx::lock_guard<stdx::mutex> lk(newOpMutex); + setGlobalTimestamp(newTime); + newTimestampNotifier.notify_all(); +} + +OpTime extractOpTime(const BSONObj& op) { + const Timestamp ts = op["ts"].timestamp(); + const long long term = op["t"].numberLong(); // Default to 0 if it's absent + return OpTime(ts, term); +} + +void initTimestampFromOplog(OperationContext* txn, const std::string& oplogNS) { + DBDirectClient c(txn); + BSONObj lastOp = c.findOne(oplogNS, Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk); + + if (!lastOp.isEmpty()) { + LOG(1) << "replSet setting last Timestamp"; + setNewTimestamp(lastOp["ts"].timestamp()); } +} - void oplogCheckCloseDatabase(OperationContext* txn, Database* db) { - invariant(txn->lockState()->isW()); +void oplogCheckCloseDatabase(OperationContext* txn, Database* db) { + invariant(txn->lockState()->isW()); - _localDB = nullptr; - _localOplogCollection = nullptr; - } + _localDB = nullptr; + _localOplogCollection = nullptr; +} -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |