summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/SConscript7
-rw-r--r--src/mongo/db/repl/data_replicator.cpp2
-rw-r--r--src/mongo/db/repl/data_replicator_external_state.h3
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.cpp5
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_impl.h2
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp3
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.h2
-rw-r--r--src/mongo/db/repl/oplog.cpp35
-rw-r--r--src/mongo/db/repl/oplog.h8
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.cpp159
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection.h9
-rw-r--r--src/mongo/db/repl/oplog_buffer_collection_test.cpp39
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp4
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h3
-rw-r--r--src/mongo/db/repl/storage_interface.h54
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp200
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h20
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp200
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h95
22 files changed, 585 insertions, 277 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index f745889c129..40d8cde4504 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -71,6 +71,8 @@ env.Library(
],
LIBDEPS=[
'storage_interface',
+ '$BUILD_DIR/mongo/db/common',
+ '$BUILD_DIR/mongo/db/query/internal_plans',
'$BUILD_DIR/mongo/db/serveronly', # For OperationContextImpl
],
)
@@ -149,12 +151,9 @@ env.Library(
],
LIBDEPS=[
'storage_interface',
+ '$BUILD_DIR/mongo/db/catalog/collection_options',
'$BUILD_DIR/mongo/db/service_context',
],
- LIBDEPS_TAGS=[
- # Depends on files like db_raii.cpp from serverOnlyFiles
- 'incomplete',
- ],
)
env.CppUnitTest(
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 20c9745df64..4e089ed88c4 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -631,7 +631,7 @@ TimestampStatus DataReplicator::initialSync(OperationContext* txn) {
_reporterPaused = true;
_applierPaused = true;
- _oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer();
+ _oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer(txn);
_oplogBuffer->startup(nullptr);
ON_BLOCK_EXIT([this]() {
_oplogBuffer->shutdown(nullptr);
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h
index dc2ebac50e4..aaa7fb8dc2d 100644
--- a/src/mongo/db/repl/data_replicator_external_state.h
+++ b/src/mongo/db/repl/data_replicator_external_state.h
@@ -83,7 +83,8 @@ public:
/**
* This function creates an oplog buffer of the type specified at server startup.
*/
- virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const = 0;
+ virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(
+ OperationContext* txn) const = 0;
/**
* Creates an oplog buffer suitable for steady state replication.
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
index afa71d3cd73..5569ed57b7e 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp
@@ -71,8 +71,9 @@ bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& sour
return false;
}
-std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeInitialSyncOplogBuffer() const {
- return _replicationCoordinatorExternalState->makeInitialSyncOplogBuffer();
+std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeInitialSyncOplogBuffer(
+ OperationContext* txn) const {
+ return _replicationCoordinatorExternalState->makeInitialSyncOplogBuffer(txn);
}
std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeSteadyStateOplogBuffer() const {
diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h
index f4bfddf2290..b8866701745 100644
--- a/src/mongo/db/repl/data_replicator_external_state_impl.h
+++ b/src/mongo/db/repl/data_replicator_external_state_impl.h
@@ -53,7 +53,7 @@ public:
bool shouldStopFetching(const HostAndPort& source,
const rpc::ReplSetMetadata& metadata) override;
- std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const override;
+ std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* txn) const override;
std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer() const override;
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index d2f4db170c7..d990a69e03c 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -57,7 +57,8 @@ bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& sour
return shouldStopFetchingResult;
}
-std::unique_ptr<OplogBuffer> DataReplicatorExternalStateMock::makeInitialSyncOplogBuffer() const {
+std::unique_ptr<OplogBuffer> DataReplicatorExternalStateMock::makeInitialSyncOplogBuffer(
+ OperationContext* txn) const {
return stdx::make_unique<OplogBufferBlockingQueue>();
}
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h
index 09036e825cb..918d40ebc73 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.h
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.h
@@ -50,7 +50,7 @@ public:
bool shouldStopFetching(const HostAndPort& source,
const rpc::ReplSetMetadata& metadata) override;
- std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const override;
+ std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* txn) const override;
std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer() const override;
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 820f4b11c96..ef3c20448a4 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -438,41 +438,6 @@ void logOps(OperationContext* txn,
_logOpsInner(txn, nss, basePtrs.get(), count, oplog, replMode, slots[count - 1].opTime);
}
-OpTime writeOpsToOplog(OperationContext* txn,
- const std::string& oplogName,
- const std::vector<BSONObj>& ops) {
- OpTime lastOptime;
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- invariant(!ops.empty());
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), "local", MODE_X);
-
- if (_localOplogCollection == 0) {
- OldClientContext ctx(txn, oplogName);
-
- _localDB = ctx.db();
- verify(_localDB);
- _localOplogCollection = _localDB->getCollection(oplogName);
- massert(13389,
- "local.oplog.rs missing. did you drop it? if so restart server",
- _localOplogCollection);
- }
-
- OldClientContext ctx(txn, oplogName, _localDB);
- WriteUnitOfWork wunit(txn);
-
- OpDebug* const nullOpDebug = nullptr;
- checkOplogInsert(_localOplogCollection->insertDocuments(
- txn, ops.begin(), ops.end(), nullOpDebug, false));
- lastOptime =
- fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(ops.back()));
- wunit.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns());
-
- return lastOptime;
-}
-
namespace {
long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSettings) {
if (replSettings.getOplogSizeBytes() != 0) {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 9015af9e238..2d92230dc61 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -65,14 +65,6 @@ void createOplog(OperationContext* txn, const std::string& oplogCollectionName,
*/
void createOplog(OperationContext* txn);
-// This function writes ops into the replica-set oplog;
-// used internally by replication secondaries after they have applied ops.
-// Updates the global optime.
-// Returns the optime for the last op inserted.
-OpTime writeOpsToOplog(OperationContext* txn,
- const std::string& oplogName,
- const std::vector<BSONObj>& ops);
-
extern std::string rsOplogName;
extern std::string masterSlaveOplogName;
diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp
index aed801558be..09d18389e03 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection.cpp
@@ -31,17 +31,12 @@
#include "mongo/db/repl/oplog_buffer_collection.h"
+#include <algorithm>
+#include <iterator>
+#include <numeric>
+
#include "mongo/db/catalog/collection_options.h"
-#include "mongo/db/catalog/database.h"
-#include "mongo/db/client.h"
-#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/curop.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/query/find_and_modify_request.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/s/write_ops/batched_insert_request.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -70,10 +65,12 @@ BSONObj OplogBufferCollection::extractEmbeddedOplogDocument(const BSONObj& orig)
}
-OplogBufferCollection::OplogBufferCollection() : OplogBufferCollection(getDefaultNamespace()) {}
+OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface)
+ : OplogBufferCollection(storageInterface, getDefaultNamespace()) {}
-OplogBufferCollection::OplogBufferCollection(const NamespaceString& nss)
- : _nss(nss), _count(0), _size(0) {}
+OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface,
+ const NamespaceString& nss)
+ : _storageInterface(storageInterface), _nss(nss), _count(0), _size(0) {}
NamespaceString OplogBufferCollection::getNamespace() const {
return _nss;
@@ -102,37 +99,22 @@ void OplogBufferCollection::push(OperationContext* txn, const Value& value) {
bool OplogBufferCollection::pushAllNonBlocking(OperationContext* txn,
Batch::const_iterator begin,
Batch::const_iterator end) {
- size_t numDocs = 0;
- size_t docSize = 0;
- // TODO: use storage interface to insert documents.
- try {
- DBDirectClient client(txn);
-
- BatchedInsertRequest req;
- req.setNS(_nss);
- for (Batch::const_iterator it = begin; it != end; ++it) {
- req.addToDocuments(addIdToDocument(*it));
- numDocs++;
- docSize += it->objsize();
- }
-
- BSONObj res;
- client.runCommand(_nss.db().toString(), req.toBSON(), res);
-
- BatchedCommandResponse response;
- std::string errmsg;
- if (!response.parseBSON(res, &errmsg)) {
- return false;
- }
+ size_t numDocs = std::distance(begin, end);
+ Batch docsToInsert(numDocs);
+ std::transform(begin, end, docsToInsert.begin(), addIdToDocument);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _count += numDocs;
- _size += docSize;
- return true;
- } catch (const DBException& e) {
- LOG(1) << "Pushing oplog entries to OplogBufferCollection failed with: " << e;
+ auto status = _storageInterface->insertDocuments(txn, _nss, docsToInsert);
+ if (!status.isOK()) {
+ LOG(1) << "Pushing oplog entries to OplogBufferCollection failed with: " << status;
return false;
}
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _count += numDocs;
+ _size += std::accumulate(begin, end, 0U, [](const size_t& docSize, const Value& value) {
+ return docSize + size_t(value.objsize());
+ });
+ return true;
}
void OplogBufferCollection::waitForSpace(OperationContext* txn, std::size_t size) {}
@@ -165,37 +147,23 @@ void OplogBufferCollection::clear(OperationContext* txn) {
}
bool OplogBufferCollection::tryPop(OperationContext* txn, Value* value) {
- auto request = FindAndModifyRequest::makeRemove(_nss, BSONObj());
- request.setSort(BSON("_id" << 1));
-
- // TODO: use storage interface to find and remove document.
- try {
- DBDirectClient client(txn);
- BSONObj response;
- bool res = client.runCommand(_nss.db().toString(), request.toBSON(), response);
- if (!res) {
- return false;
- }
-
- if (auto okElem = response["ok"]) {
- if (!okElem.trueValue()) {
- return false;
- }
- }
-
- if (auto valueElem = response["value"]) {
- *value = extractEmbeddedOplogDocument(valueElem.Obj()).getOwned();
- } else {
- return false;
+ auto keyPattern = BSON("_id" << 1);
+ auto scanDirection = StorageInterface::ScanDirection::kForward;
+ auto result = _storageInterface->deleteOne(txn, _nss, keyPattern, scanDirection);
+ if (!result.isOK()) {
+ if (result != ErrorCodes::NoSuchKey) {
+ LOG(1) << "Popping oplog entries from OplogBufferCollection failed with: "
+ << result.getStatus();
}
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _count--;
- _size -= value->objsize();
- return true;
- } catch (const DBException& e) {
- LOG(1) << "Popping oplog entries from OplogBufferCollection failed with: " << e;
return false;
- };
+ }
+ *value = extractEmbeddedOplogDocument(result.getValue()).getOwned();
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_count > 0);
+ invariant(_size >= std::size_t(value->objsize()));
+ _count--;
+ _size -= value->objsize();
+ return true;
}
OplogBuffer::Value OplogBufferCollection::blockingPop(OperationContext* txn) {
@@ -223,52 +191,27 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed(
}
bool OplogBufferCollection::_peekOneSide(OperationContext* txn, Value* value, bool front) const {
- int asc = front ? 1 : -1;
- // TODO: use storage interface for to find document.
- try {
- auto query = Query();
- query.sort("_id", asc);
-
- DBDirectClient client(txn);
- BSONObj response = client.findOne(_nss.ns(), query);
- if (response.isEmpty()) {
- return false;
- }
- *value = extractEmbeddedOplogDocument(response).getOwned();
- return true;
- } catch (const DBException& e) {
- LOG(1) << "Peeking oplog entries from OplogBufferCollection failed with: " << e;
+ auto keyPattern = BSON("_id" << 1);
+ auto scanDirection = front ? StorageInterface::ScanDirection::kForward
+ : StorageInterface::ScanDirection::kBackward;
+ auto result = _storageInterface->findOne(txn, _nss, keyPattern, scanDirection);
+ if (!result.isOK()) {
+ LOG(1) << "Peeking oplog entries from OplogBufferCollection failed with: "
+ << result.getStatus();
return false;
}
+ *value = extractEmbeddedOplogDocument(result.getValue()).getOwned();
+ return true;
}
void OplogBufferCollection::_createCollection(OperationContext* txn) {
- // TODO: use storage interface to create collection.
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- AutoGetOrCreateDb databaseWriteGuard(txn, _nss.db(), MODE_X);
- auto db = databaseWriteGuard.getDb();
- invariant(db);
- WriteUnitOfWork wuow(txn);
- CollectionOptions options;
- options.temp = true;
- auto coll = db->createCollection(txn, _nss.ns(), options);
- invariant(coll);
- wuow.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "OplogBufferCollection::_createCollection", _nss.ns());
+ CollectionOptions options;
+ options.temp = true;
+ fassert(40154, _storageInterface->createCollection(txn, _nss, options));
}
void OplogBufferCollection::_dropCollection(OperationContext* txn) {
- // TODO: use storage interface to drop collection.
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- AutoGetOrCreateDb databaseWriteGuard(txn, _nss.db(), MODE_X);
- auto db = databaseWriteGuard.getDb();
- invariant(db);
- WriteUnitOfWork wuow(txn);
- db->dropCollection(txn, _nss.ns());
- wuow.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "OplogBufferCollection::_dropCollection", _nss.ns());
+ fassert(40155, _storageInterface->dropCollection(txn, _nss));
}
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h
index bf0e31c4f7d..1a50a25bc71 100644
--- a/src/mongo/db/repl/oplog_buffer_collection.h
+++ b/src/mongo/db/repl/oplog_buffer_collection.h
@@ -36,6 +36,8 @@
namespace mongo {
namespace repl {
+class StorageInterface;
+
/**
* Oplog buffer backed by a temporary collection. This collection is created in startup() and
* removed in shutdown(). The documents will be popped and peeked in timestamp order.
@@ -59,8 +61,8 @@ public:
*/
static BSONObj addIdToDocument(const BSONObj& orig);
- OplogBufferCollection();
- OplogBufferCollection(const NamespaceString& nss);
+ explicit OplogBufferCollection(StorageInterface* storageInterface);
+ OplogBufferCollection(StorageInterface* storageInterface, const NamespaceString& nss);
/**
* Returns the namespace string of the collection used by this oplog buffer.
@@ -105,6 +107,9 @@ private:
*/
bool _peekOneSide(OperationContext* txn, Value* value, bool front) const;
+ // Storage interface used to perform storage engine level functions on the collection.
+ StorageInterface* _storageInterface;
+
// The namespace for the oplog buffer collection.
const NamespaceString _nss;
diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
index 76a5e0e38e4..0eb88d17280 100644
--- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp
+++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp
@@ -57,6 +57,7 @@ protected:
protected:
ServiceContext::UniqueOperationContext makeOperationContext() const;
+ StorageInterface* _storageInterface = nullptr;
ServiceContext::UniqueOperationContext _txn;
private:
@@ -79,7 +80,9 @@ void OplogBufferCollectionTest::setUp() {
ReplicationCoordinator::set(serviceContext,
stdx::make_unique<ReplicationCoordinatorMock>(replSettings));
- StorageInterface::set(serviceContext, stdx::make_unique<StorageInterfaceImpl>());
+ auto storageInterface = stdx::make_unique<StorageInterfaceImpl>();
+ _storageInterface = storageInterface.get();
+ StorageInterface::set(serviceContext, std::move(storageInterface));
_txn = makeOperationContext();
}
@@ -87,6 +90,8 @@ void OplogBufferCollectionTest::setUp() {
void OplogBufferCollectionTest::tearDown() {
_txn.reset();
+ _storageInterface = nullptr;
+
ServiceContextMongoDTest::tearDown();
}
@@ -122,17 +127,17 @@ BSONObj makeOplogEntry(int t) {
TEST_F(OplogBufferCollectionTest, DefaultNamespace) {
ASSERT_EQUALS(OplogBufferCollection::getDefaultNamespace(),
- OplogBufferCollection().getNamespace());
+ OplogBufferCollection(_storageInterface).getNamespace());
}
TEST_F(OplogBufferCollectionTest, GetNamespace) {
auto nss = makeNamespace(_agent);
- ASSERT_EQUALS(nss, OplogBufferCollection(nss).getNamespace());
+ ASSERT_EQUALS(nss, OplogBufferCollection(_storageInterface, nss).getNamespace());
}
TEST_F(OplogBufferCollectionTest, StartupCreatesCollection) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
// Collection should not exist until startup() is called.
ASSERT_FALSE(AutoGetCollectionForRead(_txn.get(), nss).getCollection());
@@ -143,7 +148,7 @@ TEST_F(OplogBufferCollectionTest, StartupCreatesCollection) {
TEST_F(OplogBufferCollectionTest, ShutdownDropsCollection) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection());
@@ -153,7 +158,7 @@ TEST_F(OplogBufferCollectionTest, ShutdownDropsCollection) {
TEST_F(OplogBufferCollectionTest, extractEmbeddedOplogDocumentChangesIdToTimestamp) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
const BSONObj expectedOp = makeOplogEntry(1);
BSONObj originalOp = BSON("_id" << Timestamp(1, 1) << "entry" << expectedOp);
@@ -162,7 +167,7 @@ TEST_F(OplogBufferCollectionTest, extractEmbeddedOplogDocumentChangesIdToTimesta
TEST_F(OplogBufferCollectionTest, addIdToDocumentChangesTimestampToId) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
const BSONObj originalOp = makeOplogEntry(1);
BSONObj expectedOp = BSON("_id" << Timestamp(1, 1) << "entry" << originalOp);
@@ -173,7 +178,7 @@ TEST_F(OplogBufferCollectionTest, addIdToDocumentChangesTimestampToId) {
TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAllNonBlockingAddsDocument) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
const std::vector<BSONObj> oplog = {makeOplogEntry(1)};
@@ -193,7 +198,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAllNonBlockingAddsDocum
TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAddsDocument) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
BSONObj oplog = makeOplogEntry(1);
@@ -213,7 +218,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAddsDocument) {
TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushEvenIfFullAddsDocument) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
BSONObj oplog = makeOplogEntry(1);
@@ -233,7 +238,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushEvenIfFullAddsDocument)
TEST_F(OplogBufferCollectionTest, PeekDoesNotRemoveDocument) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
BSONObj oplog = makeOplogEntry(1);
@@ -258,7 +263,7 @@ TEST_F(OplogBufferCollectionTest, PeekDoesNotRemoveDocument) {
TEST_F(OplogBufferCollectionTest, PopRemovesDocument) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
BSONObj oplog = makeOplogEntry(1);
@@ -280,7 +285,7 @@ TEST_F(OplogBufferCollectionTest, PopRemovesDocument) {
TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
const std::vector<BSONObj> oplog = {
@@ -333,7 +338,7 @@ TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) {
TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNewestOplogEntry) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
const std::vector<BSONObj> oplog = {
@@ -350,7 +355,7 @@ TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNewestOplogEntry) {
TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNoneWithNoEntries) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
@@ -360,7 +365,7 @@ TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNoneWithNoEntries) {
TEST_F(OplogBufferCollectionTest, IsEmptyReturnsTrueWhenEmptyAndFalseWhenNot) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
BSONObj oplog = makeOplogEntry(1);
@@ -371,7 +376,7 @@ TEST_F(OplogBufferCollectionTest, IsEmptyReturnsTrueWhenEmptyAndFalseWhenNot) {
TEST_F(OplogBufferCollectionTest, ClearClearsCollection) {
auto nss = makeNamespace(_agent);
- OplogBufferCollection oplogBuffer(nss);
+ OplogBufferCollection oplogBuffer(_storageInterface, nss);
oplogBuffer.startup(_txn.get());
BSONObj oplog = makeOplogEntry(1);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 8307ac7b80b..cd04eeb464b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -293,7 +293,8 @@ public:
/**
* This function creates an oplog buffer of the type specified at server startup.
*/
- virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const = 0;
+ virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(
+ OperationContext* txn) const = 0;
/**
* Creates an oplog buffer suitable for steady state replication.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index e101dc120e7..2a22b2d7057 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -566,10 +566,10 @@ void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply(
repl::multiInitialSyncApply(ops, &syncTail);
}
-std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer()
- const {
+std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer(
+ OperationContext* txn) const {
if (initialSyncOplogBuffer == kCollectionOplogBufferName) {
- return stdx::make_unique<OplogBufferCollection>();
+ return stdx::make_unique<OplogBufferCollection>(StorageInterface::get(txn));
} else {
return stdx::make_unique<OplogBufferBlockingQueue>();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 8ab9d47f31e..fd89ca1f80a 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -97,7 +97,8 @@ public:
virtual void multiSyncApply(const MultiApplier::Operations& ops) override;
virtual void multiInitialSyncApply(const MultiApplier::Operations& ops,
const HostAndPort& source) override;
- virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const override;
+ virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(
+ OperationContext* txn) const override;
virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer() const override;
virtual bool shouldUseDataReplicatorInitialSync() const override;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index f1f2770677e..5ef6e476c65 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -256,8 +256,8 @@ void ReplicationCoordinatorExternalStateMock::multiSyncApply(const MultiApplier:
void ReplicationCoordinatorExternalStateMock::multiInitialSyncApply(
const MultiApplier::Operations& ops, const HostAndPort& source) {}
-std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateMock::makeInitialSyncOplogBuffer()
- const {
+std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateMock::makeInitialSyncOplogBuffer(
+ OperationContext* txn) const {
return stdx::make_unique<OplogBufferBlockingQueue>();
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 87583512504..70cdd0f9f06 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -97,7 +97,8 @@ public:
virtual void multiSyncApply(const MultiApplier::Operations& ops) override;
virtual void multiInitialSyncApply(const MultiApplier::Operations& ops,
const HostAndPort& source) override;
- virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const override;
+ virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(
+ OperationContext* txn) const override;
virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer() const override;
virtual bool shouldUseDataReplicatorInitialSync() const override;
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 1a45d84021e..e7321f03d87 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -100,22 +100,6 @@ class StorageInterface {
MONGO_DISALLOW_COPYING(StorageInterface);
public:
- // Used for testing.
-
- using CreateCollectionFn = stdx::function<StatusWith<std::unique_ptr<CollectionBulkLoader>>(
- const NamespaceString& nss,
- const CollectionOptions& options,
- const BSONObj idIndexSpec,
- const std::vector<BSONObj>& secondaryIndexSpecs)>;
- using InsertDocumentFn = stdx::function<Status(
- OperationContext* txn, const NamespaceString& nss, const BSONObj& doc)>;
- using InsertOplogDocumentsFn = stdx::function<StatusWith<OpTime>(
- OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& ops)>;
- using DropUserDatabasesFn = stdx::function<Status(OperationContext* txn)>;
- using CreateOplogFn = stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>;
- using DropCollectionFn =
- stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>;
-
// Operation Context binding.
static StorageInterface* get(ServiceContext* service);
static StorageInterface* get(ServiceContext& service);
@@ -209,17 +193,25 @@ public:
const BSONObj& doc) = 0;
/**
- * Inserts the given documents into the oplog, returning the last written OpTime.
+ * Inserts the given documents into the collection.
*/
- virtual StatusWith<OpTime> insertOplogDocuments(OperationContext* txn,
- const NamespaceString& nss,
- const std::vector<BSONObj>& ops) = 0;
+ virtual Status insertDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& docs) = 0;
+
/**
* Creates the initial oplog, errors if it exists.
*/
virtual Status createOplog(OperationContext* txn, const NamespaceString& nss) = 0;
/**
+ * Creates a collection.
+ */
+ virtual Status createCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ const CollectionOptions& options) = 0;
+
+ /**
* Drops a collection, like the oplog.
*/
virtual Status dropCollection(OperationContext* txn, const NamespaceString& nss) = 0;
@@ -233,6 +225,28 @@ public:
* Validates that the admin database is valid during initial sync.
*/
virtual Status isAdminDbValid(OperationContext* txn) = 0;
+
+ /**
+ * Finds the first document returned by an index scan on the collection in the requested
+ * direction.
+ */
+ enum class ScanDirection {
+ kForward = 1,
+ kBackward = -1,
+ };
+ virtual StatusWith<BSONObj> findOne(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) = 0;
+
+ /**
+ * Deletes the first document returned by an index scan on the collection in the requested
+ * direction. Returns deleted document on success.
+ */
+ virtual StatusWith<BSONObj> deleteOne(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 3acbad55326..1f1319611a6 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/repl/storage_interface_impl.h"
#include <algorithm>
+#include <utility>
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
@@ -48,8 +49,11 @@
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/exec/delete.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/keypattern.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/collection_bulk_loader_impl.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -245,24 +249,10 @@ StatusWith<OpTime> StorageInterfaceImpl::writeOpsToOplog(
auto toBSON = [](const OplogEntry& entry) { return entry.raw; };
std::transform(operations.begin(), operations.end(), ops.begin(), toBSON);
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- AutoGetCollection collectionWriteGuard(txn, nss, MODE_X);
- auto collection = collectionWriteGuard.getCollection();
- if (!collection) {
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "unable to write operations to oplog at " << nss.ns()
- << ": collection not found. Did you drop it?"};
- }
-
- WriteUnitOfWork wunit(txn);
- OpDebug* const nullOpDebug = nullptr;
- auto status = collection->insertDocuments(txn, ops.begin(), ops.end(), nullOpDebug, false);
- if (!status.isOK()) {
- return status;
- }
- wunit.commit();
+ auto status = insertDocuments(txn, nss, ops);
+ if (!status.isOK()) {
+ return status;
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOpsToOplog", nss.ns());
return operations.back().getOpTime();
}
@@ -345,47 +335,33 @@ StorageInterfaceImpl::createCollectionForBulkLoading(
Status StorageInterfaceImpl::insertDocument(OperationContext* txn,
const NamespaceString& nss,
const BSONObj& doc) {
- ScopedTransaction transaction(txn, MODE_IX);
- AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_IX);
- AutoGetCollection autoColl(txn, nss, MODE_IX);
- if (!autoColl.getCollection()) {
- return {ErrorCodes::NamespaceNotFound,
- "The collection must exist before inserting documents."};
- }
- WriteUnitOfWork wunit(txn);
- const auto status =
- autoColl.getCollection()->insertDocument(txn, doc, nullptr /** OpDebug **/, false, false);
- if (status.isOK()) {
- wunit.commit();
- }
- return status;
+ return insertDocuments(txn, nss, {doc});
}
-StatusWith<OpTime> StorageInterfaceImpl::insertOplogDocuments(OperationContext* txn,
- const NamespaceString& nss,
- const std::vector<BSONObj>& ops) {
+Status StorageInterfaceImpl::insertDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& docs) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_IX);
AutoGetCollection autoColl(txn, nss, MODE_IX);
- if (!autoColl.getCollection()) {
- return {
- ErrorCodes::NamespaceNotFound,
- str::stream() << "The oplog collection must exist before inserting documents, ns:"
- << nss.ns()};
- }
- if (!autoColl.getCollection()->isCapped()) {
+ auto collection = autoColl.getCollection();
+ if (!collection) {
return {ErrorCodes::NamespaceNotFound,
- str::stream() << "The oplog collection must be capped, ns:" << nss.ns()};
+ str::stream() << "The collection must exist before inserting documents, ns:"
+ << nss.ns()};
}
WriteUnitOfWork wunit(txn);
- const auto lastOpTime = repl::writeOpsToOplog(txn, nss.ns(), ops);
+ OpDebug* const nullOpDebug = nullptr;
+ auto status =
+ collection->insertDocuments(txn, docs.begin(), docs.end(), nullOpDebug, false);
+ if (!status.isOK()) {
+ return status;
+ }
wunit.commit();
- return lastOpTime;
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "StorageInterfaceImpl::insertOplogDocuments", nss.ns());
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::insertDocuments", nss.ns());
+
+ return Status::OK();
}
Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* txn) {
@@ -398,6 +374,26 @@ Status StorageInterfaceImpl::createOplog(OperationContext* txn, const NamespaceS
return Status::OK();
}
+Status StorageInterfaceImpl::createCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ const CollectionOptions& options) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ AutoGetOrCreateDb databaseWriteGuard(txn, nss.db(), MODE_X);
+ auto db = databaseWriteGuard.getDb();
+ invariant(db);
+ if (db->getCollection(nss)) {
+ return {ErrorCodes::NamespaceExists,
+ str::stream() << "Collection " << nss.ns() << " already exists."};
+ }
+ WriteUnitOfWork wuow(txn);
+ auto coll = db->createCollection(txn, nss.ns(), options);
+ invariant(coll);
+ wuow.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::createCollection", nss.ns());
+ return Status::OK();
+}
+
Status StorageInterfaceImpl::dropCollection(OperationContext* txn, const NamespaceString& nss) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
ScopedTransaction transaction(txn, MODE_IX);
@@ -412,6 +408,114 @@ Status StorageInterfaceImpl::dropCollection(OperationContext* txn, const Namespa
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::dropCollection", nss.ns());
}
+namespace {
+
+/**
+ * Returns DeleteStageParams for deleteOne with fetch.
+ */
+DeleteStageParams makeDeleteStageParamsForDeleteOne() {
+ DeleteStageParams deleteStageParams;
+ invariant(!deleteStageParams.isMulti);
+ deleteStageParams.returnDeleted = true;
+ return deleteStageParams;
+}
+
+/**
+ * Shared implementation between findOne and deleteOne.
+ */
+enum class FindDeleteMode { kFind, kDelete };
+StatusWith<BSONObj> _findOrDeleteOne(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ StorageInterface::ScanDirection scanDirection,
+ FindDeleteMode mode) {
+ auto isFind = mode == FindDeleteMode::kFind;
+ auto opStr = isFind ? "StorageInterfaceImpl::findOne" : "StorageInterfaceImpl::deleteOne";
+
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ auto collectionAccessMode = isFind ? MODE_IS : MODE_IX;
+ AutoGetCollection collectionGuard(txn, nss, collectionAccessMode);
+ auto collection = collectionGuard.getCollection();
+ if (!collection) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "Collection not found, ns:" << nss.ns()};
+ }
+ auto indexCatalog = collection->getIndexCatalog();
+ invariant(indexCatalog);
+ bool includeUnfinishedIndexes = false;
+ auto indexDescriptor =
+ indexCatalog->findIndexByKeyPattern(txn, indexKeyPattern, includeUnfinishedIndexes);
+ if (!indexDescriptor) {
+ return {ErrorCodes::IndexNotFound,
+ str::stream() << "Index not found, ns:" << nss.ns() << ", index: "
+ << indexKeyPattern};
+ }
+ if (indexDescriptor->isPartial()) {
+ return {ErrorCodes::IndexOptionsConflict,
+ str::stream() << "Partial index is not allowed for this operation, ns:"
+ << nss.ns()
+ << ", index: "
+ << indexKeyPattern};
+ }
+
+ KeyPattern keyPattern(indexKeyPattern);
+ auto minKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, false));
+ auto maxKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, true));
+ auto isForward = scanDirection == StorageInterface::ScanDirection::kForward;
+ auto bounds = isForward ? std::make_pair(minKey, maxKey) : std::make_pair(maxKey, minKey);
+ bool endKeyInclusive = false;
+ auto direction = isForward ? InternalPlanner::FORWARD : InternalPlanner::BACKWARD;
+
+ std::unique_ptr<PlanExecutor> planExecutor = isFind
+ ? InternalPlanner::indexScan(txn,
+ collection,
+ indexDescriptor,
+ bounds.first,
+ bounds.second,
+ endKeyInclusive,
+ PlanExecutor::YIELD_MANUAL,
+ direction,
+ InternalPlanner::IXSCAN_FETCH)
+ : InternalPlanner::deleteWithIndexScan(txn,
+ collection,
+ makeDeleteStageParamsForDeleteOne(),
+ indexDescriptor,
+ bounds.first,
+ bounds.second,
+ endKeyInclusive,
+ PlanExecutor::YIELD_MANUAL,
+ direction);
+
+ BSONObj doc;
+ auto state = planExecutor->getNext(&doc, nullptr);
+ if (PlanExecutor::IS_EOF == state) {
+ return {ErrorCodes::NoSuchKey,
+ str::stream() << "Collection is empty, ns: " << nss.ns() << ", index: "
+ << indexKeyPattern};
+ }
+ invariant(PlanExecutor::ADVANCED == state);
+ return doc;
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, opStr, nss.ns());
+ MONGO_UNREACHABLE;
+}
+
+} // namespace
+
+StatusWith<BSONObj> StorageInterfaceImpl::findOne(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) {
+ return _findOrDeleteOne(txn, nss, indexKeyPattern, scanDirection, FindDeleteMode::kFind);
+}
+
+StatusWith<BSONObj> StorageInterfaceImpl::deleteOne(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) {
+ return _findOrDeleteOne(txn, nss, indexKeyPattern, scanDirection, FindDeleteMode::kDelete);
+}
+
Status StorageInterfaceImpl::isAdminDbValid(OperationContext* txn) {
log() << "StorageInterfaceImpl::isAdminDbValid called.";
// TODO: plumb through operation context from caller, for now run on ioThread with runner.
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 9f05270ef1f..0eb2f47e406 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -94,16 +94,30 @@ public:
const NamespaceString& nss,
const BSONObj& doc) override;
- StatusWith<OpTime> insertOplogDocuments(OperationContext* txn,
- const NamespaceString& nss,
- const std::vector<BSONObj>& ops) override;
+ Status insertDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& docs) override;
Status dropReplicatedDatabases(OperationContext* txn) override;
Status createOplog(OperationContext* txn, const NamespaceString& nss) override;
+ Status createCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ const CollectionOptions& options) override;
+
Status dropCollection(OperationContext* txn, const NamespaceString& nss) override;
+ StatusWith<BSONObj> findOne(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) override;
+
+ StatusWith<BSONObj> deleteOne(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) override;
+
Status isAdminDbValid(OperationContext* txn) override;
private:
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 31a6a814209..662daf84274 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -51,6 +51,7 @@
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/mongoutils/str.h"
namespace {
@@ -58,6 +59,14 @@ using namespace mongo;
using namespace mongo::repl;
/**
+ * Generates a unique namespace from the test registration agent.
+ */
+template <typename T>
+NamespaceString makeNamespace(const T& t, const char* suffix = "") {
+ return NamespaceString("local." + t.getSuiteName() + "_" + t.getTestName() + suffix);
+}
+
+/**
* Returns min valid document.
*/
BSONObj getMinValidDocument(OperationContext* txn, const NamespaceString& minValidNss) {
@@ -374,7 +383,7 @@ TEST_F(StorageInterfaceImplTest,
auto txn = getClient()->makeOperationContext();
auto status = storageInterface.writeOpsToOplog(txn.get(), nss, {op}).getStatus();
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
- ASSERT_STRING_CONTAINS(status.reason(), "collection not found");
+ ASSERT_STRING_CONTAINS(status.reason(), "The collection must exist before inserting documents");
}
TEST_F(StorageInterfaceImplWithReplCoordTest, InsertMissingDocWorksOnExistingCappedCollection) {
@@ -463,6 +472,26 @@ TEST_F(StorageInterfaceImplWithReplCoordTest, CreateOplogCreateCappedCollection)
}
}
+TEST_F(StorageInterfaceImplWithReplCoordTest, CreateCollectionFailsIfCollectionExists) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ {
+ AutoGetCollectionForRead autoColl(txn, nss);
+ ASSERT_FALSE(autoColl.getCollection());
+ }
+ ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions()));
+ {
+ AutoGetCollectionForRead autoColl(txn, nss);
+ ASSERT_TRUE(autoColl.getCollection());
+ ASSERT_EQ(nss.toString(), autoColl.getCollection()->ns().toString());
+ }
+ auto status = storage.createCollection(txn, nss, CollectionOptions());
+ ASSERT_EQUALS(ErrorCodes::NamespaceExists, status);
+ ASSERT_STRING_CONTAINS(status.reason(),
+ str::stream() << "Collection " << nss.ns() << " already exists");
+}
+
TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithExistingWithDataCollection) {
auto txn = getOperationContext();
StorageInterfaceImpl storage;
@@ -491,4 +520,173 @@ TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithMissingColl
ASSERT_FALSE(autoColl.getCollection());
}
+TEST_F(StorageInterfaceImplWithReplCoordTest, FindOneReturnsInvalidNamespaceIfCollectionIsMissing) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("_id" << 1);
+ ASSERT_EQUALS(ErrorCodes::NamespaceNotFound,
+ storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward));
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, FindOneReturnsIndexNotFoundIfIndexIsMissing) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("x" << 1);
+ ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions()));
+ ASSERT_EQUALS(ErrorCodes::IndexNotFound,
+ storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward));
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest,
+ FindOneReturnsIndexOptionsConflictIfIndexIsAPartialIndex) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ storage.startup();
+ auto nss = makeNamespace(_agent);
+ std::vector<BSONObj> indexes = {BSON("v" << 1 << "key" << BSON("x" << 1) << "name"
+ << "x_1"
+ << "ns"
+ << nss.ns()
+ << "partialFilterExpression"
+ << BSON("y" << 1))};
+ auto loader = unittest::assertGet(
+ storage.createCollectionForBulkLoading(nss, CollectionOptions(), {}, indexes));
+ std::vector<BSONObj> docs = {BSON("_id" << 1), BSON("_id" << 1), BSON("_id" << 2)};
+ ASSERT_OK(loader->insertDocuments(docs.begin(), docs.end()));
+ ASSERT_OK(loader->commit());
+ auto keyPattern = BSON("x" << 1);
+ ASSERT_EQUALS(ErrorCodes::IndexOptionsConflict,
+ storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward));
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, FindOneReturnsNoSuchKeyIfCollectionIsEmpty) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("_id" << 1);
+ ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions()));
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey,
+ storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward));
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest,
+ FindOneReturnsDocumentWithLowestKeyValueIfScanDirectionIsForward) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("_id" << 1);
+ ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions()));
+ ASSERT_OK(
+ storage.insertDocuments(txn, nss, {BSON("_id" << 0), BSON("_id" << 1), BSON("_id" << 2)}));
+ ASSERT_EQUALS(BSON("_id" << 0),
+ storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward));
+
+ // Check collection contents. OplogInterface returns documents in reverse natural order.
+ OplogInterfaceLocal oplog(txn, nss.ns());
+ auto iter = oplog.makeIterator();
+ ASSERT_EQUALS(BSON("_id" << 2), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(BSON("_id" << 1), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(BSON("_id" << 0), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus());
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest,
+ FindOneReturnsDocumentWithHighestKeyValueIfScanDirectionIsBackward) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("_id" << 1);
+ ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions()));
+ ASSERT_OK(
+ storage.insertDocuments(txn, nss, {BSON("_id" << 0), BSON("_id" << 1), BSON("_id" << 2)}));
+ ASSERT_EQUALS(
+ BSON("_id" << 2),
+ storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kBackward));
+
+ // Check collection contents. OplogInterface returns documents in reverse natural order.
+ OplogInterfaceLocal oplog(txn, nss.ns());
+ auto iter = oplog.makeIterator();
+ ASSERT_EQUALS(BSON("_id" << 2), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(BSON("_id" << 1), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(BSON("_id" << 0), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus());
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest,
+ DeleteOneReturnsInvalidNamespaceIfCollectionIsMissing) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("_id" << 1);
+ ASSERT_EQUALS(
+ ErrorCodes::NamespaceNotFound,
+ storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward));
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, DeleteOneReturnsIndexNotFoundIfIndexIsMissing) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("x" << 1);
+ ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions()));
+ ASSERT_EQUALS(
+ ErrorCodes::IndexNotFound,
+ storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward));
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, DeleteOneReturnsNoSuchKeyIfCollectionIsEmpty) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("_id" << 1);
+ ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions()));
+ ASSERT_EQUALS(
+ ErrorCodes::NoSuchKey,
+ storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward));
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest,
+ DeleteOneReturnsDocumentWithLowestKeyValueIfScanDirectionIsForward) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("_id" << 1);
+ ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions()));
+ ASSERT_OK(
+ storage.insertDocuments(txn, nss, {BSON("_id" << 0), BSON("_id" << 1), BSON("_id" << 2)}));
+ ASSERT_EQUALS(
+ BSON("_id" << 0),
+ storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward));
+
+ // Check collection contents. OplogInterface returns documents in reverse natural order.
+ OplogInterfaceLocal oplog(txn, nss.ns());
+ auto iter = oplog.makeIterator();
+ ASSERT_EQUALS(BSON("_id" << 2), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(BSON("_id" << 1), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus());
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest,
+ DeleteOneReturnsDocumentWithHighestKeyValueIfScanDirectionIsBackward) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ auto nss = makeNamespace(_agent);
+ auto keyPattern = BSON("_id" << 1);
+ ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions()));
+ ASSERT_OK(
+ storage.insertDocuments(txn, nss, {BSON("_id" << 0), BSON("_id" << 1), BSON("_id" << 2)}));
+ ASSERT_EQUALS(
+ BSON("_id" << 2),
+ storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kBackward));
+
+ // Check collection contents. OplogInterface returns documents in reverse natural order.
+ OplogInterfaceLocal oplog(txn, nss.ns());
+ auto iter = oplog.makeIterator();
+ ASSERT_EQUALS(BSON("_id" << 1), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(BSON("_id" << 0), unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus());
+}
+
} // namespace
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 143d5d367cd..b25d91f8978 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -87,6 +87,33 @@ class StorageInterfaceMock : public StorageInterface {
MONGO_DISALLOW_COPYING(StorageInterfaceMock);
public:
+ // Used for testing.
+
+ using CreateCollectionForBulkFn =
+ stdx::function<StatusWith<std::unique_ptr<CollectionBulkLoader>>(
+ const NamespaceString& nss,
+ const CollectionOptions& options,
+ const BSONObj idIndexSpec,
+ const std::vector<BSONObj>& secondaryIndexSpecs)>;
+ using InsertDocumentFn = stdx::function<Status(
+ OperationContext* txn, const NamespaceString& nss, const BSONObj& doc)>;
+ using InsertDocumentsFn = stdx::function<Status(
+ OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& docs)>;
+ using DropUserDatabasesFn = stdx::function<Status(OperationContext* txn)>;
+ using CreateOplogFn = stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>;
+ using CreateCollectionFn = stdx::function<Status(
+ OperationContext* txn, const NamespaceString& nss, const CollectionOptions& options)>;
+ using DropCollectionFn =
+ stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>;
+ using FindOneFn = stdx::function<StatusWith<BSONObj>(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection)>;
+ using DeleteOneFn = stdx::function<StatusWith<BSONObj>(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection)>;
+
StorageInterfaceMock() = default;
void startup() override;
@@ -113,7 +140,7 @@ public:
const CollectionOptions& options,
const BSONObj idIndexSpec,
const std::vector<BSONObj>& secondaryIndexSpecs) override {
- return createCollectionFn(nss, options, idIndexSpec, secondaryIndexSpecs);
+ return createCollectionForBulkFn(nss, options, idIndexSpec, secondaryIndexSpecs);
};
Status insertDocument(OperationContext* txn,
@@ -122,10 +149,10 @@ public:
return insertDocumentFn(txn, nss, doc);
};
- StatusWith<OpTime> insertOplogDocuments(OperationContext* txn,
- const NamespaceString& nss,
- const std::vector<BSONObj>& ops) override {
- return insertOplogDocumentsFn(txn, nss, ops);
+ Status insertDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& docs) override {
+ return insertDocumentsFn(txn, nss, docs);
}
Status dropReplicatedDatabases(OperationContext* txn) override {
@@ -136,31 +163,51 @@ public:
return createOplogFn(txn, nss);
};
+ Status createCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ const CollectionOptions& options) override {
+ return createCollFn(txn, nss, options);
+ }
+
Status dropCollection(OperationContext* txn, const NamespaceString& nss) override {
return dropCollFn(txn, nss);
};
+ StatusWith<BSONObj> findOne(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) override {
+ return findOneFn(txn, nss, indexKeyPattern, scanDirection);
+ }
+
+ StatusWith<BSONObj> deleteOne(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) override {
+ return deleteOneFn(txn, nss, indexKeyPattern, scanDirection);
+ }
+
Status isAdminDbValid(OperationContext* txn) override {
return Status::OK();
};
// Testing functions.
- CreateCollectionFn createCollectionFn = [](const NamespaceString& nss,
- const CollectionOptions& options,
- const BSONObj idIndexSpec,
- const std::vector<BSONObj>& secondaryIndexSpecs)
- -> StatusWith<std::unique_ptr<CollectionBulkLoader>> {
- return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."};
- };
+ CreateCollectionForBulkFn createCollectionForBulkFn =
+ [](const NamespaceString& nss,
+ const CollectionOptions& options,
+ const BSONObj idIndexSpec,
+ const std::vector<BSONObj>&
+ secondaryIndexSpecs) -> StatusWith<std::unique_ptr<CollectionBulkLoader>> {
+ return Status{ErrorCodes::IllegalOperation, "CreateCollectionForBulkFn not implemented."};
+ };
InsertDocumentFn insertDocumentFn =
[](OperationContext* txn, const NamespaceString& nss, const BSONObj& doc) {
return Status{ErrorCodes::IllegalOperation, "InsertDocumentFn not implemented."};
};
- InsertOplogDocumentsFn insertOplogDocumentsFn =
- [](OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& ops) {
- return StatusWith<OpTime>(
- Status{ErrorCodes::IllegalOperation, "InsertOplogDocumentsFn not implemented."});
+ InsertDocumentsFn insertDocumentsFn =
+ [](OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
+ return Status{ErrorCodes::IllegalOperation, "InsertDocumentsFn not implemented."};
};
DropUserDatabasesFn dropUserDBsFn = [](OperationContext* txn) {
return Status{ErrorCodes::IllegalOperation, "DropUserDatabasesFn not implemented."};
@@ -168,9 +215,25 @@ public:
CreateOplogFn createOplogFn = [](OperationContext* txn, const NamespaceString& nss) {
return Status{ErrorCodes::IllegalOperation, "CreateOplogFn not implemented."};
};
+ CreateCollectionFn createCollFn =
+ [](OperationContext* txn, const NamespaceString& nss, const CollectionOptions& options) {
+ return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."};
+ };
DropCollectionFn dropCollFn = [](OperationContext* txn, const NamespaceString& nss) {
return Status{ErrorCodes::IllegalOperation, "DropCollectionFn not implemented."};
};
+ FindOneFn findOneFn = [](OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) {
+ return Status{ErrorCodes::IllegalOperation, "FindOneFn not implemented."};
+ };
+ DeleteOneFn deleteOneFn = [](OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& indexKeyPattern,
+ ScanDirection scanDirection) {
+ return Status{ErrorCodes::IllegalOperation, "DeleteOneFn not implemented."};
+ };
private:
mutable stdx::mutex _initialSyncFlagMutex;