diff options
27 files changed, 384 insertions, 362 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index b78849189c4..5b3b79bec1f 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -1004,8 +1004,6 @@ static void shutdownTask() { getGlobalServiceContext()->setKillAllOperations(); - repl::getGlobalReplicationCoordinator()->shutdown(); - Client& client = cc(); ServiceContext::UniqueOperationContext uniqueTxn; OperationContext* txn = client.getOperationContext(); @@ -1014,6 +1012,8 @@ static void shutdownTask() { txn = uniqueTxn.get(); } + repl::ReplicationCoordinator::get(txn)->shutdown(txn); + ShardingState::get(txn)->shutDown(txn); // We should always be able to acquire the global lock at shutdown. diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 9aecb8d1d94..c10e88e686f 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -113,6 +113,9 @@ bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching( std::unique_ptr<ThreadPool> makeThreadPool() { ThreadPool::Options threadPoolOptions; threadPoolOptions.poolName = "rsBackgroundSync"; + threadPoolOptions.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + }; return stdx::make_unique<ThreadPool>(threadPoolOptions); } @@ -163,12 +166,12 @@ BackgroundSync::BackgroundSync(std::unique_ptr<OplogBuffer> oplogBuffer) bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get()); } -void BackgroundSync::shutdown() { +void BackgroundSync::shutdown(OperationContext* txn) { stdx::lock_guard<stdx::mutex> lock(_mutex); // Clear the buffer in case the producerThread is waiting in push() due to a full queue. invariant(inShutdown()); - clearBuffer(); + clearBuffer(txn); _stopped = true; if (_oplogFetcher) { @@ -181,12 +184,18 @@ void BackgroundSync::producerThread( Client::initThread("rsBackgroundSync"); AuthorizationSession::get(cc())->grantInternalAuthorization(); - _oplogBuffer->startup(); + { + auto txn = cc().makeOperationContext(); + _oplogBuffer->startup(txn.get()); + } + _threadPoolTaskExecutor.startup(); ON_BLOCK_EXIT([this]() { _threadPoolTaskExecutor.shutdown(); _threadPoolTaskExecutor.join(); - _oplogBuffer->shutdown(); + + auto txn = cc().makeOperationContext(); + _oplogBuffer->shutdown(txn.get()); }); while (!inShutdown()) { @@ -205,13 +214,13 @@ void BackgroundSync::producerThread( stop(); } -void BackgroundSync::_signalNoNewDataForApplier() { +void BackgroundSync::_signalNoNewDataForApplier(OperationContext* txn) { // Signal to consumers that we have entered the stopped state // if the signal isn't already in the queue. - const boost::optional<BSONObj> lastObjectPushed = _oplogBuffer->lastObjectPushed(); + const boost::optional<BSONObj> lastObjectPushed = _oplogBuffer->lastObjectPushed(txn); if (!lastObjectPushed || !lastObjectPushed->isEmpty()) { const BSONObj sentinelDoc; - _oplogBuffer->pushEvenIfFull(sentinelDoc); + _oplogBuffer->pushEvenIfFull(txn, sentinelDoc); bufferCountGauge.increment(); bufferSizeGauge.increment(sentinelDoc.objsize()); } @@ -226,7 +235,8 @@ void BackgroundSync::_producerThread( stop(); } if (_replCoord->isWaitingForApplierToDrain()) { - _signalNoNewDataForApplier(); + auto txn = cc().makeOperationContext(); + _signalNoNewDataForApplier(txn.get()); } sleepsecs(1); return; @@ -246,13 +256,12 @@ void BackgroundSync::_producerThread( } // we want to start when we're no longer primary // start() also loads _lastOpTimeFetched, which we know is set from the "if" - const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext(); - OperationContext& txn = *txnPtr; + auto txn = cc().makeOperationContext(); if (isStopped()) { - start(&txn); + start(txn.get()); } - _produce(&txn, replicationCoordinatorExternalState); + _produce(txn.get(), replicationCoordinatorExternalState); } void BackgroundSync::_produce( @@ -484,6 +493,8 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, const OplogFetcher::DocumentsInfo& info, Milliseconds getMoreElapsed) { + auto txn = cc().makeOperationContext(); + // If this is the first batch of operations returned from the query, "toApplyDocumentCount" will // be one fewer than "networkDocumentCount" because the first document (which was applied // previously) is skipped. @@ -493,14 +504,14 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, } // Wait for enough space. - _oplogBuffer->waitForSpace(info.toApplyDocumentBytes); + _oplogBuffer->waitForSpace(txn.get(), info.toApplyDocumentBytes); OCCASIONALLY { - LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes"; + LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize(txn.get()) << " bytes"; } // Buffer docs for later application. - _oplogBuffer->pushAllNonBlocking(begin, end); + _oplogBuffer->pushAllNonBlocking(txn.get(), begin, end); // Update last fetched info. { @@ -513,21 +524,21 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin, _recordStats(info, getMoreElapsed); } -bool BackgroundSync::peek(BSONObj* op) { - return _oplogBuffer->peek(op); +bool BackgroundSync::peek(OperationContext* txn, BSONObj* op) { + return _oplogBuffer->peek(txn, op); } -void BackgroundSync::waitForMore() { +void BackgroundSync::waitForMore(OperationContext* txn) { BSONObj op; // Block for one second before timing out. // Ignore the value of the op we peeked at. - _oplogBuffer->blockingPeek(&op, Seconds(1)); + _oplogBuffer->blockingPeek(txn, &op, Seconds(1)); } -void BackgroundSync::consume() { +void BackgroundSync::consume(OperationContext* txn) { // this is just to get the op off the queue, it's been peeked at // and queued for application already - BSONObj op = _oplogBuffer->blockingPop(); + BSONObj op = _oplogBuffer->blockingPop(txn); bufferCountGauge.decrement(1); bufferSizeGauge.decrement(getSize(op)); } @@ -543,11 +554,11 @@ void BackgroundSync::_rollback(OperationContext* txn, _replCoord); if (status.isOK()) { // When the syncTail thread sees there is no new data by adding something to the buffer. - _signalNoNewDataForApplier(); + _signalNoNewDataForApplier(txn); // Wait until the buffer is empty. // This is an indication that syncTail has removed the sentinal marker from the buffer // and reset its local lastAppliedOpTime via the replCoord. - while (!_oplogBuffer->isEmpty()) { + while (!_oplogBuffer->isEmpty(txn)) { sleepmillis(10); if (inShutdown()) { return; @@ -603,7 +614,7 @@ void BackgroundSync::stop() { } void BackgroundSync::start(OperationContext* txn) { - massert(16235, "going to start syncing, but buffer is not empty", _oplogBuffer->isEmpty()); + massert(16235, "going to start syncing, but buffer is not empty", _oplogBuffer->isEmpty(txn)); long long lastFetchedHash = _readLastAppliedHash(txn); stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -621,8 +632,8 @@ bool BackgroundSync::isStopped() const { return _stopped; } -void BackgroundSync::clearBuffer() { - _oplogBuffer->clear(); +void BackgroundSync::clearBuffer(OperationContext* txn) { + _oplogBuffer->clear(txn); const auto count = bufferCountGauge.get(); bufferCountGauge.decrement(count); const auto size = bufferSizeGauge.get(); @@ -690,8 +701,8 @@ bool BackgroundSync::shouldStopFetching() const { return false; } -void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) { - _oplogBuffer->push(op); +void BackgroundSync::pushTestOpToBuffer(OperationContext* txn, const BSONObj& op) { + _oplogBuffer->push(txn, op); bufferCountGauge.increment(); bufferSizeGauge.increment(op.objsize()); } diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 817746350d8..a62fa90c69b 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -68,7 +68,7 @@ public: // stop syncing (when this node becomes a primary, e.g.) void stop(); - void shutdown(); + void shutdown(OperationContext* txn); bool isStopped() const; @@ -86,16 +86,16 @@ public: // Interface implementation - bool peek(BSONObj* op); - void consume(); + bool peek(OperationContext* txn, BSONObj* op); + void consume(OperationContext* txn); void clearSyncTarget(); - void waitForMore(); + void waitForMore(OperationContext* txn); // For monitoring BSONObj getCounters(); // Clears any fetched and buffered oplog entries. - void clearBuffer(); + void clearBuffer(OperationContext* txn); /** * Cancel existing find/getMore commands on the sync source's oplog collection. @@ -112,7 +112,7 @@ public: bool shouldStopFetching() const; // Testing related stuff - void pushTestOpToBuffer(const BSONObj& op); + void pushTestOpToBuffer(OperationContext* txn, const BSONObj& op); private: // Production thread @@ -126,7 +126,7 @@ private: * * NOTE: Used after rollback and during draining to transition to Primary role; */ - void _signalNoNewDataForApplier(); + void _signalNoNewDataForApplier(OperationContext* txn); /** * Record metrics. diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index daa47a6c56c..72b96deb4ba 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -451,7 +451,7 @@ Status DataReplicator::start() { _fetcherPaused = false; _reporterPaused = false; _oplogBuffer = _dataReplicatorExternalState->makeSteadyStateOplogBuffer(); - _oplogBuffer->startup(); + _oplogBuffer->startup(nullptr); _doNextActions_Steady_inlock(); return Status::OK(); } @@ -494,7 +494,7 @@ Timestamp DataReplicator::getLastTimestampApplied() const { size_t DataReplicator::getOplogBufferCount() const { // Oplog buffer is internally synchronized. - return _oplogBuffer->getCount(); + return _oplogBuffer->getCount(nullptr); } std::string DataReplicator::getDiagnosticString() const { @@ -502,7 +502,7 @@ std::string DataReplicator::getDiagnosticString() const { str::stream out; out << "DataReplicator -" << " opts: " << _opts.toString() << " oplogFetcher: " << _fetcher->toString() - << " opsBuffered: " << _oplogBuffer->getSize() << " state: " << toString(_state); + << " opsBuffered: " << _oplogBuffer->getSize(nullptr) << " state: " << toString(_state); switch (_state) { case DataReplicatorState::InitialSync: out << " opsAppied: " << _initialSyncState->appliedOps @@ -572,7 +572,7 @@ TimestampStatus DataReplicator::flushAndPause() { void DataReplicator::_resetState_inlock(Timestamp lastAppliedOpTime) { invariant(!_anyActiveHandles_inlock()); _lastTimestampApplied = _lastTimestampFetched = lastAppliedOpTime; - _oplogBuffer->clear(); + _oplogBuffer->clear(nullptr); } void DataReplicator::slavesHaveProgressed() { @@ -632,9 +632,9 @@ TimestampStatus DataReplicator::initialSync(OperationContext* txn) { _applierPaused = true; _oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer(); - _oplogBuffer->startup(); + _oplogBuffer->startup(nullptr); ON_BLOCK_EXIT([this]() { - _oplogBuffer->shutdown(); + _oplogBuffer->shutdown(nullptr); _oplogBuffer.reset(); }); @@ -948,7 +948,7 @@ void DataReplicator::_doNextActions_Steady_inlock() { } // Check if no active apply and ops to apply - if (!_applierActive && _oplogBuffer->getSize()) { + if (!_applierActive && _oplogBuffer->getSize(nullptr)) { _scheduleApplyBatch_inlock(); } @@ -975,7 +975,8 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() { // * only OplogEntries from before the slaveDelay point // * a single command OplogEntry (including index builds, which appear to be inserts) // * consequently, commands bound the previous batch to be in a batch of their own - while (_oplogBuffer->peek(&op)) { + OperationContext* txn = nullptr; + while (_oplogBuffer->peek(txn, &op)) { auto entry = OplogEntry(op); // Check for ops that must be processed one at a time. @@ -986,7 +987,7 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() { if (ops.empty()) { // Apply commands one-at-a-time. ops.push_back(std::move(entry)); - _oplogBuffer->tryPop(&op); + _oplogBuffer->tryPop(txn, &op); invariant(entry == OplogEntry(op)); } @@ -1027,7 +1028,7 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() { // Add op to buffer. ops.push_back(entry); totalBytes += entry.raw.objsize(); - _oplogBuffer->tryPop(&op); + _oplogBuffer->tryPop(txn, &op); invariant(entry == OplogEntry(op)); } return ops; @@ -1299,7 +1300,7 @@ Status DataReplicator::scheduleShutdown() { invariant(!_onShutdown.isValid()); _onShutdown = eventStatus.getValue(); _cancelAllHandles_inlock(); - _oplogBuffer->shutdown(); + _oplogBuffer->shutdown(nullptr); _oplogBuffer.reset(); } @@ -1345,14 +1346,14 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin, // Wait for enough space. // Gets unblocked on shutdown. - _oplogBuffer->waitForSpace(info.toApplyDocumentBytes); + _oplogBuffer->waitForSpace(nullptr, info.toApplyDocumentBytes); OCCASIONALLY { - LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes"; + LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize(nullptr) << " bytes"; } // Buffer docs for later application. - fassert(40143, _oplogBuffer->pushAllNonBlocking(begin, end)); + fassert(40143, _oplogBuffer->pushAllNonBlocking(nullptr, begin, end)); _lastTimestampFetched = info.lastDocument.opTime.getTimestamp(); diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h index 3c172cf650e..46df1c35d27 100644 --- a/src/mongo/db/repl/oplog_buffer.h +++ b/src/mongo/db/repl/oplog_buffer.h @@ -37,6 +37,9 @@ #include "mongo/util/time_support.h" namespace mongo { + +class OperationContext; + namespace repl { /** @@ -67,7 +70,7 @@ public: * create backing storage, etc). This method may be called at most once for the lifetime of an * oplog buffer. */ - virtual void startup() = 0; + virtual void startup(OperationContext* txn) = 0; /** * Signals to the oplog buffer that it should shut down. This method may block. After @@ -76,7 +79,7 @@ public: * It is legal to call this method multiple times, but it should only be called after startup * has been called. */ - virtual void shutdown() = 0; + virtual void shutdown(OperationContext* txn) = 0; /** * Pushes operation into oplog buffer, ignoring any size constraints. Does not block. @@ -84,31 +87,33 @@ public: * the limit returned by getMaxSize() but should not otherwise adversely affect normal * functionality such as pushing and popping operations from the oplog buffer. */ - virtual void pushEvenIfFull(const Value& value) = 0; + virtual void pushEvenIfFull(OperationContext* txn, const Value& value) = 0; /** * Pushes operation into oplog buffer. * If there are size constraints on the oplog buffer, this may block until sufficient space * is made available (by popping) to complete this operation. */ - virtual void push(const Value& value) = 0; + virtual void push(OperationContext* txn, const Value& value) = 0; /** * Pushes operations in the iterator range [begin, end) into the oplog buffer without blocking. * * Returns false if there is insufficient space to complete this operation successfully. */ - virtual bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) = 0; + virtual bool pushAllNonBlocking(OperationContext* txn, + Batch::const_iterator begin, + Batch::const_iterator end) = 0; /** * Returns when enough space is available. */ - virtual void waitForSpace(std::size_t size) = 0; + virtual void waitForSpace(OperationContext* txn, std::size_t size) = 0; /** * Returns true if oplog buffer is empty. */ - virtual bool isEmpty() const = 0; + virtual bool isEmpty(OperationContext* txn) const = 0; /** * Maximum size of all oplog entries that can be stored in this oplog buffer as measured by the @@ -122,47 +127,47 @@ public: * Total size of all oplog entries in this oplog buffer as measured by the BSONObj::objsize() * function. */ - virtual std::size_t getSize() const = 0; + virtual std::size_t getSize(OperationContext* txn) const = 0; /** * Returns the number/count of items in the oplog buffer. */ - virtual std::size_t getCount() const = 0; + virtual std::size_t getCount(OperationContext* txn) const = 0; /** * Clears oplog buffer. */ - virtual void clear() = 0; + virtual void clear(OperationContext* txn) = 0; /** * Returns false if oplog buffer is empty. "value" is left unchanged. * Otherwise, removes last item (saves in "value") from the oplog buffer and returns true. */ - virtual bool tryPop(Value* value) = 0; + virtual bool tryPop(OperationContext* txn, Value* value) = 0; /** * Pops the last operation in the oplog buffer. * If the oplog buffer is empty, waits until an operation is pushed. */ - virtual Value blockingPop() = 0; + virtual Value blockingPop(OperationContext* txn) = 0; /** * Waits "waitDuration" for an operation to be pushed into the oplog buffer. * Returns false if oplog buffer is still empty after "waitDuration". * Otherwise, returns true and sets "value" to last item in oplog buffer. */ - virtual bool blockingPeek(Value* value, Seconds waitDuration) = 0; + virtual bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) = 0; /** * Returns false if oplog buffer is empty. * Otherwise, returns true and sets "value" to last item in oplog buffer. */ - virtual bool peek(Value* value) = 0; + virtual bool peek(OperationContext* txn, Value* value) = 0; /** * Returns the item most recently added to the oplog buffer or nothing if the buffer is empty. */ - virtual boost::optional<Value> lastObjectPushed() const = 0; + virtual boost::optional<Value> lastObjectPushed(OperationContext* txn) const = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp index ff575cae145..bb7801f2da7 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp @@ -47,31 +47,32 @@ size_t getDocumentSize(const BSONObj& o) { OplogBufferBlockingQueue::OplogBufferBlockingQueue() : _queue(kOplogBufferSize, &getDocumentSize) {} -void OplogBufferBlockingQueue::startup() {} +void OplogBufferBlockingQueue::startup(OperationContext*) {} -void OplogBufferBlockingQueue::shutdown() { - clear(); +void OplogBufferBlockingQueue::shutdown(OperationContext* txn) { + clear(txn); } -void OplogBufferBlockingQueue::pushEvenIfFull(const Value& value) { +void OplogBufferBlockingQueue::pushEvenIfFull(OperationContext*, const Value& value) { _queue.pushEvenIfFull(value); } -void OplogBufferBlockingQueue::push(const Value& value) { +void OplogBufferBlockingQueue::push(OperationContext*, const Value& value) { _queue.push(value); } -bool OplogBufferBlockingQueue::pushAllNonBlocking(Batch::const_iterator begin, +bool OplogBufferBlockingQueue::pushAllNonBlocking(OperationContext*, + Batch::const_iterator begin, Batch::const_iterator end) { _queue.pushAllNonBlocking(begin, end); return true; } -void OplogBufferBlockingQueue::waitForSpace(std::size_t size) { +void OplogBufferBlockingQueue::waitForSpace(OperationContext*, std::size_t size) { _queue.waitForSpace(size); } -bool OplogBufferBlockingQueue::isEmpty() const { +bool OplogBufferBlockingQueue::isEmpty(OperationContext*) const { return _queue.empty(); } @@ -79,35 +80,36 @@ std::size_t OplogBufferBlockingQueue::getMaxSize() const { return kOplogBufferSize; } -std::size_t OplogBufferBlockingQueue::getSize() const { +std::size_t OplogBufferBlockingQueue::getSize(OperationContext*) const { return _queue.size(); } -std::size_t OplogBufferBlockingQueue::getCount() const { +std::size_t OplogBufferBlockingQueue::getCount(OperationContext*) const { return _queue.count(); } -void OplogBufferBlockingQueue::clear() { +void OplogBufferBlockingQueue::clear(OperationContext*) { _queue.clear(); } -bool OplogBufferBlockingQueue::tryPop(Value* value) { +bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) { return _queue.tryPop(*value); } -OplogBuffer::Value OplogBufferBlockingQueue::blockingPop() { +OplogBuffer::Value OplogBufferBlockingQueue::blockingPop(OperationContext*) { return _queue.blockingPop(); } -bool OplogBufferBlockingQueue::blockingPeek(Value* value, Seconds waitDuration) { +bool OplogBufferBlockingQueue::blockingPeek(OperationContext*, Value* value, Seconds waitDuration) { return _queue.blockingPeek(*value, static_cast<int>(durationCount<Seconds>(waitDuration))); } -bool OplogBufferBlockingQueue::peek(Value* value) { +bool OplogBufferBlockingQueue::peek(OperationContext*, Value* value) { return _queue.peek(*value); } -boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed() const { +boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed( + OperationContext*) const { return _queue.lastObjectPushed(); } diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h index 0c40c8f1256..d45d26d47ad 100644 --- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h +++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h @@ -37,26 +37,28 @@ namespace repl { /** * Oplog buffer backed by in memory blocking queue of BSONObj. */ -class OplogBufferBlockingQueue : public OplogBuffer { +class OplogBufferBlockingQueue final : public OplogBuffer { public: OplogBufferBlockingQueue(); - void startup() override; - void shutdown() override; - void pushEvenIfFull(const Value& value) override; - void push(const Value& value) override; - bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) override; - void waitForSpace(std::size_t size) override; - bool isEmpty() const override; + void startup(OperationContext* txn) override; + void shutdown(OperationContext* txn) override; + void pushEvenIfFull(OperationContext* txn, const Value& value) override; + void push(OperationContext* txn, const Value& value) override; + bool pushAllNonBlocking(OperationContext* txn, + Batch::const_iterator begin, + Batch::const_iterator end) override; + void waitForSpace(OperationContext* txn, std::size_t size) override; + bool isEmpty(OperationContext* txn) const override; std::size_t getMaxSize() const override; - std::size_t getSize() const override; - std::size_t getCount() const override; - void clear() override; - bool tryPop(Value* value) override; - Value blockingPop() override; - bool blockingPeek(Value* value, Seconds waitDuration) override; - bool peek(Value* value) override; - boost::optional<Value> lastObjectPushed() const override; + std::size_t getSize(OperationContext* txn) const override; + std::size_t getCount(OperationContext* txn) const override; + void clear(OperationContext* txn) override; + bool tryPop(OperationContext* txn, Value* value) override; + Value blockingPop(OperationContext* txn) override; + bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) override; + bool peek(OperationContext* txn, Value* value) override; + boost::optional<Value> lastObjectPushed(OperationContext* txn) const override; private: BlockingQueue<BSONObj> _queue; diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp index 7af1057e022..a042f4311b1 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection.cpp @@ -45,14 +45,6 @@ namespace { const char kDefaultOplogCollectionNamespace[] = "local.oplog.initialSyncTempBuffer"; -/** - * Creates an operation context for the current oplog buffer operation. - * Crashes if there is already an existing operation context registered with the current client. - */ -ServiceContext::UniqueOperationContext makeOperationContext() { - return cc().makeOperationContext(); -} - } // namespace NamespaceString OplogBufferCollection::getDefaultNamespace() { @@ -67,10 +59,7 @@ NamespaceString OplogBufferCollection::getNamespace() const { return _nss; } -void OplogBufferCollection::startup() { - auto txnPtr = makeOperationContext(); - auto txn = txnPtr.get(); - +void OplogBufferCollection::startup(OperationContext* txn) { // TODO: use storage interface to create collection. MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { AutoGetOrCreateDb databaseWriteGuard(txn, _nss.db(), MODE_X); @@ -84,20 +73,21 @@ void OplogBufferCollection::startup() { MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "OplogBufferCollection::startup", _nss.ns()); } -void OplogBufferCollection::shutdown() {} +void OplogBufferCollection::shutdown(OperationContext* txn) {} -void OplogBufferCollection::pushEvenIfFull(const Value& value) {} +void OplogBufferCollection::pushEvenIfFull(OperationContext* txn, const Value& value) {} -void OplogBufferCollection::push(const Value& value) {} +void OplogBufferCollection::push(OperationContext* txn, const Value& value) {} -bool OplogBufferCollection::pushAllNonBlocking(Batch::const_iterator begin, +bool OplogBufferCollection::pushAllNonBlocking(OperationContext* txn, + Batch::const_iterator begin, Batch::const_iterator end) { return false; } -void OplogBufferCollection::waitForSpace(std::size_t size) {} +void OplogBufferCollection::waitForSpace(OperationContext* txn, std::size_t size) {} -bool OplogBufferCollection::isEmpty() const { +bool OplogBufferCollection::isEmpty(OperationContext* txn) const { return true; } @@ -105,33 +95,36 @@ std::size_t OplogBufferCollection::getMaxSize() const { return 0; } -std::size_t OplogBufferCollection::getSize() const { +std::size_t OplogBufferCollection::getSize(OperationContext* txn) const { return 0; } -std::size_t OplogBufferCollection::getCount() const { +std::size_t OplogBufferCollection::getCount(OperationContext* txn) const { return 0; } -void OplogBufferCollection::clear() {} +void OplogBufferCollection::clear(OperationContext* txn) {} -bool OplogBufferCollection::tryPop(Value* value) { +bool OplogBufferCollection::tryPop(OperationContext* txn, Value* value) { return false; } -OplogBuffer::Value OplogBufferCollection::blockingPop() { +OplogBuffer::Value OplogBufferCollection::blockingPop(OperationContext* txn) { return {}; } -bool OplogBufferCollection::blockingPeek(Value* value, Seconds waitDuration) { +bool OplogBufferCollection::blockingPeek(OperationContext* txn, + Value* value, + Seconds waitDuration) { return false; } -bool OplogBufferCollection::peek(Value* value) { +bool OplogBufferCollection::peek(OperationContext* txn, Value* value) { return false; } -boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed() const { +boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed( + OperationContext* txn) const { return {}; } diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h index c422c2f2c1c..0270673aae7 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.h +++ b/src/mongo/db/repl/oplog_buffer_collection.h @@ -54,22 +54,24 @@ public: */ NamespaceString getNamespace() const; - void startup() override; - void shutdown() override; - void pushEvenIfFull(const Value& value) override; - void push(const Value& value) override; - bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) override; - void waitForSpace(std::size_t size) override; - bool isEmpty() const override; + void startup(OperationContext* txn) override; + void shutdown(OperationContext* txn) override; + void pushEvenIfFull(OperationContext* txn, const Value& value) override; + void push(OperationContext* txn, const Value& value) override; + bool pushAllNonBlocking(OperationContext* txn, + Batch::const_iterator begin, + Batch::const_iterator end) override; + void waitForSpace(OperationContext* txn, std::size_t size) override; + bool isEmpty(OperationContext* txn) const override; std::size_t getMaxSize() const override; - std::size_t getSize() const override; - std::size_t getCount() const override; - void clear() override; - bool tryPop(Value* value) override; - Value blockingPop() override; - bool blockingPeek(Value* value, Seconds waitDuration) override; - bool peek(Value* value) override; - boost::optional<Value> lastObjectPushed() const override; + std::size_t getSize(OperationContext* txn) const override; + std::size_t getCount(OperationContext* txn) const override; + void clear(OperationContext* txn) override; + bool tryPop(OperationContext* txn, Value* value) override; + Value blockingPop(OperationContext* txn) override; + bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) override; + bool peek(OperationContext* txn, Value* value) override; + boost::optional<Value> lastObjectPushed(OperationContext* txn) const override; private: const NamespaceString _nss; diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp index 90b54c2b1e6..c34d03b0686 100644 --- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp @@ -28,10 +28,14 @@ #include "mongo/platform/basic.h" +#include "mongo/db/catalog/database.h" #include "mongo/db/client.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/oplog_buffer_collection.h" +#include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface.h" @@ -52,8 +56,11 @@ protected: protected: ServiceContext::UniqueOperationContext makeOperationContext() const; + ServiceContext::UniqueOperationContext _txn; + private: void setUp() override; + void tearDown() override; }; void OplogBufferCollectionTest::setUp() { @@ -72,6 +79,14 @@ void OplogBufferCollectionTest::setUp() { stdx::make_unique<ReplicationCoordinatorMock>(replSettings)); StorageInterface::set(serviceContext, stdx::make_unique<StorageInterfaceImpl>()); + + _txn = makeOperationContext(); +} + +void OplogBufferCollectionTest::tearDown() { + _txn.reset(); + + ServiceContextMongoDTest::tearDown(); } ServiceContext::UniqueOperationContext OplogBufferCollectionTest::makeOperationContext() const { @@ -105,10 +120,10 @@ TEST_F(OplogBufferCollectionTest, StartupCreatesCollection) { OplogBufferCollection oplogBuffer(nss); // Collection should not exist until startup() is called. - ASSERT_FALSE(AutoGetCollectionForRead(makeOperationContext().get(), nss).getCollection()); + ASSERT_FALSE(AutoGetCollectionForRead(_txn.get(), nss).getCollection()); - oplogBuffer.startup(); - ASSERT_TRUE(AutoGetCollectionForRead(makeOperationContext().get(), nss).getCollection()); + oplogBuffer.startup(_txn.get()); + ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection()); } } // namespace diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 3e915211977..37c89b5c67f 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -132,7 +132,7 @@ public: * components of the replication system to shut down and stop any threads they are using, * blocking until all replication-related shutdown tasks are complete. */ - virtual void shutdown() = 0; + virtual void shutdown(OperationContext* txn) = 0; /** * Returns a pointer to the ReplicationExecutor. diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 72234a44020..0cda02dba15 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -103,7 +103,7 @@ public: * Performs any necessary external state specific shutdown tasks, such as cleaning up * the threads it started. */ - virtual void shutdown() = 0; + virtual void shutdown(OperationContext* txn) = 0; /** * Creates the oplog, writes the first entry and stores the replica set config document. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index d407d32f741..0efaa840544 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -180,7 +180,7 @@ void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* repl::startMasterSlave(txn); } -void ReplicationCoordinatorExternalStateImpl::shutdown() { +void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) { stdx::lock_guard<stdx::mutex> lk(_threadMutex); if (_startedThreads) { log() << "Stopping replication applier threads"; @@ -193,7 +193,7 @@ void ReplicationCoordinatorExternalStateImpl::shutdown() { } if (_producerThread) { - _bgSync->shutdown(); + _bgSync->shutdown(txn); _producerThread->join(); } if (_snapshotThread) diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 5ccb055e280..28c37b1cc56 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -60,7 +60,7 @@ public: virtual void startSteadyStateReplication() override; virtual void startMasterSlave(OperationContext* txn); - virtual void shutdown(); + virtual void shutdown(OperationContext* txn); virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config); virtual void logTransitionToPrimaryToOplog(OperationContext* txn); virtual void forwardSlaveProgress(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index a9cdf79ff82..259cdcd8010 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -71,7 +71,7 @@ Status ReplicationCoordinatorExternalStateMock::initializeReplSetStorage(Operati return storeLocalConfigDocument(txn, config); } -void ReplicationCoordinatorExternalStateMock::shutdown() {} +void ReplicationCoordinatorExternalStateMock::shutdown(OperationContext*) {} void ReplicationCoordinatorExternalStateMock::forwardSlaveProgress() {} OID ReplicationCoordinatorExternalStateMock::ensureMe(OperationContext*) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 130ec3f99ad..b1242b0d169 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -60,7 +60,7 @@ public: virtual void startInitialSync(OnInitialSyncFinishedFn finished) override; virtual void startSteadyStateReplication() override; virtual void startMasterSlave(OperationContext*); - virtual void shutdown(); + virtual void shutdown(OperationContext* txn); virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config); virtual void logTransitionToPrimaryToOplog(OperationContext* txn); virtual void forwardSlaveProgress(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 0111bfde3c0..172608bd0de 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -545,7 +545,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* txn) { } } -void ReplicationCoordinatorImpl::shutdown() { +void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) { // Shutdown must: // * prevent new threads from blocking in awaitReplication // * wake up all existing threads blocking in awaitReplication @@ -578,7 +578,7 @@ void ReplicationCoordinatorImpl::shutdown() { // joining the replication executor is blocking so it must be run outside of the mutex _replExecutor.shutdown(); _replExecutor.join(); - _externalState->shutdown(); + _externalState->shutdown(txn); } const ReplSettings& ReplicationCoordinatorImpl::getSettings() const { diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 41c11a156d7..eb337c11da2 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -110,7 +110,7 @@ public: virtual void startup(OperationContext* txn) override; - virtual void shutdown() override; + virtual void shutdown(OperationContext* txn) override; virtual ReplicationExecutor* getExecutor() override { return &_replExecutor; diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp index cff5dd7dbff..c6ca9bf9800 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp @@ -401,7 +401,7 @@ TEST_F(ReplCoordTest, getReplCoord()->processReplSetReconfig(txn.get(), args, &result)); ASSERT_TRUE(result.obj().isEmpty()); - shutdown(); + shutdown(txn.get()); reconfigThread.join(); } @@ -438,7 +438,7 @@ TEST_F(ReplCoordTest, NodeReturnsConfigurationInProgressWhenReceivingAReconfigWh getReplCoord()->processReplSetReconfig(txn.get(), args, &result)); ASSERT_TRUE(result.obj().isEmpty()); - shutdown(); + shutdown(txn.get()); initateThread.join(); } @@ -609,7 +609,7 @@ TEST_F(ReplCoordTest, NodeDoesNotAcceptHeartbeatReconfigWhileInTheMidstOfReconfi stopCapturingLogMessages(); ASSERT_EQUALS( 1, countLogLinesContaining("because already in the midst of a configuration process")); - shutdown(); + shutdown(txn.get()); reconfigThread.join(); logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Log()); } diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 84973365218..3a7bd83bda9 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -236,15 +236,14 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenInitiatingViaANodeThatCannotBecomePrimary) { init("mySet"); start(HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); // Starting uninitialized, show that we can perform the initiate behavior. BSONObjBuilder result1; auto status = - getReplCoord()->processReplSetInitiate(&txn, + getReplCoord()->processReplSetInitiate(txn.get(), BSON("_id" << "mySet" << "version" @@ -266,17 +265,16 @@ TEST_F(ReplCoordTest, InitiateShouldSucceedWithAValidConfigEvenIfItHasFailedWithAnInvalidConfigPreviously) { init("mySet"); start(HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); BSONObjBuilder result; ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig, - getReplCoord()->processReplSetInitiate(&txn, BSONObj(), &result)); + getReplCoord()->processReplSetInitiate(txn.get(), BSONObj(), &result)); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); // Having failed to initiate once, show that we can now initiate. BSONObjBuilder result1; ASSERT_OK( - getReplCoord()->processReplSetInitiate(&txn, + getReplCoord()->processReplSetInitiate(txn.get(), BSON("_id" << "mySet" << "version" @@ -293,11 +291,10 @@ TEST_F(ReplCoordTest, BSONObjBuilder result; init("mySet"); start(HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); ASSERT_EQUALS( ErrorCodes::InvalidReplicaSetConfig, - getReplCoord()->processReplSetInitiate(&txn, + getReplCoord()->processReplSetInitiate(txn.get(), BSON("_id" << "mySet" << "version" @@ -396,14 +393,13 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithAConfigWithAMismatchedSetName) { init("mySet"); start(HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); BSONObjBuilder result1; ASSERT_EQUALS( ErrorCodes::InvalidReplicaSetConfig, - getReplCoord()->processReplSetInitiate(&txn, + getReplCoord()->processReplSetInitiate(txn.get(), BSON("_id" << "wrongSet" << "version" @@ -418,12 +414,11 @@ TEST_F(ReplCoordTest, TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithAnEmptyConfig) { init("mySet"); start(HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); BSONObjBuilder result1; - auto status = getReplCoord()->processReplSetInitiate(&txn, BSONObj(), &result1); + auto status = getReplCoord()->processReplSetInitiate(txn.get(), BSONObj(), &result1); ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig, status); ASSERT_STRING_CONTAINS(status.reason(), "Missing expected field \"_id\""); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); @@ -432,13 +427,12 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithAnEmpt TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithoutAn_idField) { init("mySet"); start(HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); BSONObjBuilder result1; auto status = getReplCoord()->processReplSetInitiate( - &txn, + txn.get(), BSON("version" << 1 << "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "node1:12345"))), &result1); @@ -451,13 +445,12 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithAConfigVersionNotEqualToOne) { init("mySet"); start(HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); BSONObjBuilder result1; auto status = - getReplCoord()->processReplSetInitiate(&txn, + getReplCoord()->processReplSetInitiate(txn.get(), BSON("_id" << "mySet" << "version" @@ -474,14 +467,13 @@ TEST_F(ReplCoordTest, TEST_F(ReplCoordTest, InitiateFailsWithoutReplSetFlag) { init(""); start(HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); BSONObjBuilder result1; ASSERT_EQUALS( ErrorCodes::NoReplicationEnabled, - getReplCoord()->processReplSetInitiate(&txn, + getReplCoord()->processReplSetInitiate(txn.get(), BSON("_id" << "mySet" << "version" @@ -496,8 +488,7 @@ TEST_F(ReplCoordTest, InitiateFailsWithoutReplSetFlag) { TEST_F(ReplCoordTest, NodeReturnsOutOfDiskSpaceWhenInitiateCannotWriteConfigToDisk) { init("mySet"); start(HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); BSONObjBuilder result1; @@ -505,7 +496,7 @@ TEST_F(ReplCoordTest, NodeReturnsOutOfDiskSpaceWhenInitiateCannotWriteConfigToDi Status(ErrorCodes::OutOfDiskSpace, "The test set this")); ASSERT_EQUALS( ErrorCodes::OutOfDiskSpace, - getReplCoord()->processReplSetInitiate(&txn, + getReplCoord()->processReplSetInitiate(txn.get(), BSON("_id" << "mySet" << "version" @@ -593,8 +584,8 @@ TEST_F(ReplCoordTest, RollBackIDShouldIncreaseByOneWhenIncrementRollbackIDIsCall TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAStandaloneNode) { init(""); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + OpTimeWithTermZero time(100, 1); WriteConcernOptions writeConcern; @@ -604,7 +595,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstASta // Because we didn't set ReplSettings.replSet, it will think we're a standalone so // awaitReplication will always work. ReplicationCoordinator::StatusAndDuration statusAndDur = - getReplCoord()->awaitReplication(&txn, time, writeConcern); + getReplCoord()->awaitReplication(txn.get(), time, writeConcern); ASSERT_OK(statusAndDur.status); } @@ -612,8 +603,8 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas ReplSettings settings; settings.setMaster(true); init(settings); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + OpTimeWithTermZero time(100, 1); WriteConcernOptions writeConcern; @@ -622,7 +613,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas writeConcern.wMode = WriteConcernOptions::kMajority; // w:majority always works on master/slave ReplicationCoordinator::StatusAndDuration statusAndDur = - getReplCoord()->awaitReplication(&txn, time, writeConcern); + getReplCoord()->awaitReplication(txn.get(), time, writeConcern); ASSERT_OK(statusAndDur.status); } @@ -646,8 +637,8 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec << 2))), HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + OpTimeWithTermZero time(100, 1); WriteConcernOptions writeConcern; @@ -657,7 +648,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec // Node should fail to awaitReplication when not primary. ReplicationCoordinator::StatusAndDuration statusAndDur = - getReplCoord()->awaitReplication(&txn, time, writeConcern); + getReplCoord()->awaitReplication(txn.get(), time, writeConcern); ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status); } @@ -820,44 +811,44 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; writeConcern.wNumNodes = 1; - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + // 1 node waiting for time 1 ReplicationCoordinator::StatusAndDuration statusAndDur = - getReplCoord()->awaitReplication(&txn, time1, writeConcern); + getReplCoord()->awaitReplication(txn.get(), time1, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); - statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern); + statusAndDur = getReplCoord()->awaitReplication(txn.get(), time1, writeConcern); ASSERT_OK(statusAndDur.status); // 2 nodes waiting for time1 writeConcern.wNumNodes = 2; - statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern); + statusAndDur = getReplCoord()->awaitReplication(txn.get(), time1, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); - statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern); + statusAndDur = getReplCoord()->awaitReplication(txn.get(), time1, writeConcern); ASSERT_OK(statusAndDur.status); // 2 nodes waiting for time2 - statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern); + statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); getReplCoord()->setMyLastAppliedOpTime(time2); getReplCoord()->setMyLastDurableOpTime(time2); - statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern); + statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time2)); ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 2, time2)); - statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern); + statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern); ASSERT_OK(statusAndDur.status); // 3 nodes waiting for time2 writeConcern.wNumNodes = 3; - statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern); + statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 3, time2)); - statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern); + statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern); ASSERT_OK(statusAndDur.status); } @@ -1271,7 +1262,10 @@ TEST_F(ReplCoordTest, awaiter.start(); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1)); - shutdown(); + { + auto txn = makeOperationContext(); + shutdown(txn.get()); + } ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, statusAndDur.status); awaiter.reset(); @@ -1405,9 +1399,9 @@ private: TEST_F(ReplCoordTest, NodeReturnsBadValueWhenUpdateTermIsRunAgainstANonReplNode) { init(ReplSettings()); ASSERT_TRUE(ReplicationCoordinator::modeNone == getReplCoord()->getReplicationMode()); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; - ASSERT_EQUALS(ErrorCodes::BadValue, getReplCoord()->updateTerm(&txn, 0).code()); + auto txn = makeOperationContext(); + + ASSERT_EQUALS(ErrorCodes::BadValue, getReplCoord()->updateTerm(txn.get(), 0).code()); } TEST_F(ReplCoordTest, NodeChangesTermAndStepsDownWhenAndOnlyWhenUpdateTermSuppliesAHigherTerm) { @@ -1433,31 +1427,31 @@ TEST_F(ReplCoordTest, NodeChangesTermAndStepsDownWhenAndOnlyWhenUpdateTermSuppli ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); simulateSuccessfulV1Election(); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + ASSERT_EQUALS(1, getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); // lower term, no change - ASSERT_OK(getReplCoord()->updateTerm(&txn, 0)); + ASSERT_OK(getReplCoord()->updateTerm(txn.get(), 0)); ASSERT_EQUALS(1, getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); // same term, no change - ASSERT_OK(getReplCoord()->updateTerm(&txn, 1)); + ASSERT_OK(getReplCoord()->updateTerm(txn.get(), 1)); ASSERT_EQUALS(1, getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); // higher term, step down and change term Handle cbHandle; - ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(&txn, 2).code()); + ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(txn.get(), 2).code()); // Term hasn't been incremented yet, as we need another try to update it after stepdown. ASSERT_EQUALS(1, getReplCoord()->getTerm()); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); // Now update term should actually update the term, as stepdown is complete. - ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(&txn, 2).code()); + ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(txn.get(), 2).code()); ASSERT_EQUALS(2, getReplCoord()->getTerm()); } @@ -1538,8 +1532,8 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha } TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + const auto txn = makeOperationContext(); + OpTimeWithTermZero optime1(100, 1); // All nodes are caught up getReplCoord()->setMyLastAppliedOpTime(optime1); @@ -1547,7 +1541,7 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1)); - Status status = getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(0)); + Status status = getReplCoord()->stepDown(txn.get(), false, Milliseconds(0), Milliseconds(0)); ASSERT_EQUALS(ErrorCodes::NotMaster, status); ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); } @@ -1563,13 +1557,12 @@ TEST_F(StepDownTest, simulateSuccessfulV1Election(); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + const auto txn = makeOperationContext(); // Make sure stepDown cannot grab the global shared lock - Lock::GlobalWrite lk(txn.lockState()); + Lock::GlobalWrite lk(txn->lockState()); - Status status = getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000)); + Status status = getReplCoord()->stepDown(txn.get(), false, Milliseconds(0), Milliseconds(1000)); ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); } @@ -1610,11 +1603,11 @@ TEST_F(StepDownTest, getNet()->runReadyNetworkOperations(); exitNetwork(); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + const auto txn = makeOperationContext(); + ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - ASSERT_OK(getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000))); + ASSERT_OK(getReplCoord()->stepDown(txn.get(), false, Milliseconds(0), Milliseconds(1000))); enterNetwork(); // So we can safely inspect the topology coordinator ASSERT_EQUALS(getNet()->now() + Seconds(1), getTopoCoord().getStepDownTime()); ASSERT_TRUE(getTopoCoord().getMemberState().secondary()); @@ -1634,9 +1627,9 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle << "test1:1234"))), HostAndPort("test1", 1234)); runSingleNodeElection(makeOperationContext(), getReplCoord()); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; - ASSERT_OK(getReplCoord()->stepDown(&txn, true, Milliseconds(0), Milliseconds(1000))); + const auto txn = makeOperationContext(); + + ASSERT_OK(getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000))); getNet()->enterNetwork(); // Must do this before inspecting the topocoord Date_t stepdownUntil = getNet()->now() + Seconds(1); ASSERT_EQUALS(stepdownUntil, getTopoCoord().getStepDownTime()); @@ -1665,11 +1658,10 @@ TEST_F(StepDownTest, simulateSuccessfulV1Election(); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + const auto txn = makeOperationContext(); // Try to stepDown but time out because no secondaries are caught up. - auto status = repl->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000)); + auto status = repl->stepDown(txn.get(), false, Milliseconds(0), Milliseconds(1000)); ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status); ASSERT_TRUE(repl->getMemberState().primary()); @@ -1684,7 +1676,7 @@ TEST_F(StepDownTest, } getNet()->exitNetwork(); ASSERT_TRUE(repl->getMemberState().primary()); - status = repl->stepDown(&txn, true, Milliseconds(0), Milliseconds(1000)); + status = repl->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000)); ASSERT_OK(status); ASSERT_TRUE(repl->getMemberState().secondary()); } @@ -1702,8 +1694,8 @@ TEST_F(StepDownTest, simulateSuccessfulV1Election(); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + const auto txn = makeOperationContext(); + // Step down where the secondary actually has to catch up before the stepDown can succeed. // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for @@ -1711,11 +1703,11 @@ TEST_F(StepDownTest, // This makes it unnecessary to advance the clock after entering the network to process // the heartbeat requests. Status result(ErrorCodes::InternalError, "not mutated"); - auto globalReadLockAndEventHandle = - repl->stepDown_nonBlocking(&txn, false, Milliseconds(10000), Milliseconds(60000), &result); + auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking( + txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result); const auto& eventHandle = globalReadLockAndEventHandle.second; ASSERT_TRUE(eventHandle); - ASSERT_TRUE(txn.lockState()->isReadLocked()); + ASSERT_TRUE(txn->lockState()->isReadLocked()); // Make a secondary actually catch up enterNetwork(); @@ -1763,8 +1755,8 @@ TEST_F(StepDownTest, simulateSuccessfulV1Election(); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + const auto txn = makeOperationContext(); + // Step down where the secondary actually has to catch up before the stepDown can succeed. // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for @@ -1772,11 +1764,11 @@ TEST_F(StepDownTest, // This makes it unnecessary to advance the clock after entering the network to process // the heartbeat requests. Status result(ErrorCodes::InternalError, "not mutated"); - auto globalReadLockAndEventHandle = - repl->stepDown_nonBlocking(&txn, false, Milliseconds(10000), Milliseconds(60000), &result); + auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking( + txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result); const auto& eventHandle = globalReadLockAndEventHandle.second; ASSERT_TRUE(eventHandle); - ASSERT_TRUE(txn.lockState()->isReadLocked()); + ASSERT_TRUE(txn->lockState()->isReadLocked()); // Secondary has not caught up on first round of heartbeats. enterNetwork(); @@ -1849,23 +1841,23 @@ TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) { simulateSuccessfulV1Election(); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; - const unsigned int opID = txn.getOpID(); + const auto txn = makeOperationContext(); + + const unsigned int opID = txn->getOpID(); ASSERT_TRUE(repl->getMemberState().primary()); // stepDown where the secondary actually has to catch up before the stepDown can succeed. Status result(ErrorCodes::InternalError, "not mutated"); - auto globalReadLockAndEventHandle = - repl->stepDown_nonBlocking(&txn, false, Milliseconds(10000), Milliseconds(60000), &result); + auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking( + txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result); const auto& eventHandle = globalReadLockAndEventHandle.second; ASSERT_TRUE(eventHandle); - ASSERT_TRUE(txn.lockState()->isReadLocked()); + ASSERT_TRUE(txn->lockState()->isReadLocked()); { - stdx::lock_guard<Client> lk(*txn.getClient()); - txn.markKilled(ErrorCodes::Interrupted); + stdx::lock_guard<Client> lk(*(txn->getClient())); + txn->markKilled(ErrorCodes::Interrupted); } getReplCoord()->interrupt(opID); @@ -2212,11 +2204,11 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) { ASSERT_EQUALS(ErrorCodes::NotSecondary, status); ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + // Step down from primary. - getReplCoord()->updateTerm(&txn, getReplCoord()->getTerm() + 1); + getReplCoord()->updateTerm(txn.get(), getReplCoord()->getTerm() + 1); ASSERT_OK(getReplCoord()->waitForMemberState(MemberState::RS_SECONDARY, Seconds(1))); status = getReplCoord()->setMaintenanceMode(false); @@ -2248,11 +2240,11 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection) // TODO this election shouldn't have to happen. simulateSuccessfulV1Election(); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + // Step down from primary. - getReplCoord()->updateTerm(&txn, getReplCoord()->getTerm() + 1); + getReplCoord()->updateTerm(txn.get(), getReplCoord()->getTerm() + 1); getReplCoord()->waitForMemberState(MemberState::RS_SECONDARY, Milliseconds(10 * 1000)); // Can't modify maintenance mode when running for election (before and after dry run). @@ -2379,8 +2371,8 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast settings.setMaster(true); init(settings); HostAndPort clientHost("node2:12345"); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + OID client = OID::gen(); OpTimeWithTermZero time1(100, 1); @@ -2389,7 +2381,7 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast getExternalState()->setClientHostAndPort(clientHost); HandshakeArgs handshake; ASSERT_OK(handshake.initialize(BSON("handshake" << client))); - ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake)); + ASSERT_OK(getReplCoord()->processHandshake(txn.get(), handshake)); getReplCoord()->setMyLastAppliedOpTime(time2); getReplCoord()->setMyLastDurableOpTime(time2); @@ -2557,7 +2549,10 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) { TEST_F(ReplCoordTest, LogAMessageWhenShutDownBeforeReplicationStartUpFinished) { init(); startCapturingLogMessages(); - getReplCoord()->shutdown(); + { + auto txn = makeOperationContext(); + getReplCoord()->shutdown(txn.get()); + } stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("shutdown() called before startup() finished")); } @@ -2595,11 +2590,11 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) { writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; writeConcern.wNumNodes = 1; - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); // receive updatePosition containing ourself, should not process the update for self UpdatePositionArgs args; @@ -2618,7 +2613,7 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) { ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); } TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf) { @@ -2655,11 +2650,11 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; writeConcern.wNumNodes = 1; - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); // receive updatePosition containing ourself, should not process the update for self OldUpdatePositionArgs args; @@ -2675,7 +2670,7 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); } TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect) { @@ -2726,14 +2721,14 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect) << UpdatePositionArgs::kAppliedOpTimeFieldName << BSON("ts" << time2.getTimestamp() << "t" << 3)))))); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + long long cfgver; ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig, getReplCoord()->processReplSetUpdatePosition(args, &cfgver)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); } TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorrect) { @@ -2782,14 +2777,14 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorre << OldUpdatePositionArgs::kOpTimeFieldName << time2.timestamp))))); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + long long cfgver; ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig, getReplCoord()->processReplSetUpdatePosition(args, &cfgver)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); } TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConfig) { @@ -2840,12 +2835,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf << UpdatePositionArgs::kAppliedOpTimeFieldName << BSON("ts" << time2.getTimestamp() << "t" << 2)))))); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + ASSERT_EQUALS(ErrorCodes::NodeNotFound, getReplCoord()->processReplSetUpdatePosition(args, 0)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); } TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheConfig) { @@ -2894,12 +2889,12 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheC << OldUpdatePositionArgs::kOpTimeFieldName << time2.timestamp))))); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + ASSERT_EQUALS(ErrorCodes::NodeNotFound, getReplCoord()->processReplSetUpdatePosition(args, 0)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); } TEST_F(ReplCoordTest, @@ -2958,14 +2953,14 @@ TEST_F(ReplCoordTest, << OldUpdatePositionArgs::kOpTimeFieldName << time2.timestamp))))); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0)); - ASSERT_OK(getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + ASSERT_OK(getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); writeConcern.wNumNodes = 3; - ASSERT_OK(getReplCoord()->awaitReplication(&txn, time2, writeConcern).status); + ASSERT_OK(getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status); } void doReplSetReconfig(ReplicationCoordinatorImpl* replCoord, Status* status) { @@ -3199,8 +3194,8 @@ TEST_F(ReplCoordTest, writeConcern.wMode = WriteConcernOptions::kMajority; writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE; - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + ReplicationAwaiter awaiter(getReplCoord(), getServiceContext()); awaiter.setOpTime(time); @@ -3214,7 +3209,7 @@ TEST_F(ReplCoordTest, writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE; ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time, writeConcern2).status); + getReplCoord()->awaitReplication(txn.get(), time, writeConcern2).status); // reconfig to three nodes Status status(ErrorCodes::InternalError, "Not Set"); @@ -3277,30 +3272,30 @@ TEST_F(ReplCoordTest, majorityWriteConcern.wMode = WriteConcernOptions::kMajority; majorityWriteConcern.syncMode = WriteConcernOptions::SyncMode::JOURNAL; - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status); + getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time)); ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 1, time)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status); + getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status); // this member does not vote and as a result should not count towards write concern ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 3, time)); ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 3, time)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status); + getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time)); ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 2, time)); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, - getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status); + getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status); getReplCoord()->onSnapshotCreate(time, SnapshotName(1)); - ASSERT_OK(getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status); + ASSERT_OK(getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status); } TEST_F(ReplCoordTest, @@ -3391,13 +3386,12 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0)); getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0)); - shutdown(); + auto txn = makeOperationContext(); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + shutdown(txn.get()); auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus()); @@ -3418,16 +3412,15 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0)); getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0)); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + const auto txn = makeOperationContext(); { - stdx::lock_guard<Client> lk(*txn.getClient()); - txn.markKilled(ErrorCodes::Interrupted); + stdx::lock_guard<Client> lk(*(txn->getClient())); + txn->markKilled(ErrorCodes::Interrupted); } auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus()); @@ -3445,10 +3438,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesNoOpTi << 0))), HostAndPort("node1", 12345)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; - - auto result = getReplCoord()->waitUntilOpTime(&txn, ReadConcernArgs()); + auto txn = makeOperationContext(); + auto result = getReplCoord()->waitUntilOpTime(txn.get(), ReadConcernArgs()); ASSERT(result.didWait()); ASSERT_OK(result.getStatus()); @@ -3469,11 +3460,9 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; - + auto txn = makeOperationContext(); auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_OK(result.getStatus()); @@ -3496,11 +3485,11 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi getReplCoord()->setMyLastAppliedOpTime(time); getReplCoord()->setMyLastDurableOpTime(time); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(time, ReadConcernLevel::kLocalReadConcern)); + txn.get(), ReadConcernArgs(time, ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_OK(result.getStatus()); @@ -3509,10 +3498,9 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi TEST_F(ReplCoordTest, NodeReturnsNotAReplicaSetWhenWaitUntilOpTimeIsRunWithoutMajorityReadConcernEnabled) { init(ReplSettings()); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_FALSE(result.didWait()); ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus()); @@ -3523,10 +3511,11 @@ TEST_F(ReplCoordTest, NodeReturnsNotAReplicaSetWhenWaitUntilOpTimeIsRunAgainstAS settings.setMajorityReadConcernEnabled(true); init(settings); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + auto txn = makeOperationContext(); + auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern)); + txn.get(), + ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern)); ASSERT_FALSE(result.didWait()); ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus()); @@ -3544,14 +3533,15 @@ TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) { << "_id" << 0))), HostAndPort("node1", 12345)); + runSingleNodeElection(makeOperationContext(), getReplCoord()); getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0)); - shutdown(); - auto txn = makeOperationContext(); + shutdown(txn.get()); + auto result = getReplCoord()->waitUntilOpTime( txn.get(), ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern)); @@ -3572,19 +3562,19 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) { << 0))), HostAndPort("node1", 12345)); runSingleNodeElection(makeOperationContext(), getReplCoord()); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; + const auto txn = makeOperationContext(); getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0)); { - stdx::lock_guard<Client> lk(*txn.getClient()); - txn.markKilled(ErrorCodes::Interrupted); + stdx::lock_guard<Client> lk(*(txn->getClient())); + txn->markKilled(ErrorCodes::Interrupted); } auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern)); + txn.get(), + ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus()); @@ -3785,9 +3775,8 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet HostAndPort("node1", 12345)); getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; - getReplCoord()->updateTerm(&txn, 1); + auto txn = makeOperationContext(); + getReplCoord()->updateTerm(txn.get(), 1); ASSERT_EQUALS(1, getReplCoord()->getTerm()); OpTime time(Timestamp(10, 0), 1); @@ -3851,9 +3840,8 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr << 1), HostAndPort("node1", 12345)); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; - getReplCoord()->updateTerm(&txn, 1); + auto txn = makeOperationContext(); + getReplCoord()->updateTerm(txn.get(), 1); ASSERT_EQUALS(1, getReplCoord()->getTerm()); // higher term, should change @@ -3931,9 +3919,8 @@ TEST_F(ReplCoordTest, << 1), HostAndPort("node1", 12345)); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); - auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; - getReplCoord()->updateTerm(&txn, 1); + auto txn = makeOperationContext(); + getReplCoord()->updateTerm(txn.get(), 1); ASSERT_EQUALS(1, getReplCoord()->getTerm()); auto replCoord = getReplCoord(); @@ -4805,9 +4792,8 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) { ASSERT_EQUALS(ErrorCodes::BadValue, replCoord->waitForDrainFinish(Milliseconds(-1))); - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; - replCoord->signalDrainComplete(&txn); + const auto txn = makeOperationContext(); + replCoord->signalDrainComplete(txn.get()); ASSERT_OK(replCoord->waitForDrainFinish(timeout)); // Zero timeout is fine. diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index ca59fd1a833..91893d37325 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -53,7 +53,7 @@ void ReplicationCoordinatorMock::startup(OperationContext* txn) { // TODO } -void ReplicationCoordinatorMock::shutdown() { +void ReplicationCoordinatorMock::shutdown(OperationContext*) { // TODO } diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index d2343d883ea..ec76a9677ac 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -54,7 +54,7 @@ public: virtual void startup(OperationContext* txn); - virtual void shutdown(); + virtual void shutdown(OperationContext* txn); virtual ReplicationExecutor* getExecutor() override { return nullptr; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 319f67c3893..5ffdce430db 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -83,7 +83,8 @@ void ReplCoordTest::tearDown() { _externalState->setStoreLocalConfigDocumentToHang(false); } if (_callShutdown) { - shutdown(); + auto txn = makeOperationContext(); + shutdown(txn.get()); } } @@ -267,8 +268,6 @@ void ReplCoordTest::simulateSuccessfulDryRun() { } void ReplCoordTest::simulateSuccessfulV1Election() { - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; ReplicationCoordinatorImpl* replCoord = getReplCoord(); NetworkInterfaceMock* net = getNet(); @@ -323,7 +322,10 @@ void ReplCoordTest::simulateSuccessfulV1Election() { replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); - replCoord->signalDrainComplete(&txn); + { + auto txn = makeOperationContext(); + replCoord->signalDrainComplete(txn.get()); + } replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); @@ -332,8 +334,6 @@ void ReplCoordTest::simulateSuccessfulV1Election() { } void ReplCoordTest::simulateSuccessfulElection() { - const auto txnPtr = makeOperationContext(); - auto& txn = *txnPtr; ReplicationCoordinatorImpl* replCoord = getReplCoord(); NetworkInterfaceMock* net = getNet(); ReplicaSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest(); @@ -384,7 +384,10 @@ void ReplCoordTest::simulateSuccessfulElection() { replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); - replCoord->signalDrainComplete(&txn); + { + auto txn = makeOperationContext(); + replCoord->signalDrainComplete(txn.get()); + } replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); @@ -392,10 +395,10 @@ void ReplCoordTest::simulateSuccessfulElection() { ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); } -void ReplCoordTest::shutdown() { +void ReplCoordTest::shutdown(OperationContext* txn) { invariant(_callShutdown); _net->exitNetwork(); - _repl->shutdown(); + _repl->shutdown(txn); _callShutdown = false; } diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index ad43f44a6ef..6e983aa346d 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -228,7 +228,7 @@ protected: /** * Shuts down the objects under test. */ - void shutdown(); + void shutdown(OperationContext* txn); /** * Receive the heartbeat request from replication coordinator and reply with a response. diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index a82725ddc9c..8c50820b31e 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -96,7 +96,7 @@ void truncateAndResetOplog(OperationContext* txn, // because the bgsync thread, while running, may update the blacklist. replCoord->resetMyLastOpTimes(); bgsync->stop(); - bgsync->clearBuffer(); + bgsync->clearBuffer(txn); replCoord->clearSyncSourceBlacklist(); @@ -214,7 +214,7 @@ bool _initialSyncClone(OperationContext* txn, * @param r the oplog reader. * @return if applying the oplog succeeded. */ -bool _initialSyncApplyOplog(OperationContext* ctx, +bool _initialSyncApplyOplog(OperationContext* txn, repl::InitialSync* syncer, OplogReader* r, BackgroundSync* bgsync) { @@ -224,9 +224,11 @@ bool _initialSyncApplyOplog(OperationContext* ctx, // If the fail point is set, exit failing. if (MONGO_FAIL_POINT(failInitSyncWithBufferedEntriesLeft)) { log() << "adding fake oplog entry to buffer."; - bgsync->pushTestOpToBuffer(BSON( - "ts" << startOpTime.getTimestamp() << "t" << startOpTime.getTerm() << "v" << 1 << "op" - << "n")); + bgsync->pushTestOpToBuffer( + txn, + BSON("ts" << startOpTime.getTimestamp() << "t" << startOpTime.getTerm() << "v" << 1 + << "op" + << "n")); return false; } @@ -267,7 +269,7 @@ bool _initialSyncApplyOplog(OperationContext* ctx, // apply till stopOpTime try { LOG(2) << "Applying oplog entries from " << startOpTime << " until " << stopOpTime; - syncer->oplogApplication(ctx, stopOpTime); + syncer->oplogApplication(txn, stopOpTime); if (inShutdown()) { return false; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 1e01f0d11cf..a4514386671 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -295,8 +295,8 @@ std::unique_ptr<OldThreadPool> SyncTail::makeWriterPool() { return stdx::make_unique<OldThreadPool>(replWriterThreadCount, "repl writer worker "); } -bool SyncTail::peek(BSONObj* op) { - return _networkQueue->peek(op); +bool SyncTail::peek(OperationContext* txn, BSONObj* op) { + return _networkQueue->peek(txn, op); } // static @@ -821,13 +821,13 @@ void SyncTail::oplogApplication() { bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) { BSONObj op; // Check to see if there are ops waiting in the bgsync queue - bool peek_success = peek(&op); + bool peek_success = peek(txn, &op); if (!peek_success) { // if we don't have anything in the queue, wait a bit for something to appear if (ops->empty()) { // block up to 1 second - _networkQueue->waitForMore(); + _networkQueue->waitForMore(txn); return false; } @@ -846,7 +846,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op if (ops->empty()) { // apply commands one-at-a-time ops->push_back(std::move(entry)); - _networkQueue->consume(); + _networkQueue->consume(txn); } // otherwise, apply what we have so far and come back for the command @@ -869,7 +869,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op // Copy the op to the deque and remove it from the bgsync queue. ops->push_back(std::move(entry)); - _networkQueue->consume(); + _networkQueue->consume(txn); // Go back for more ops return false; diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 69cac0a3943..9df725e9f4a 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -108,7 +108,7 @@ public: static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert); void oplogApplication(); - bool peek(BSONObj* obj); + bool peek(OperationContext* txn, BSONObj* obj); class OpQueue { public: |