summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/db.cpp4
-rw-r--r--src/mongo/db/repl/bgsync.cpp67
-rw-r--r--src/mongo/db/repl/bgsync.h14
-rw-r--r--src/mongo/db/repl/data_replicator.cpp29
-rw-r--r--src/mongo/db/repl/oplog_buffer.h35
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.cpp34
-rw-r--r--src/mongo/db/repl/oplog_buffer_blocking_queue.h34
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp45
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h32
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection_test.cpp21
-rw-r--r--src/mongo/db/repl/replication_coordinator.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp350
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp21
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.h2
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp14
-rw-r--r--src/mongo/db/repl/sync_tail.cpp12
-rw-r--r--src/mongo/db/repl/sync_tail.h2
27 files changed, 384 insertions, 362 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index b78849189c4..5b3b79bec1f 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -1004,8 +1004,6 @@ static void shutdownTask() {
getGlobalServiceContext()->setKillAllOperations();
- repl::getGlobalReplicationCoordinator()->shutdown();
-
Client& client = cc();
ServiceContext::UniqueOperationContext uniqueTxn;
OperationContext* txn = client.getOperationContext();
@@ -1014,6 +1012,8 @@ static void shutdownTask() {
txn = uniqueTxn.get();
}
+ repl::ReplicationCoordinator::get(txn)->shutdown(txn);
+
ShardingState::get(txn)->shutDown(txn);
// We should always be able to acquire the global lock at shutdown.
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 9aecb8d1d94..c10e88e686f 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -113,6 +113,9 @@ bool DataReplicatorExternalStateBackgroundSync::shouldStopFetching(
std::unique_ptr<ThreadPool> makeThreadPool() {
ThreadPool::Options threadPoolOptions;
threadPoolOptions.poolName = "rsBackgroundSync";
+ threadPoolOptions.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
return stdx::make_unique<ThreadPool>(threadPoolOptions);
}
@@ -163,12 +166,12 @@ BackgroundSync::BackgroundSync(std::unique_ptr<OplogBuffer> oplogBuffer)
bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get());
}
-void BackgroundSync::shutdown() {
+void BackgroundSync::shutdown(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
// Clear the buffer in case the producerThread is waiting in push() due to a full queue.
invariant(inShutdown());
- clearBuffer();
+ clearBuffer(txn);
_stopped = true;
if (_oplogFetcher) {
@@ -181,12 +184,18 @@ void BackgroundSync::producerThread(
Client::initThread("rsBackgroundSync");
AuthorizationSession::get(cc())->grantInternalAuthorization();
- _oplogBuffer->startup();
+ {
+ auto txn = cc().makeOperationContext();
+ _oplogBuffer->startup(txn.get());
+ }
+
_threadPoolTaskExecutor.startup();
ON_BLOCK_EXIT([this]() {
_threadPoolTaskExecutor.shutdown();
_threadPoolTaskExecutor.join();
- _oplogBuffer->shutdown();
+
+ auto txn = cc().makeOperationContext();
+ _oplogBuffer->shutdown(txn.get());
});
while (!inShutdown()) {
@@ -205,13 +214,13 @@ void BackgroundSync::producerThread(
stop();
}
-void BackgroundSync::_signalNoNewDataForApplier() {
+void BackgroundSync::_signalNoNewDataForApplier(OperationContext* txn) {
// Signal to consumers that we have entered the stopped state
// if the signal isn't already in the queue.
- const boost::optional<BSONObj> lastObjectPushed = _oplogBuffer->lastObjectPushed();
+ const boost::optional<BSONObj> lastObjectPushed = _oplogBuffer->lastObjectPushed(txn);
if (!lastObjectPushed || !lastObjectPushed->isEmpty()) {
const BSONObj sentinelDoc;
- _oplogBuffer->pushEvenIfFull(sentinelDoc);
+ _oplogBuffer->pushEvenIfFull(txn, sentinelDoc);
bufferCountGauge.increment();
bufferSizeGauge.increment(sentinelDoc.objsize());
}
@@ -226,7 +235,8 @@ void BackgroundSync::_producerThread(
stop();
}
if (_replCoord->isWaitingForApplierToDrain()) {
- _signalNoNewDataForApplier();
+ auto txn = cc().makeOperationContext();
+ _signalNoNewDataForApplier(txn.get());
}
sleepsecs(1);
return;
@@ -246,13 +256,12 @@ void BackgroundSync::_producerThread(
}
// we want to start when we're no longer primary
// start() also loads _lastOpTimeFetched, which we know is set from the "if"
- const ServiceContext::UniqueOperationContext txnPtr = cc().makeOperationContext();
- OperationContext& txn = *txnPtr;
+ auto txn = cc().makeOperationContext();
if (isStopped()) {
- start(&txn);
+ start(txn.get());
}
- _produce(&txn, replicationCoordinatorExternalState);
+ _produce(txn.get(), replicationCoordinatorExternalState);
}
void BackgroundSync::_produce(
@@ -484,6 +493,8 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
const OplogFetcher::DocumentsInfo& info,
Milliseconds getMoreElapsed) {
+ auto txn = cc().makeOperationContext();
+
// If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
// be one fewer than "networkDocumentCount" because the first document (which was applied
// previously) is skipped.
@@ -493,14 +504,14 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
}
// Wait for enough space.
- _oplogBuffer->waitForSpace(info.toApplyDocumentBytes);
+ _oplogBuffer->waitForSpace(txn.get(), info.toApplyDocumentBytes);
OCCASIONALLY {
- LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes";
+ LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize(txn.get()) << " bytes";
}
// Buffer docs for later application.
- _oplogBuffer->pushAllNonBlocking(begin, end);
+ _oplogBuffer->pushAllNonBlocking(txn.get(), begin, end);
// Update last fetched info.
{
@@ -513,21 +524,21 @@ void BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
_recordStats(info, getMoreElapsed);
}
-bool BackgroundSync::peek(BSONObj* op) {
- return _oplogBuffer->peek(op);
+bool BackgroundSync::peek(OperationContext* txn, BSONObj* op) {
+ return _oplogBuffer->peek(txn, op);
}
-void BackgroundSync::waitForMore() {
+void BackgroundSync::waitForMore(OperationContext* txn) {
BSONObj op;
// Block for one second before timing out.
// Ignore the value of the op we peeked at.
- _oplogBuffer->blockingPeek(&op, Seconds(1));
+ _oplogBuffer->blockingPeek(txn, &op, Seconds(1));
}
-void BackgroundSync::consume() {
+void BackgroundSync::consume(OperationContext* txn) {
// this is just to get the op off the queue, it's been peeked at
// and queued for application already
- BSONObj op = _oplogBuffer->blockingPop();
+ BSONObj op = _oplogBuffer->blockingPop(txn);
bufferCountGauge.decrement(1);
bufferSizeGauge.decrement(getSize(op));
}
@@ -543,11 +554,11 @@ void BackgroundSync::_rollback(OperationContext* txn,
_replCoord);
if (status.isOK()) {
// When the syncTail thread sees there is no new data by adding something to the buffer.
- _signalNoNewDataForApplier();
+ _signalNoNewDataForApplier(txn);
// Wait until the buffer is empty.
// This is an indication that syncTail has removed the sentinal marker from the buffer
// and reset its local lastAppliedOpTime via the replCoord.
- while (!_oplogBuffer->isEmpty()) {
+ while (!_oplogBuffer->isEmpty(txn)) {
sleepmillis(10);
if (inShutdown()) {
return;
@@ -603,7 +614,7 @@ void BackgroundSync::stop() {
}
void BackgroundSync::start(OperationContext* txn) {
- massert(16235, "going to start syncing, but buffer is not empty", _oplogBuffer->isEmpty());
+ massert(16235, "going to start syncing, but buffer is not empty", _oplogBuffer->isEmpty(txn));
long long lastFetchedHash = _readLastAppliedHash(txn);
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -621,8 +632,8 @@ bool BackgroundSync::isStopped() const {
return _stopped;
}
-void BackgroundSync::clearBuffer() {
- _oplogBuffer->clear();
+void BackgroundSync::clearBuffer(OperationContext* txn) {
+ _oplogBuffer->clear(txn);
const auto count = bufferCountGauge.get();
bufferCountGauge.decrement(count);
const auto size = bufferSizeGauge.get();
@@ -690,8 +701,8 @@ bool BackgroundSync::shouldStopFetching() const {
return false;
}
-void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) {
- _oplogBuffer->push(op);
+void BackgroundSync::pushTestOpToBuffer(OperationContext* txn, const BSONObj& op) {
+ _oplogBuffer->push(txn, op);
bufferCountGauge.increment();
bufferSizeGauge.increment(op.objsize());
}
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 817746350d8..a62fa90c69b 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -68,7 +68,7 @@ public:
// stop syncing (when this node becomes a primary, e.g.)
void stop();
- void shutdown();
+ void shutdown(OperationContext* txn);
bool isStopped() const;
@@ -86,16 +86,16 @@ public:
// Interface implementation
- bool peek(BSONObj* op);
- void consume();
+ bool peek(OperationContext* txn, BSONObj* op);
+ void consume(OperationContext* txn);
void clearSyncTarget();
- void waitForMore();
+ void waitForMore(OperationContext* txn);
// For monitoring
BSONObj getCounters();
// Clears any fetched and buffered oplog entries.
- void clearBuffer();
+ void clearBuffer(OperationContext* txn);
/**
* Cancel existing find/getMore commands on the sync source's oplog collection.
@@ -112,7 +112,7 @@ public:
bool shouldStopFetching() const;
// Testing related stuff
- void pushTestOpToBuffer(const BSONObj& op);
+ void pushTestOpToBuffer(OperationContext* txn, const BSONObj& op);
private:
// Production thread
@@ -126,7 +126,7 @@ private:
*
* NOTE: Used after rollback and during draining to transition to Primary role;
*/
- void _signalNoNewDataForApplier();
+ void _signalNoNewDataForApplier(OperationContext* txn);
/**
* Record metrics.
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index daa47a6c56c..72b96deb4ba 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -451,7 +451,7 @@ Status DataReplicator::start() {
_fetcherPaused = false;
_reporterPaused = false;
_oplogBuffer = _dataReplicatorExternalState->makeSteadyStateOplogBuffer();
- _oplogBuffer->startup();
+ _oplogBuffer->startup(nullptr);
_doNextActions_Steady_inlock();
return Status::OK();
}
@@ -494,7 +494,7 @@ Timestamp DataReplicator::getLastTimestampApplied() const {
size_t DataReplicator::getOplogBufferCount() const {
// Oplog buffer is internally synchronized.
- return _oplogBuffer->getCount();
+ return _oplogBuffer->getCount(nullptr);
}
std::string DataReplicator::getDiagnosticString() const {
@@ -502,7 +502,7 @@ std::string DataReplicator::getDiagnosticString() const {
str::stream out;
out << "DataReplicator -"
<< " opts: " << _opts.toString() << " oplogFetcher: " << _fetcher->toString()
- << " opsBuffered: " << _oplogBuffer->getSize() << " state: " << toString(_state);
+ << " opsBuffered: " << _oplogBuffer->getSize(nullptr) << " state: " << toString(_state);
switch (_state) {
case DataReplicatorState::InitialSync:
out << " opsAppied: " << _initialSyncState->appliedOps
@@ -572,7 +572,7 @@ TimestampStatus DataReplicator::flushAndPause() {
void DataReplicator::_resetState_inlock(Timestamp lastAppliedOpTime) {
invariant(!_anyActiveHandles_inlock());
_lastTimestampApplied = _lastTimestampFetched = lastAppliedOpTime;
- _oplogBuffer->clear();
+ _oplogBuffer->clear(nullptr);
}
void DataReplicator::slavesHaveProgressed() {
@@ -632,9 +632,9 @@ TimestampStatus DataReplicator::initialSync(OperationContext* txn) {
_applierPaused = true;
_oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer();
- _oplogBuffer->startup();
+ _oplogBuffer->startup(nullptr);
ON_BLOCK_EXIT([this]() {
- _oplogBuffer->shutdown();
+ _oplogBuffer->shutdown(nullptr);
_oplogBuffer.reset();
});
@@ -948,7 +948,7 @@ void DataReplicator::_doNextActions_Steady_inlock() {
}
// Check if no active apply and ops to apply
- if (!_applierActive && _oplogBuffer->getSize()) {
+ if (!_applierActive && _oplogBuffer->getSize(nullptr)) {
_scheduleApplyBatch_inlock();
}
@@ -975,7 +975,8 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
// * only OplogEntries from before the slaveDelay point
// * a single command OplogEntry (including index builds, which appear to be inserts)
// * consequently, commands bound the previous batch to be in a batch of their own
- while (_oplogBuffer->peek(&op)) {
+ OperationContext* txn = nullptr;
+ while (_oplogBuffer->peek(txn, &op)) {
auto entry = OplogEntry(op);
// Check for ops that must be processed one at a time.
@@ -986,7 +987,7 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
if (ops.empty()) {
// Apply commands one-at-a-time.
ops.push_back(std::move(entry));
- _oplogBuffer->tryPop(&op);
+ _oplogBuffer->tryPop(txn, &op);
invariant(entry == OplogEntry(op));
}
@@ -1027,7 +1028,7 @@ StatusWith<Operations> DataReplicator::_getNextApplierBatch_inlock() {
// Add op to buffer.
ops.push_back(entry);
totalBytes += entry.raw.objsize();
- _oplogBuffer->tryPop(&op);
+ _oplogBuffer->tryPop(txn, &op);
invariant(entry == OplogEntry(op));
}
return ops;
@@ -1299,7 +1300,7 @@ Status DataReplicator::scheduleShutdown() {
invariant(!_onShutdown.isValid());
_onShutdown = eventStatus.getValue();
_cancelAllHandles_inlock();
- _oplogBuffer->shutdown();
+ _oplogBuffer->shutdown(nullptr);
_oplogBuffer.reset();
}
@@ -1345,14 +1346,14 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
// Wait for enough space.
// Gets unblocked on shutdown.
- _oplogBuffer->waitForSpace(info.toApplyDocumentBytes);
+ _oplogBuffer->waitForSpace(nullptr, info.toApplyDocumentBytes);
OCCASIONALLY {
- LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize() << " bytes";
+ LOG(2) << "bgsync buffer has " << _oplogBuffer->getSize(nullptr) << " bytes";
}
// Buffer docs for later application.
- fassert(40143, _oplogBuffer->pushAllNonBlocking(begin, end));
+ fassert(40143, _oplogBuffer->pushAllNonBlocking(nullptr, begin, end));
_lastTimestampFetched = info.lastDocument.opTime.getTimestamp();
diff --git a/src/mongo/db/repl/oplog_buffer.h b/src/mongo/db/repl/oplog_buffer.h
index 3c172cf650e..46df1c35d27 100644
--- a/src/mongo/db/repl/oplog_buffer.h
+++ b/src/mongo/db/repl/oplog_buffer.h
@@ -37,6 +37,9 @@
#include "mongo/util/time_support.h"
namespace mongo {
+
+class OperationContext;
+
namespace repl {
/**
@@ -67,7 +70,7 @@ public:
* create backing storage, etc). This method may be called at most once for the lifetime of an
* oplog buffer.
*/
- virtual void startup() = 0;
+ virtual void startup(OperationContext* txn) = 0;
/**
* Signals to the oplog buffer that it should shut down. This method may block. After
@@ -76,7 +79,7 @@ public:
* It is legal to call this method multiple times, but it should only be called after startup
* has been called.
*/
- virtual void shutdown() = 0;
+ virtual void shutdown(OperationContext* txn) = 0;
/**
* Pushes operation into oplog buffer, ignoring any size constraints. Does not block.
@@ -84,31 +87,33 @@ public:
* the limit returned by getMaxSize() but should not otherwise adversely affect normal
* functionality such as pushing and popping operations from the oplog buffer.
*/
- virtual void pushEvenIfFull(const Value& value) = 0;
+ virtual void pushEvenIfFull(OperationContext* txn, const Value& value) = 0;
/**
* Pushes operation into oplog buffer.
* If there are size constraints on the oplog buffer, this may block until sufficient space
* is made available (by popping) to complete this operation.
*/
- virtual void push(const Value& value) = 0;
+ virtual void push(OperationContext* txn, const Value& value) = 0;
/**
* Pushes operations in the iterator range [begin, end) into the oplog buffer without blocking.
*
* Returns false if there is insufficient space to complete this operation successfully.
*/
- virtual bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) = 0;
+ virtual bool pushAllNonBlocking(OperationContext* txn,
+ Batch::const_iterator begin,
+ Batch::const_iterator end) = 0;
/**
* Returns when enough space is available.
*/
- virtual void waitForSpace(std::size_t size) = 0;
+ virtual void waitForSpace(OperationContext* txn, std::size_t size) = 0;
/**
* Returns true if oplog buffer is empty.
*/
- virtual bool isEmpty() const = 0;
+ virtual bool isEmpty(OperationContext* txn) const = 0;
/**
* Maximum size of all oplog entries that can be stored in this oplog buffer as measured by the
@@ -122,47 +127,47 @@ public:
* Total size of all oplog entries in this oplog buffer as measured by the BSONObj::objsize()
* function.
*/
- virtual std::size_t getSize() const = 0;
+ virtual std::size_t getSize(OperationContext* txn) const = 0;
/**
* Returns the number/count of items in the oplog buffer.
*/
- virtual std::size_t getCount() const = 0;
+ virtual std::size_t getCount(OperationContext* txn) const = 0;
/**
* Clears oplog buffer.
*/
- virtual void clear() = 0;
+ virtual void clear(OperationContext* txn) = 0;
/**
* Returns false if oplog buffer is empty. "value" is left unchanged.
* Otherwise, removes last item (saves in "value") from the oplog buffer and returns true.
*/
- virtual bool tryPop(Value* value) = 0;
+ virtual bool tryPop(OperationContext* txn, Value* value) = 0;
/**
* Pops the last operation in the oplog buffer.
* If the oplog buffer is empty, waits until an operation is pushed.
*/
- virtual Value blockingPop() = 0;
+ virtual Value blockingPop(OperationContext* txn) = 0;
/**
* Waits "waitDuration" for an operation to be pushed into the oplog buffer.
* Returns false if oplog buffer is still empty after "waitDuration".
* Otherwise, returns true and sets "value" to last item in oplog buffer.
*/
- virtual bool blockingPeek(Value* value, Seconds waitDuration) = 0;
+ virtual bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) = 0;
/**
* Returns false if oplog buffer is empty.
* Otherwise, returns true and sets "value" to last item in oplog buffer.
*/
- virtual bool peek(Value* value) = 0;
+ virtual bool peek(OperationContext* txn, Value* value) = 0;
/**
* Returns the item most recently added to the oplog buffer or nothing if the buffer is empty.
*/
- virtual boost::optional<Value> lastObjectPushed() const = 0;
+ virtual boost::optional<Value> lastObjectPushed(OperationContext* txn) const = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
index ff575cae145..bb7801f2da7 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.cpp
@@ -47,31 +47,32 @@ size_t getDocumentSize(const BSONObj& o) {
OplogBufferBlockingQueue::OplogBufferBlockingQueue() : _queue(kOplogBufferSize, &getDocumentSize) {}
-void OplogBufferBlockingQueue::startup() {}
+void OplogBufferBlockingQueue::startup(OperationContext*) {}
-void OplogBufferBlockingQueue::shutdown() {
- clear();
+void OplogBufferBlockingQueue::shutdown(OperationContext* txn) {
+ clear(txn);
}
-void OplogBufferBlockingQueue::pushEvenIfFull(const Value& value) {
+void OplogBufferBlockingQueue::pushEvenIfFull(OperationContext*, const Value& value) {
_queue.pushEvenIfFull(value);
}
-void OplogBufferBlockingQueue::push(const Value& value) {
+void OplogBufferBlockingQueue::push(OperationContext*, const Value& value) {
_queue.push(value);
}
-bool OplogBufferBlockingQueue::pushAllNonBlocking(Batch::const_iterator begin,
+bool OplogBufferBlockingQueue::pushAllNonBlocking(OperationContext*,
+ Batch::const_iterator begin,
Batch::const_iterator end) {
_queue.pushAllNonBlocking(begin, end);
return true;
}
-void OplogBufferBlockingQueue::waitForSpace(std::size_t size) {
+void OplogBufferBlockingQueue::waitForSpace(OperationContext*, std::size_t size) {
_queue.waitForSpace(size);
}
-bool OplogBufferBlockingQueue::isEmpty() const {
+bool OplogBufferBlockingQueue::isEmpty(OperationContext*) const {
return _queue.empty();
}
@@ -79,35 +80,36 @@ std::size_t OplogBufferBlockingQueue::getMaxSize() const {
return kOplogBufferSize;
}
-std::size_t OplogBufferBlockingQueue::getSize() const {
+std::size_t OplogBufferBlockingQueue::getSize(OperationContext*) const {
return _queue.size();
}
-std::size_t OplogBufferBlockingQueue::getCount() const {
+std::size_t OplogBufferBlockingQueue::getCount(OperationContext*) const {
return _queue.count();
}
-void OplogBufferBlockingQueue::clear() {
+void OplogBufferBlockingQueue::clear(OperationContext*) {
_queue.clear();
}
-bool OplogBufferBlockingQueue::tryPop(Value* value) {
+bool OplogBufferBlockingQueue::tryPop(OperationContext*, Value* value) {
return _queue.tryPop(*value);
}
-OplogBuffer::Value OplogBufferBlockingQueue::blockingPop() {
+OplogBuffer::Value OplogBufferBlockingQueue::blockingPop(OperationContext*) {
return _queue.blockingPop();
}
-bool OplogBufferBlockingQueue::blockingPeek(Value* value, Seconds waitDuration) {
+bool OplogBufferBlockingQueue::blockingPeek(OperationContext*, Value* value, Seconds waitDuration) {
return _queue.blockingPeek(*value, static_cast<int>(durationCount<Seconds>(waitDuration)));
}
-bool OplogBufferBlockingQueue::peek(Value* value) {
+bool OplogBufferBlockingQueue::peek(OperationContext*, Value* value) {
return _queue.peek(*value);
}
-boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed() const {
+boost::optional<OplogBuffer::Value> OplogBufferBlockingQueue::lastObjectPushed(
+ OperationContext*) const {
return _queue.lastObjectPushed();
}
diff --git a/src/mongo/db/repl/oplog_buffer_blocking_queue.h b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
index 0c40c8f1256..d45d26d47ad 100644
--- a/src/mongo/db/repl/oplog_buffer_blocking_queue.h
+++ b/src/mongo/db/repl/oplog_buffer_blocking_queue.h
@@ -37,26 +37,28 @@ namespace repl {
/**
* Oplog buffer backed by in memory blocking queue of BSONObj.
*/
-class OplogBufferBlockingQueue : public OplogBuffer {
+class OplogBufferBlockingQueue final : public OplogBuffer {
public:
OplogBufferBlockingQueue();
- void startup() override;
- void shutdown() override;
- void pushEvenIfFull(const Value& value) override;
- void push(const Value& value) override;
- bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) override;
- void waitForSpace(std::size_t size) override;
- bool isEmpty() const override;
+ void startup(OperationContext* txn) override;
+ void shutdown(OperationContext* txn) override;
+ void pushEvenIfFull(OperationContext* txn, const Value& value) override;
+ void push(OperationContext* txn, const Value& value) override;
+ bool pushAllNonBlocking(OperationContext* txn,
+ Batch::const_iterator begin,
+ Batch::const_iterator end) override;
+ void waitForSpace(OperationContext* txn, std::size_t size) override;
+ bool isEmpty(OperationContext* txn) const override;
std::size_t getMaxSize() const override;
- std::size_t getSize() const override;
- std::size_t getCount() const override;
- void clear() override;
- bool tryPop(Value* value) override;
- Value blockingPop() override;
- bool blockingPeek(Value* value, Seconds waitDuration) override;
- bool peek(Value* value) override;
- boost::optional<Value> lastObjectPushed() const override;
+ std::size_t getSize(OperationContext* txn) const override;
+ std::size_t getCount(OperationContext* txn) const override;
+ void clear(OperationContext* txn) override;
+ bool tryPop(OperationContext* txn, Value* value) override;
+ Value blockingPop(OperationContext* txn) override;
+ bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) override;
+ bool peek(OperationContext* txn, Value* value) override;
+ boost::optional<Value> lastObjectPushed(OperationContext* txn) const override;
private:
BlockingQueue<BSONObj> _queue;
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index 7af1057e022..a042f4311b1 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -45,14 +45,6 @@ namespace {
const char kDefaultOplogCollectionNamespace[] = "local.oplog.initialSyncTempBuffer";
-/**
- * Creates an operation context for the current oplog buffer operation.
- * Crashes if there is already an existing operation context registered with the current client.
- */
-ServiceContext::UniqueOperationContext makeOperationContext() {
- return cc().makeOperationContext();
-}
-
} // namespace
NamespaceString OplogBufferCollection::getDefaultNamespace() {
@@ -67,10 +59,7 @@ NamespaceString OplogBufferCollection::getNamespace() const {
return _nss;
}
-void OplogBufferCollection::startup() {
- auto txnPtr = makeOperationContext();
- auto txn = txnPtr.get();
-
+void OplogBufferCollection::startup(OperationContext* txn) {
// TODO: use storage interface to create collection.
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
AutoGetOrCreateDb databaseWriteGuard(txn, _nss.db(), MODE_X);
@@ -84,20 +73,21 @@ void OplogBufferCollection::startup() {
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "OplogBufferCollection::startup", _nss.ns());
}
-void OplogBufferCollection::shutdown() {}
+void OplogBufferCollection::shutdown(OperationContext* txn) {}
-void OplogBufferCollection::pushEvenIfFull(const Value& value) {}
+void OplogBufferCollection::pushEvenIfFull(OperationContext* txn, const Value& value) {}
-void OplogBufferCollection::push(const Value& value) {}
+void OplogBufferCollection::push(OperationContext* txn, const Value& value) {}
-bool OplogBufferCollection::pushAllNonBlocking(Batch::const_iterator begin,
+bool OplogBufferCollection::pushAllNonBlocking(OperationContext* txn,
+ Batch::const_iterator begin,
Batch::const_iterator end) {
return false;
}
-void OplogBufferCollection::waitForSpace(std::size_t size) {}
+void OplogBufferCollection::waitForSpace(OperationContext* txn, std::size_t size) {}
-bool OplogBufferCollection::isEmpty() const {
+bool OplogBufferCollection::isEmpty(OperationContext* txn) const {
return true;
}
@@ -105,33 +95,36 @@ std::size_t OplogBufferCollection::getMaxSize() const {
return 0;
}
-std::size_t OplogBufferCollection::getSize() const {
+std::size_t OplogBufferCollection::getSize(OperationContext* txn) const {
return 0;
}
-std::size_t OplogBufferCollection::getCount() const {
+std::size_t OplogBufferCollection::getCount(OperationContext* txn) const {
return 0;
}
-void OplogBufferCollection::clear() {}
+void OplogBufferCollection::clear(OperationContext* txn) {}
-bool OplogBufferCollection::tryPop(Value* value) {
+bool OplogBufferCollection::tryPop(OperationContext* txn, Value* value) {
return false;
}
-OplogBuffer::Value OplogBufferCollection::blockingPop() {
+OplogBuffer::Value OplogBufferCollection::blockingPop(OperationContext* txn) {
return {};
}
-bool OplogBufferCollection::blockingPeek(Value* value, Seconds waitDuration) {
+bool OplogBufferCollection::blockingPeek(OperationContext* txn,
+ Value* value,
+ Seconds waitDuration) {
return false;
}
-bool OplogBufferCollection::peek(Value* value) {
+bool OplogBufferCollection::peek(OperationContext* txn, Value* value) {
return false;
}
-boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed() const {
+boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed(
+ OperationContext* txn) const {
return {};
}
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index c422c2f2c1c..0270673aae7 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -54,22 +54,24 @@ public:
*/
NamespaceString getNamespace() const;
- void startup() override;
- void shutdown() override;
- void pushEvenIfFull(const Value& value) override;
- void push(const Value& value) override;
- bool pushAllNonBlocking(Batch::const_iterator begin, Batch::const_iterator end) override;
- void waitForSpace(std::size_t size) override;
- bool isEmpty() const override;
+ void startup(OperationContext* txn) override;
+ void shutdown(OperationContext* txn) override;
+ void pushEvenIfFull(OperationContext* txn, const Value& value) override;
+ void push(OperationContext* txn, const Value& value) override;
+ bool pushAllNonBlocking(OperationContext* txn,
+ Batch::const_iterator begin,
+ Batch::const_iterator end) override;
+ void waitForSpace(OperationContext* txn, std::size_t size) override;
+ bool isEmpty(OperationContext* txn) const override;
std::size_t getMaxSize() const override;
- std::size_t getSize() const override;
- std::size_t getCount() const override;
- void clear() override;
- bool tryPop(Value* value) override;
- Value blockingPop() override;
- bool blockingPeek(Value* value, Seconds waitDuration) override;
- bool peek(Value* value) override;
- boost::optional<Value> lastObjectPushed() const override;
+ std::size_t getSize(OperationContext* txn) const override;
+ std::size_t getCount(OperationContext* txn) const override;
+ void clear(OperationContext* txn) override;
+ bool tryPop(OperationContext* txn, Value* value) override;
+ Value blockingPop(OperationContext* txn) override;
+ bool blockingPeek(OperationContext* txn, Value* value, Seconds waitDuration) override;
+ bool peek(OperationContext* txn, Value* value) override;
+ boost::optional<Value> lastObjectPushed(OperationContext* txn) const override;
private:
const NamespaceString _nss;
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index 90b54c2b1e6..c34d03b0686 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -28,10 +28,14 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/oplog_buffer_collection.h"
+#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface.h"
@@ -52,8 +56,11 @@ protected:
protected:
ServiceContext::UniqueOperationContext makeOperationContext() const;
+ ServiceContext::UniqueOperationContext _txn;
+
private:
void setUp() override;
+ void tearDown() override;
};
void OplogBufferCollectionTest::setUp() {
@@ -72,6 +79,14 @@ void OplogBufferCollectionTest::setUp() {
stdx::make_unique<ReplicationCoordinatorMock>(replSettings));
StorageInterface::set(serviceContext, stdx::make_unique<StorageInterfaceImpl>());
+
+ _txn = makeOperationContext();
+}
+
+void OplogBufferCollectionTest::tearDown() {
+ _txn.reset();
+
+ ServiceContextMongoDTest::tearDown();
}
ServiceContext::UniqueOperationContext OplogBufferCollectionTest::makeOperationContext() const {
@@ -105,10 +120,10 @@ TEST_F(OplogBufferCollectionTest, StartupCreatesCollection) {
OplogBufferCollection oplogBuffer(nss);
// Collection should not exist until startup() is called.
- ASSERT_FALSE(AutoGetCollectionForRead(makeOperationContext().get(), nss).getCollection());
+ ASSERT_FALSE(AutoGetCollectionForRead(_txn.get(), nss).getCollection());
- oplogBuffer.startup();
- ASSERT_TRUE(AutoGetCollectionForRead(makeOperationContext().get(), nss).getCollection());
+ oplogBuffer.startup(_txn.get());
+ ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection());
}
} // namespace
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 3e915211977..37c89b5c67f 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -132,7 +132,7 @@ public:
* components of the replication system to shut down and stop any threads they are using,
* blocking until all replication-related shutdown tasks are complete.
*/
- virtual void shutdown() = 0;
+ virtual void shutdown(OperationContext* txn) = 0;
/**
* Returns a pointer to the ReplicationExecutor.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 72234a44020..0cda02dba15 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -103,7 +103,7 @@ public:
* Performs any necessary external state specific shutdown tasks, such as cleaning up
* the threads it started.
*/
- virtual void shutdown() = 0;
+ virtual void shutdown(OperationContext* txn) = 0;
/**
* Creates the oplog, writes the first entry and stores the replica set config document.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index d407d32f741..0efaa840544 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -180,7 +180,7 @@ void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext*
repl::startMasterSlave(txn);
}
-void ReplicationCoordinatorExternalStateImpl::shutdown() {
+void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_threadMutex);
if (_startedThreads) {
log() << "Stopping replication applier threads";
@@ -193,7 +193,7 @@ void ReplicationCoordinatorExternalStateImpl::shutdown() {
}
if (_producerThread) {
- _bgSync->shutdown();
+ _bgSync->shutdown(txn);
_producerThread->join();
}
if (_snapshotThread)
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 5ccb055e280..28c37b1cc56 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -60,7 +60,7 @@ public:
virtual void startSteadyStateReplication() override;
virtual void startMasterSlave(OperationContext* txn);
- virtual void shutdown();
+ virtual void shutdown(OperationContext* txn);
virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config);
virtual void logTransitionToPrimaryToOplog(OperationContext* txn);
virtual void forwardSlaveProgress();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index a9cdf79ff82..259cdcd8010 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -71,7 +71,7 @@ Status ReplicationCoordinatorExternalStateMock::initializeReplSetStorage(Operati
return storeLocalConfigDocument(txn, config);
}
-void ReplicationCoordinatorExternalStateMock::shutdown() {}
+void ReplicationCoordinatorExternalStateMock::shutdown(OperationContext*) {}
void ReplicationCoordinatorExternalStateMock::forwardSlaveProgress() {}
OID ReplicationCoordinatorExternalStateMock::ensureMe(OperationContext*) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 130ec3f99ad..b1242b0d169 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -60,7 +60,7 @@ public:
virtual void startInitialSync(OnInitialSyncFinishedFn finished) override;
virtual void startSteadyStateReplication() override;
virtual void startMasterSlave(OperationContext*);
- virtual void shutdown();
+ virtual void shutdown(OperationContext* txn);
virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config);
virtual void logTransitionToPrimaryToOplog(OperationContext* txn);
virtual void forwardSlaveProgress();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 0111bfde3c0..172608bd0de 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -545,7 +545,7 @@ void ReplicationCoordinatorImpl::startup(OperationContext* txn) {
}
}
-void ReplicationCoordinatorImpl::shutdown() {
+void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
// Shutdown must:
// * prevent new threads from blocking in awaitReplication
// * wake up all existing threads blocking in awaitReplication
@@ -578,7 +578,7 @@ void ReplicationCoordinatorImpl::shutdown() {
// joining the replication executor is blocking so it must be run outside of the mutex
_replExecutor.shutdown();
_replExecutor.join();
- _externalState->shutdown();
+ _externalState->shutdown(txn);
}
const ReplSettings& ReplicationCoordinatorImpl::getSettings() const {
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 41c11a156d7..eb337c11da2 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -110,7 +110,7 @@ public:
virtual void startup(OperationContext* txn) override;
- virtual void shutdown() override;
+ virtual void shutdown(OperationContext* txn) override;
virtual ReplicationExecutor* getExecutor() override {
return &_replExecutor;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
index cff5dd7dbff..c6ca9bf9800 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp
@@ -401,7 +401,7 @@ TEST_F(ReplCoordTest,
getReplCoord()->processReplSetReconfig(txn.get(), args, &result));
ASSERT_TRUE(result.obj().isEmpty());
- shutdown();
+ shutdown(txn.get());
reconfigThread.join();
}
@@ -438,7 +438,7 @@ TEST_F(ReplCoordTest, NodeReturnsConfigurationInProgressWhenReceivingAReconfigWh
getReplCoord()->processReplSetReconfig(txn.get(), args, &result));
ASSERT_TRUE(result.obj().isEmpty());
- shutdown();
+ shutdown(txn.get());
initateThread.join();
}
@@ -609,7 +609,7 @@ TEST_F(ReplCoordTest, NodeDoesNotAcceptHeartbeatReconfigWhileInTheMidstOfReconfi
stopCapturingLogMessages();
ASSERT_EQUALS(
1, countLogLinesContaining("because already in the midst of a configuration process"));
- shutdown();
+ shutdown(txn.get());
reconfigThread.join();
logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Log());
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 84973365218..3a7bd83bda9 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -236,15 +236,14 @@ TEST_F(ReplCoordTest,
NodeReturnsInvalidReplicaSetConfigWhenInitiatingViaANodeThatCannotBecomePrimary) {
init("mySet");
start(HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
// Starting uninitialized, show that we can perform the initiate behavior.
BSONObjBuilder result1;
auto status =
- getReplCoord()->processReplSetInitiate(&txn,
+ getReplCoord()->processReplSetInitiate(txn.get(),
BSON("_id"
<< "mySet"
<< "version"
@@ -266,17 +265,16 @@ TEST_F(ReplCoordTest,
InitiateShouldSucceedWithAValidConfigEvenIfItHasFailedWithAnInvalidConfigPreviously) {
init("mySet");
start(HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
BSONObjBuilder result;
ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
- getReplCoord()->processReplSetInitiate(&txn, BSONObj(), &result));
+ getReplCoord()->processReplSetInitiate(txn.get(), BSONObj(), &result));
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
// Having failed to initiate once, show that we can now initiate.
BSONObjBuilder result1;
ASSERT_OK(
- getReplCoord()->processReplSetInitiate(&txn,
+ getReplCoord()->processReplSetInitiate(txn.get(),
BSON("_id"
<< "mySet"
<< "version"
@@ -293,11 +291,10 @@ TEST_F(ReplCoordTest,
BSONObjBuilder result;
init("mySet");
start(HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
ASSERT_EQUALS(
ErrorCodes::InvalidReplicaSetConfig,
- getReplCoord()->processReplSetInitiate(&txn,
+ getReplCoord()->processReplSetInitiate(txn.get(),
BSON("_id"
<< "mySet"
<< "version"
@@ -396,14 +393,13 @@ TEST_F(ReplCoordTest,
NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithAConfigWithAMismatchedSetName) {
init("mySet");
start(HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
BSONObjBuilder result1;
ASSERT_EQUALS(
ErrorCodes::InvalidReplicaSetConfig,
- getReplCoord()->processReplSetInitiate(&txn,
+ getReplCoord()->processReplSetInitiate(txn.get(),
BSON("_id"
<< "wrongSet"
<< "version"
@@ -418,12 +414,11 @@ TEST_F(ReplCoordTest,
TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithAnEmptyConfig) {
init("mySet");
start(HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
BSONObjBuilder result1;
- auto status = getReplCoord()->processReplSetInitiate(&txn, BSONObj(), &result1);
+ auto status = getReplCoord()->processReplSetInitiate(txn.get(), BSONObj(), &result1);
ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig, status);
ASSERT_STRING_CONTAINS(status.reason(), "Missing expected field \"_id\"");
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
@@ -432,13 +427,12 @@ TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithAnEmpt
TEST_F(ReplCoordTest, NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithoutAn_idField) {
init("mySet");
start(HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
BSONObjBuilder result1;
auto status = getReplCoord()->processReplSetInitiate(
- &txn,
+ txn.get(),
BSON("version" << 1 << "members" << BSON_ARRAY(BSON("_id" << 0 << "host"
<< "node1:12345"))),
&result1);
@@ -451,13 +445,12 @@ TEST_F(ReplCoordTest,
NodeReturnsInvalidReplicaSetConfigWhenInitiatingWithAConfigVersionNotEqualToOne) {
init("mySet");
start(HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
BSONObjBuilder result1;
auto status =
- getReplCoord()->processReplSetInitiate(&txn,
+ getReplCoord()->processReplSetInitiate(txn.get(),
BSON("_id"
<< "mySet"
<< "version"
@@ -474,14 +467,13 @@ TEST_F(ReplCoordTest,
TEST_F(ReplCoordTest, InitiateFailsWithoutReplSetFlag) {
init("");
start(HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
BSONObjBuilder result1;
ASSERT_EQUALS(
ErrorCodes::NoReplicationEnabled,
- getReplCoord()->processReplSetInitiate(&txn,
+ getReplCoord()->processReplSetInitiate(txn.get(),
BSON("_id"
<< "mySet"
<< "version"
@@ -496,8 +488,7 @@ TEST_F(ReplCoordTest, InitiateFailsWithoutReplSetFlag) {
TEST_F(ReplCoordTest, NodeReturnsOutOfDiskSpaceWhenInitiateCannotWriteConfigToDisk) {
init("mySet");
start(HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
BSONObjBuilder result1;
@@ -505,7 +496,7 @@ TEST_F(ReplCoordTest, NodeReturnsOutOfDiskSpaceWhenInitiateCannotWriteConfigToDi
Status(ErrorCodes::OutOfDiskSpace, "The test set this"));
ASSERT_EQUALS(
ErrorCodes::OutOfDiskSpace,
- getReplCoord()->processReplSetInitiate(&txn,
+ getReplCoord()->processReplSetInitiate(txn.get(),
BSON("_id"
<< "mySet"
<< "version"
@@ -593,8 +584,8 @@ TEST_F(ReplCoordTest, RollBackIDShouldIncreaseByOneWhenIncrementRollbackIDIsCall
TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAStandaloneNode) {
init("");
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
OpTimeWithTermZero time(100, 1);
WriteConcernOptions writeConcern;
@@ -604,7 +595,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstASta
// Because we didn't set ReplSettings.replSet, it will think we're a standalone so
// awaitReplication will always work.
ReplicationCoordinator::StatusAndDuration statusAndDur =
- getReplCoord()->awaitReplication(&txn, time, writeConcern);
+ getReplCoord()->awaitReplication(txn.get(), time, writeConcern);
ASSERT_OK(statusAndDur.status);
}
@@ -612,8 +603,8 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas
ReplSettings settings;
settings.setMaster(true);
init(settings);
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
OpTimeWithTermZero time(100, 1);
WriteConcernOptions writeConcern;
@@ -622,7 +613,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas
writeConcern.wMode = WriteConcernOptions::kMajority;
// w:majority always works on master/slave
ReplicationCoordinator::StatusAndDuration statusAndDur =
- getReplCoord()->awaitReplication(&txn, time, writeConcern);
+ getReplCoord()->awaitReplication(txn.get(), time, writeConcern);
ASSERT_OK(statusAndDur.status);
}
@@ -646,8 +637,8 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
<< 2))),
HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
OpTimeWithTermZero time(100, 1);
WriteConcernOptions writeConcern;
@@ -657,7 +648,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
// Node should fail to awaitReplication when not primary.
ReplicationCoordinator::StatusAndDuration statusAndDur =
- getReplCoord()->awaitReplication(&txn, time, writeConcern);
+ getReplCoord()->awaitReplication(txn.get(), time, writeConcern);
ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status);
}
@@ -820,44 +811,44 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
writeConcern.wNumNodes = 1;
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
// 1 node waiting for time 1
ReplicationCoordinator::StatusAndDuration statusAndDur =
- getReplCoord()->awaitReplication(&txn, time1, writeConcern);
+ getReplCoord()->awaitReplication(txn.get(), time1, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern);
+ statusAndDur = getReplCoord()->awaitReplication(txn.get(), time1, writeConcern);
ASSERT_OK(statusAndDur.status);
// 2 nodes waiting for time1
writeConcern.wNumNodes = 2;
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern);
+ statusAndDur = getReplCoord()->awaitReplication(txn.get(), time1, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern);
+ statusAndDur = getReplCoord()->awaitReplication(txn.get(), time1, writeConcern);
ASSERT_OK(statusAndDur.status);
// 2 nodes waiting for time2
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
getReplCoord()->setMyLastAppliedOpTime(time2);
getReplCoord()->setMyLastDurableOpTime(time2);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time2));
ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 2, time2));
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern);
ASSERT_OK(statusAndDur.status);
// 3 nodes waiting for time2
writeConcern.wNumNodes = 3;
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 3, time2));
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ statusAndDur = getReplCoord()->awaitReplication(txn.get(), time2, writeConcern);
ASSERT_OK(statusAndDur.status);
}
@@ -1271,7 +1262,10 @@ TEST_F(ReplCoordTest,
awaiter.start();
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1));
- shutdown();
+ {
+ auto txn = makeOperationContext();
+ shutdown(txn.get());
+ }
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, statusAndDur.status);
awaiter.reset();
@@ -1405,9 +1399,9 @@ private:
TEST_F(ReplCoordTest, NodeReturnsBadValueWhenUpdateTermIsRunAgainstANonReplNode) {
init(ReplSettings());
ASSERT_TRUE(ReplicationCoordinator::modeNone == getReplCoord()->getReplicationMode());
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
- ASSERT_EQUALS(ErrorCodes::BadValue, getReplCoord()->updateTerm(&txn, 0).code());
+ auto txn = makeOperationContext();
+
+ ASSERT_EQUALS(ErrorCodes::BadValue, getReplCoord()->updateTerm(txn.get(), 0).code());
}
TEST_F(ReplCoordTest, NodeChangesTermAndStepsDownWhenAndOnlyWhenUpdateTermSuppliesAHigherTerm) {
@@ -1433,31 +1427,31 @@ TEST_F(ReplCoordTest, NodeChangesTermAndStepsDownWhenAndOnlyWhenUpdateTermSuppli
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
simulateSuccessfulV1Election();
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
ASSERT_EQUALS(1, getReplCoord()->getTerm());
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
// lower term, no change
- ASSERT_OK(getReplCoord()->updateTerm(&txn, 0));
+ ASSERT_OK(getReplCoord()->updateTerm(txn.get(), 0));
ASSERT_EQUALS(1, getReplCoord()->getTerm());
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
// same term, no change
- ASSERT_OK(getReplCoord()->updateTerm(&txn, 1));
+ ASSERT_OK(getReplCoord()->updateTerm(txn.get(), 1));
ASSERT_EQUALS(1, getReplCoord()->getTerm());
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
// higher term, step down and change term
Handle cbHandle;
- ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(&txn, 2).code());
+ ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(txn.get(), 2).code());
// Term hasn't been incremented yet, as we need another try to update it after stepdown.
ASSERT_EQUALS(1, getReplCoord()->getTerm());
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
// Now update term should actually update the term, as stepdown is complete.
- ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(&txn, 2).code());
+ ASSERT_EQUALS(ErrorCodes::StaleTerm, getReplCoord()->updateTerm(txn.get(), 2).code());
ASSERT_EQUALS(2, getReplCoord()->getTerm());
}
@@ -1538,8 +1532,8 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha
}
TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ const auto txn = makeOperationContext();
+
OpTimeWithTermZero optime1(100, 1);
// All nodes are caught up
getReplCoord()->setMyLastAppliedOpTime(optime1);
@@ -1547,7 +1541,7 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 2, optime1));
- Status status = getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(0));
+ Status status = getReplCoord()->stepDown(txn.get(), false, Milliseconds(0), Milliseconds(0));
ASSERT_EQUALS(ErrorCodes::NotMaster, status);
ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
}
@@ -1563,13 +1557,12 @@ TEST_F(StepDownTest,
simulateSuccessfulV1Election();
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ const auto txn = makeOperationContext();
// Make sure stepDown cannot grab the global shared lock
- Lock::GlobalWrite lk(txn.lockState());
+ Lock::GlobalWrite lk(txn->lockState());
- Status status = getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000));
+ Status status = getReplCoord()->stepDown(txn.get(), false, Milliseconds(0), Milliseconds(1000));
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
}
@@ -1610,11 +1603,11 @@ TEST_F(StepDownTest,
getNet()->runReadyNetworkOperations();
exitNetwork();
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ const auto txn = makeOperationContext();
+
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- ASSERT_OK(getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000)));
+ ASSERT_OK(getReplCoord()->stepDown(txn.get(), false, Milliseconds(0), Milliseconds(1000)));
enterNetwork(); // So we can safely inspect the topology coordinator
ASSERT_EQUALS(getNet()->now() + Seconds(1), getTopoCoord().getStepDownTime());
ASSERT_TRUE(getTopoCoord().getMemberState().secondary());
@@ -1634,9 +1627,9 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle
<< "test1:1234"))),
HostAndPort("test1", 1234));
runSingleNodeElection(makeOperationContext(), getReplCoord());
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
- ASSERT_OK(getReplCoord()->stepDown(&txn, true, Milliseconds(0), Milliseconds(1000)));
+ const auto txn = makeOperationContext();
+
+ ASSERT_OK(getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000)));
getNet()->enterNetwork(); // Must do this before inspecting the topocoord
Date_t stepdownUntil = getNet()->now() + Seconds(1);
ASSERT_EQUALS(stepdownUntil, getTopoCoord().getStepDownTime());
@@ -1665,11 +1658,10 @@ TEST_F(StepDownTest,
simulateSuccessfulV1Election();
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ const auto txn = makeOperationContext();
// Try to stepDown but time out because no secondaries are caught up.
- auto status = repl->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000));
+ auto status = repl->stepDown(txn.get(), false, Milliseconds(0), Milliseconds(1000));
ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
ASSERT_TRUE(repl->getMemberState().primary());
@@ -1684,7 +1676,7 @@ TEST_F(StepDownTest,
}
getNet()->exitNetwork();
ASSERT_TRUE(repl->getMemberState().primary());
- status = repl->stepDown(&txn, true, Milliseconds(0), Milliseconds(1000));
+ status = repl->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000));
ASSERT_OK(status);
ASSERT_TRUE(repl->getMemberState().secondary());
}
@@ -1702,8 +1694,8 @@ TEST_F(StepDownTest,
simulateSuccessfulV1Election();
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ const auto txn = makeOperationContext();
+
// Step down where the secondary actually has to catch up before the stepDown can succeed.
// On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
@@ -1711,11 +1703,11 @@ TEST_F(StepDownTest,
// This makes it unnecessary to advance the clock after entering the network to process
// the heartbeat requests.
Status result(ErrorCodes::InternalError, "not mutated");
- auto globalReadLockAndEventHandle =
- repl->stepDown_nonBlocking(&txn, false, Milliseconds(10000), Milliseconds(60000), &result);
+ auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking(
+ txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result);
const auto& eventHandle = globalReadLockAndEventHandle.second;
ASSERT_TRUE(eventHandle);
- ASSERT_TRUE(txn.lockState()->isReadLocked());
+ ASSERT_TRUE(txn->lockState()->isReadLocked());
// Make a secondary actually catch up
enterNetwork();
@@ -1763,8 +1755,8 @@ TEST_F(StepDownTest,
simulateSuccessfulV1Election();
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ const auto txn = makeOperationContext();
+
// Step down where the secondary actually has to catch up before the stepDown can succeed.
// On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
@@ -1772,11 +1764,11 @@ TEST_F(StepDownTest,
// This makes it unnecessary to advance the clock after entering the network to process
// the heartbeat requests.
Status result(ErrorCodes::InternalError, "not mutated");
- auto globalReadLockAndEventHandle =
- repl->stepDown_nonBlocking(&txn, false, Milliseconds(10000), Milliseconds(60000), &result);
+ auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking(
+ txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result);
const auto& eventHandle = globalReadLockAndEventHandle.second;
ASSERT_TRUE(eventHandle);
- ASSERT_TRUE(txn.lockState()->isReadLocked());
+ ASSERT_TRUE(txn->lockState()->isReadLocked());
// Secondary has not caught up on first round of heartbeats.
enterNetwork();
@@ -1849,23 +1841,23 @@ TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) {
simulateSuccessfulV1Election();
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
- const unsigned int opID = txn.getOpID();
+ const auto txn = makeOperationContext();
+
+ const unsigned int opID = txn->getOpID();
ASSERT_TRUE(repl->getMemberState().primary());
// stepDown where the secondary actually has to catch up before the stepDown can succeed.
Status result(ErrorCodes::InternalError, "not mutated");
- auto globalReadLockAndEventHandle =
- repl->stepDown_nonBlocking(&txn, false, Milliseconds(10000), Milliseconds(60000), &result);
+ auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking(
+ txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result);
const auto& eventHandle = globalReadLockAndEventHandle.second;
ASSERT_TRUE(eventHandle);
- ASSERT_TRUE(txn.lockState()->isReadLocked());
+ ASSERT_TRUE(txn->lockState()->isReadLocked());
{
- stdx::lock_guard<Client> lk(*txn.getClient());
- txn.markKilled(ErrorCodes::Interrupted);
+ stdx::lock_guard<Client> lk(*(txn->getClient()));
+ txn->markKilled(ErrorCodes::Interrupted);
}
getReplCoord()->interrupt(opID);
@@ -2212,11 +2204,11 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) {
ASSERT_EQUALS(ErrorCodes::NotSecondary, status);
ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
// Step down from primary.
- getReplCoord()->updateTerm(&txn, getReplCoord()->getTerm() + 1);
+ getReplCoord()->updateTerm(txn.get(), getReplCoord()->getTerm() + 1);
ASSERT_OK(getReplCoord()->waitForMemberState(MemberState::RS_SECONDARY, Seconds(1)));
status = getReplCoord()->setMaintenanceMode(false);
@@ -2248,11 +2240,11 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection)
// TODO this election shouldn't have to happen.
simulateSuccessfulV1Election();
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
// Step down from primary.
- getReplCoord()->updateTerm(&txn, getReplCoord()->getTerm() + 1);
+ getReplCoord()->updateTerm(txn.get(), getReplCoord()->getTerm() + 1);
getReplCoord()->waitForMemberState(MemberState::RS_SECONDARY, Milliseconds(10 * 1000));
// Can't modify maintenance mode when running for election (before and after dry run).
@@ -2379,8 +2371,8 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast
settings.setMaster(true);
init(settings);
HostAndPort clientHost("node2:12345");
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
OID client = OID::gen();
OpTimeWithTermZero time1(100, 1);
@@ -2389,7 +2381,7 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast
getExternalState()->setClientHostAndPort(clientHost);
HandshakeArgs handshake;
ASSERT_OK(handshake.initialize(BSON("handshake" << client)));
- ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake));
+ ASSERT_OK(getReplCoord()->processHandshake(txn.get(), handshake));
getReplCoord()->setMyLastAppliedOpTime(time2);
getReplCoord()->setMyLastDurableOpTime(time2);
@@ -2557,7 +2549,10 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) {
TEST_F(ReplCoordTest, LogAMessageWhenShutDownBeforeReplicationStartUpFinished) {
init();
startCapturingLogMessages();
- getReplCoord()->shutdown();
+ {
+ auto txn = makeOperationContext();
+ getReplCoord()->shutdown(txn.get());
+ }
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("shutdown() called before startup() finished"));
}
@@ -2595,11 +2590,11 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
writeConcern.wNumNodes = 1;
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
// receive updatePosition containing ourself, should not process the update for self
UpdatePositionArgs args;
@@ -2618,7 +2613,7 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
}
TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf) {
@@ -2655,11 +2650,11 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
writeConcern.wNumNodes = 1;
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
// receive updatePosition containing ourself, should not process the update for self
OldUpdatePositionArgs args;
@@ -2675,7 +2670,7 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf
ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
}
TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect) {
@@ -2726,14 +2721,14 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect)
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< BSON("ts" << time2.getTimestamp() << "t" << 3))))));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
long long cfgver;
ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
getReplCoord()->processReplSetUpdatePosition(args, &cfgver));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
}
TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorrect) {
@@ -2782,14 +2777,14 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorre
<< OldUpdatePositionArgs::kOpTimeFieldName
<< time2.timestamp)))));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
long long cfgver;
ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
getReplCoord()->processReplSetUpdatePosition(args, &cfgver));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
}
TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConfig) {
@@ -2840,12 +2835,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf
<< UpdatePositionArgs::kAppliedOpTimeFieldName
<< BSON("ts" << time2.getTimestamp() << "t" << 2))))));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
ASSERT_EQUALS(ErrorCodes::NodeNotFound, getReplCoord()->processReplSetUpdatePosition(args, 0));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
}
TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheConfig) {
@@ -2894,12 +2889,12 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheC
<< OldUpdatePositionArgs::kOpTimeFieldName
<< time2.timestamp)))));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
ASSERT_EQUALS(ErrorCodes::NodeNotFound, getReplCoord()->processReplSetUpdatePosition(args, 0));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
}
TEST_F(ReplCoordTest,
@@ -2958,14 +2953,14 @@ TEST_F(ReplCoordTest,
<< OldUpdatePositionArgs::kOpTimeFieldName
<< time2.timestamp)))));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0));
- ASSERT_OK(getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ ASSERT_OK(getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
writeConcern.wNumNodes = 3;
- ASSERT_OK(getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+ ASSERT_OK(getReplCoord()->awaitReplication(txn.get(), time2, writeConcern).status);
}
void doReplSetReconfig(ReplicationCoordinatorImpl* replCoord, Status* status) {
@@ -3199,8 +3194,8 @@ TEST_F(ReplCoordTest,
writeConcern.wMode = WriteConcernOptions::kMajority;
writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE;
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
ReplicationAwaiter awaiter(getReplCoord(), getServiceContext());
awaiter.setOpTime(time);
@@ -3214,7 +3209,7 @@ TEST_F(ReplCoordTest,
writeConcern.syncMode = WriteConcernOptions::SyncMode::NONE;
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time, writeConcern2).status);
+ getReplCoord()->awaitReplication(txn.get(), time, writeConcern2).status);
// reconfig to three nodes
Status status(ErrorCodes::InternalError, "Not Set");
@@ -3277,30 +3272,30 @@ TEST_F(ReplCoordTest,
majorityWriteConcern.wMode = WriteConcernOptions::kMajority;
majorityWriteConcern.syncMode = WriteConcernOptions::SyncMode::JOURNAL;
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time));
ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 1, time));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status);
// this member does not vote and as a result should not count towards write concern
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 3, time));
ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 3, time));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time));
ASSERT_OK(getReplCoord()->setLastDurableOptime_forTest(2, 2, time));
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
+ getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status);
getReplCoord()->onSnapshotCreate(time, SnapshotName(1));
- ASSERT_OK(getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
+ ASSERT_OK(getReplCoord()->awaitReplication(txn.get(), time, majorityWriteConcern).status);
}
TEST_F(ReplCoordTest,
@@ -3391,13 +3386,12 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
- shutdown();
+ auto txn = makeOperationContext();
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ shutdown(txn.get());
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus());
@@ -3418,16 +3412,15 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ const auto txn = makeOperationContext();
{
- stdx::lock_guard<Client> lk(*txn.getClient());
- txn.markKilled(ErrorCodes::Interrupted);
+ stdx::lock_guard<Client> lk(*(txn->getClient()));
+ txn->markKilled(ErrorCodes::Interrupted);
}
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus());
@@ -3445,10 +3438,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesNoOpTi
<< 0))),
HostAndPort("node1", 12345));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
-
- auto result = getReplCoord()->waitUntilOpTime(&txn, ReadConcernArgs());
+ auto txn = makeOperationContext();
+ auto result = getReplCoord()->waitUntilOpTime(txn.get(), ReadConcernArgs());
ASSERT(result.didWait());
ASSERT_OK(result.getStatus());
@@ -3469,11 +3460,9 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
-
+ auto txn = makeOperationContext();
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_OK(result.getStatus());
@@ -3496,11 +3485,11 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
getReplCoord()->setMyLastAppliedOpTime(time);
getReplCoord()->setMyLastDurableOpTime(time);
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(time, ReadConcernLevel::kLocalReadConcern));
+ txn.get(), ReadConcernArgs(time, ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_OK(result.getStatus());
@@ -3509,10 +3498,9 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
TEST_F(ReplCoordTest,
NodeReturnsNotAReplicaSetWhenWaitUntilOpTimeIsRunWithoutMajorityReadConcernEnabled) {
init(ReplSettings());
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_FALSE(result.didWait());
ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus());
@@ -3523,10 +3511,11 @@ TEST_F(ReplCoordTest, NodeReturnsNotAReplicaSetWhenWaitUntilOpTimeIsRunAgainstAS
settings.setMajorityReadConcernEnabled(true);
init(settings);
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ auto txn = makeOperationContext();
+
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern));
+ txn.get(),
+ ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern));
ASSERT_FALSE(result.didWait());
ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus());
@@ -3544,14 +3533,15 @@ TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
+
runSingleNodeElection(makeOperationContext(), getReplCoord());
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0));
- shutdown();
-
auto txn = makeOperationContext();
+ shutdown(txn.get());
+
auto result = getReplCoord()->waitUntilOpTime(
txn.get(),
ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern));
@@ -3572,19 +3562,19 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) {
<< 0))),
HostAndPort("node1", 12345));
runSingleNodeElection(makeOperationContext(), getReplCoord());
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
+ const auto txn = makeOperationContext();
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0));
{
- stdx::lock_guard<Client> lk(*txn.getClient());
- txn.markKilled(ErrorCodes::Interrupted);
+ stdx::lock_guard<Client> lk(*(txn->getClient()));
+ txn->markKilled(ErrorCodes::Interrupted);
}
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern));
+ txn.get(),
+ ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus());
@@ -3785,9 +3775,8 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet
HostAndPort("node1", 12345));
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
- getReplCoord()->updateTerm(&txn, 1);
+ auto txn = makeOperationContext();
+ getReplCoord()->updateTerm(txn.get(), 1);
ASSERT_EQUALS(1, getReplCoord()->getTerm());
OpTime time(Timestamp(10, 0), 1);
@@ -3851,9 +3840,8 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
<< 1),
HostAndPort("node1", 12345));
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
- getReplCoord()->updateTerm(&txn, 1);
+ auto txn = makeOperationContext();
+ getReplCoord()->updateTerm(txn.get(), 1);
ASSERT_EQUALS(1, getReplCoord()->getTerm());
// higher term, should change
@@ -3931,9 +3919,8 @@ TEST_F(ReplCoordTest,
<< 1),
HostAndPort("node1", 12345));
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
- auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
- getReplCoord()->updateTerm(&txn, 1);
+ auto txn = makeOperationContext();
+ getReplCoord()->updateTerm(txn.get(), 1);
ASSERT_EQUALS(1, getReplCoord()->getTerm());
auto replCoord = getReplCoord();
@@ -4805,9 +4792,8 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
ASSERT_EQUALS(ErrorCodes::BadValue, replCoord->waitForDrainFinish(Milliseconds(-1)));
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
- replCoord->signalDrainComplete(&txn);
+ const auto txn = makeOperationContext();
+ replCoord->signalDrainComplete(txn.get());
ASSERT_OK(replCoord->waitForDrainFinish(timeout));
// Zero timeout is fine.
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index ca59fd1a833..91893d37325 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -53,7 +53,7 @@ void ReplicationCoordinatorMock::startup(OperationContext* txn) {
// TODO
}
-void ReplicationCoordinatorMock::shutdown() {
+void ReplicationCoordinatorMock::shutdown(OperationContext*) {
// TODO
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index d2343d883ea..ec76a9677ac 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -54,7 +54,7 @@ public:
virtual void startup(OperationContext* txn);
- virtual void shutdown();
+ virtual void shutdown(OperationContext* txn);
virtual ReplicationExecutor* getExecutor() override {
return nullptr;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 319f67c3893..5ffdce430db 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -83,7 +83,8 @@ void ReplCoordTest::tearDown() {
_externalState->setStoreLocalConfigDocumentToHang(false);
}
if (_callShutdown) {
- shutdown();
+ auto txn = makeOperationContext();
+ shutdown(txn.get());
}
}
@@ -267,8 +268,6 @@ void ReplCoordTest::simulateSuccessfulDryRun() {
}
void ReplCoordTest::simulateSuccessfulV1Election() {
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
ReplicationCoordinatorImpl* replCoord = getReplCoord();
NetworkInterfaceMock* net = getNet();
@@ -323,7 +322,10 @@ void ReplCoordTest::simulateSuccessfulV1Election() {
replCoord->fillIsMasterForReplSet(&imResponse);
ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
- replCoord->signalDrainComplete(&txn);
+ {
+ auto txn = makeOperationContext();
+ replCoord->signalDrainComplete(txn.get());
+ }
replCoord->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -332,8 +334,6 @@ void ReplCoordTest::simulateSuccessfulV1Election() {
}
void ReplCoordTest::simulateSuccessfulElection() {
- const auto txnPtr = makeOperationContext();
- auto& txn = *txnPtr;
ReplicationCoordinatorImpl* replCoord = getReplCoord();
NetworkInterfaceMock* net = getNet();
ReplicaSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest();
@@ -384,7 +384,10 @@ void ReplCoordTest::simulateSuccessfulElection() {
replCoord->fillIsMasterForReplSet(&imResponse);
ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
- replCoord->signalDrainComplete(&txn);
+ {
+ auto txn = makeOperationContext();
+ replCoord->signalDrainComplete(txn.get());
+ }
replCoord->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -392,10 +395,10 @@ void ReplCoordTest::simulateSuccessfulElection() {
ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
}
-void ReplCoordTest::shutdown() {
+void ReplCoordTest::shutdown(OperationContext* txn) {
invariant(_callShutdown);
_net->exitNetwork();
- _repl->shutdown();
+ _repl->shutdown(txn);
_callShutdown = false;
}
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index ad43f44a6ef..6e983aa346d 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -228,7 +228,7 @@ protected:
/**
* Shuts down the objects under test.
*/
- void shutdown();
+ void shutdown(OperationContext* txn);
/**
* Receive the heartbeat request from replication coordinator and reply with a response.
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index a82725ddc9c..8c50820b31e 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -96,7 +96,7 @@ void truncateAndResetOplog(OperationContext* txn,
// because the bgsync thread, while running, may update the blacklist.
replCoord->resetMyLastOpTimes();
bgsync->stop();
- bgsync->clearBuffer();
+ bgsync->clearBuffer(txn);
replCoord->clearSyncSourceBlacklist();
@@ -214,7 +214,7 @@ bool _initialSyncClone(OperationContext* txn,
* @param r the oplog reader.
* @return if applying the oplog succeeded.
*/
-bool _initialSyncApplyOplog(OperationContext* ctx,
+bool _initialSyncApplyOplog(OperationContext* txn,
repl::InitialSync* syncer,
OplogReader* r,
BackgroundSync* bgsync) {
@@ -224,9 +224,11 @@ bool _initialSyncApplyOplog(OperationContext* ctx,
// If the fail point is set, exit failing.
if (MONGO_FAIL_POINT(failInitSyncWithBufferedEntriesLeft)) {
log() << "adding fake oplog entry to buffer.";
- bgsync->pushTestOpToBuffer(BSON(
- "ts" << startOpTime.getTimestamp() << "t" << startOpTime.getTerm() << "v" << 1 << "op"
- << "n"));
+ bgsync->pushTestOpToBuffer(
+ txn,
+ BSON("ts" << startOpTime.getTimestamp() << "t" << startOpTime.getTerm() << "v" << 1
+ << "op"
+ << "n"));
return false;
}
@@ -267,7 +269,7 @@ bool _initialSyncApplyOplog(OperationContext* ctx,
// apply till stopOpTime
try {
LOG(2) << "Applying oplog entries from " << startOpTime << " until " << stopOpTime;
- syncer->oplogApplication(ctx, stopOpTime);
+ syncer->oplogApplication(txn, stopOpTime);
if (inShutdown()) {
return false;
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 1e01f0d11cf..a4514386671 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -295,8 +295,8 @@ std::unique_ptr<OldThreadPool> SyncTail::makeWriterPool() {
return stdx::make_unique<OldThreadPool>(replWriterThreadCount, "repl writer worker ");
}
-bool SyncTail::peek(BSONObj* op) {
- return _networkQueue->peek(op);
+bool SyncTail::peek(OperationContext* txn, BSONObj* op) {
+ return _networkQueue->peek(txn, op);
}
// static
@@ -821,13 +821,13 @@ void SyncTail::oplogApplication() {
bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* ops) {
BSONObj op;
// Check to see if there are ops waiting in the bgsync queue
- bool peek_success = peek(&op);
+ bool peek_success = peek(txn, &op);
if (!peek_success) {
// if we don't have anything in the queue, wait a bit for something to appear
if (ops->empty()) {
// block up to 1 second
- _networkQueue->waitForMore();
+ _networkQueue->waitForMore(txn);
return false;
}
@@ -846,7 +846,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
if (ops->empty()) {
// apply commands one-at-a-time
ops->push_back(std::move(entry));
- _networkQueue->consume();
+ _networkQueue->consume(txn);
}
// otherwise, apply what we have so far and come back for the command
@@ -869,7 +869,7 @@ bool SyncTail::tryPopAndWaitForMore(OperationContext* txn, SyncTail::OpQueue* op
// Copy the op to the deque and remove it from the bgsync queue.
ops->push_back(std::move(entry));
- _networkQueue->consume();
+ _networkQueue->consume(txn);
// Go back for more ops
return false;
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 69cac0a3943..9df725e9f4a 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -108,7 +108,7 @@ public:
static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert);
void oplogApplication();
- bool peek(BSONObj* obj);
+ bool peek(OperationContext* txn, BSONObj* obj);
class OpQueue {
public: