diff options
author | Benety Goh <benety@mongodb.com> | 2016-06-17 14:31:49 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-06-20 15:38:07 -0400 |
commit | eaf507a65787759935e786a15ea58efb91fc004b (patch) | |
tree | 2138c56d626be859677918cfe9ab1446ab1490bb /src/mongo | |
parent | 66b4268b58f94de74ea0f464ba4bd1037fa43efb (diff) | |
download | mongo-eaf507a65787759935e786a15ea58efb91fc004b.tar.gz |
SERVER-24490 oplog buffer collection should use storage interface
Diffstat (limited to 'src/mongo')
22 files changed, 585 insertions, 277 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index f745889c129..40d8cde4504 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -71,6 +71,8 @@ env.Library( ], LIBDEPS=[ 'storage_interface', + '$BUILD_DIR/mongo/db/common', + '$BUILD_DIR/mongo/db/query/internal_plans', '$BUILD_DIR/mongo/db/serveronly', # For OperationContextImpl ], ) @@ -149,12 +151,9 @@ env.Library( ], LIBDEPS=[ 'storage_interface', + '$BUILD_DIR/mongo/db/catalog/collection_options', '$BUILD_DIR/mongo/db/service_context', ], - LIBDEPS_TAGS=[ - # Depends on files like db_raii.cpp from serverOnlyFiles - 'incomplete', - ], ) env.CppUnitTest( diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 20c9745df64..4e089ed88c4 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -631,7 +631,7 @@ TimestampStatus DataReplicator::initialSync(OperationContext* txn) { _reporterPaused = true; _applierPaused = true; - _oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer(); + _oplogBuffer = _dataReplicatorExternalState->makeInitialSyncOplogBuffer(txn); _oplogBuffer->startup(nullptr); ON_BLOCK_EXIT([this]() { _oplogBuffer->shutdown(nullptr); diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index dc2ebac50e4..aaa7fb8dc2d 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -83,7 +83,8 @@ public: /** * This function creates an oplog buffer of the type specified at server startup. */ - virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const = 0; + virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer( + OperationContext* txn) const = 0; /** * Creates an oplog buffer suitable for steady state replication. diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index afa71d3cd73..5569ed57b7e 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -71,8 +71,9 @@ bool DataReplicatorExternalStateImpl::shouldStopFetching(const HostAndPort& sour return false; } -std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeInitialSyncOplogBuffer() const { - return _replicationCoordinatorExternalState->makeInitialSyncOplogBuffer(); +std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeInitialSyncOplogBuffer( + OperationContext* txn) const { + return _replicationCoordinatorExternalState->makeInitialSyncOplogBuffer(txn); } std::unique_ptr<OplogBuffer> DataReplicatorExternalStateImpl::makeSteadyStateOplogBuffer() const { diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index f4bfddf2290..b8866701745 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -53,7 +53,7 @@ public: bool shouldStopFetching(const HostAndPort& source, const rpc::ReplSetMetadata& metadata) override; - std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const override; + std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* txn) const override; std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer() const override; diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index d2f4db170c7..d990a69e03c 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -57,7 +57,8 @@ bool DataReplicatorExternalStateMock::shouldStopFetching(const HostAndPort& sour return shouldStopFetchingResult; } -std::unique_ptr<OplogBuffer> DataReplicatorExternalStateMock::makeInitialSyncOplogBuffer() const { +std::unique_ptr<OplogBuffer> DataReplicatorExternalStateMock::makeInitialSyncOplogBuffer( + OperationContext* txn) const { return stdx::make_unique<OplogBufferBlockingQueue>(); } diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 09036e825cb..918d40ebc73 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -50,7 +50,7 @@ public: bool shouldStopFetching(const HostAndPort& source, const rpc::ReplSetMetadata& metadata) override; - std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const override; + std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer(OperationContext* txn) const override; std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer() const override; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 820f4b11c96..ef3c20448a4 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -438,41 +438,6 @@ void logOps(OperationContext* txn, _logOpsInner(txn, nss, basePtrs.get(), count, oplog, replMode, slots[count - 1].opTime); } -OpTime writeOpsToOplog(OperationContext* txn, - const std::string& oplogName, - const std::vector<BSONObj>& ops) { - OpTime lastOptime; - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - invariant(!ops.empty()); - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); - - if (_localOplogCollection == 0) { - OldClientContext ctx(txn, oplogName); - - _localDB = ctx.db(); - verify(_localDB); - _localOplogCollection = _localDB->getCollection(oplogName); - massert(13389, - "local.oplog.rs missing. did you drop it? if so restart server", - _localOplogCollection); - } - - OldClientContext ctx(txn, oplogName, _localDB); - WriteUnitOfWork wunit(txn); - - OpDebug* const nullOpDebug = nullptr; - checkOplogInsert(_localOplogCollection->insertDocuments( - txn, ops.begin(), ops.end(), nullOpDebug, false)); - lastOptime = - fassertStatusOK(ErrorCodes::InvalidBSON, OpTime::parseFromOplogEntry(ops.back())); - wunit.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOps", _localOplogCollection->ns().ns()); - - return lastOptime; -} - namespace { long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSettings) { if (replSettings.getOplogSizeBytes() != 0) { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 9015af9e238..2d92230dc61 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -65,14 +65,6 @@ void createOplog(OperationContext* txn, const std::string& oplogCollectionName, */ void createOplog(OperationContext* txn); -// This function writes ops into the replica-set oplog; -// used internally by replication secondaries after they have applied ops. -// Updates the global optime. -// Returns the optime for the last op inserted. -OpTime writeOpsToOplog(OperationContext* txn, - const std::string& oplogName, - const std::vector<BSONObj>& ops); - extern std::string rsOplogName; extern std::string masterSlaveOplogName; diff --git a/src/mongo/db/repl/oplog_buffer_collection.cpp b/src/mongo/db/repl/oplog_buffer_collection.cpp index aed801558be..09d18389e03 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection.cpp @@ -31,17 +31,12 @@ #include "mongo/db/repl/oplog_buffer_collection.h" +#include <algorithm> +#include <iterator> +#include <numeric> + #include "mongo/db/catalog/collection_options.h" -#include "mongo/db/catalog/database.h" -#include "mongo/db/client.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/query/find_and_modify_request.h" -#include "mongo/s/write_ops/batched_command_response.h" -#include "mongo/s/write_ops/batched_insert_request.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/util/log.h" namespace mongo { @@ -70,10 +65,12 @@ BSONObj OplogBufferCollection::extractEmbeddedOplogDocument(const BSONObj& orig) } -OplogBufferCollection::OplogBufferCollection() : OplogBufferCollection(getDefaultNamespace()) {} +OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface) + : OplogBufferCollection(storageInterface, getDefaultNamespace()) {} -OplogBufferCollection::OplogBufferCollection(const NamespaceString& nss) - : _nss(nss), _count(0), _size(0) {} +OplogBufferCollection::OplogBufferCollection(StorageInterface* storageInterface, + const NamespaceString& nss) + : _storageInterface(storageInterface), _nss(nss), _count(0), _size(0) {} NamespaceString OplogBufferCollection::getNamespace() const { return _nss; @@ -102,37 +99,22 @@ void OplogBufferCollection::push(OperationContext* txn, const Value& value) { bool OplogBufferCollection::pushAllNonBlocking(OperationContext* txn, Batch::const_iterator begin, Batch::const_iterator end) { - size_t numDocs = 0; - size_t docSize = 0; - // TODO: use storage interface to insert documents. - try { - DBDirectClient client(txn); - - BatchedInsertRequest req; - req.setNS(_nss); - for (Batch::const_iterator it = begin; it != end; ++it) { - req.addToDocuments(addIdToDocument(*it)); - numDocs++; - docSize += it->objsize(); - } - - BSONObj res; - client.runCommand(_nss.db().toString(), req.toBSON(), res); - - BatchedCommandResponse response; - std::string errmsg; - if (!response.parseBSON(res, &errmsg)) { - return false; - } + size_t numDocs = std::distance(begin, end); + Batch docsToInsert(numDocs); + std::transform(begin, end, docsToInsert.begin(), addIdToDocument); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _count += numDocs; - _size += docSize; - return true; - } catch (const DBException& e) { - LOG(1) << "Pushing oplog entries to OplogBufferCollection failed with: " << e; + auto status = _storageInterface->insertDocuments(txn, _nss, docsToInsert); + if (!status.isOK()) { + LOG(1) << "Pushing oplog entries to OplogBufferCollection failed with: " << status; return false; } + + stdx::lock_guard<stdx::mutex> lk(_mutex); + _count += numDocs; + _size += std::accumulate(begin, end, 0U, [](const size_t& docSize, const Value& value) { + return docSize + size_t(value.objsize()); + }); + return true; } void OplogBufferCollection::waitForSpace(OperationContext* txn, std::size_t size) {} @@ -165,37 +147,23 @@ void OplogBufferCollection::clear(OperationContext* txn) { } bool OplogBufferCollection::tryPop(OperationContext* txn, Value* value) { - auto request = FindAndModifyRequest::makeRemove(_nss, BSONObj()); - request.setSort(BSON("_id" << 1)); - - // TODO: use storage interface to find and remove document. - try { - DBDirectClient client(txn); - BSONObj response; - bool res = client.runCommand(_nss.db().toString(), request.toBSON(), response); - if (!res) { - return false; - } - - if (auto okElem = response["ok"]) { - if (!okElem.trueValue()) { - return false; - } - } - - if (auto valueElem = response["value"]) { - *value = extractEmbeddedOplogDocument(valueElem.Obj()).getOwned(); - } else { - return false; + auto keyPattern = BSON("_id" << 1); + auto scanDirection = StorageInterface::ScanDirection::kForward; + auto result = _storageInterface->deleteOne(txn, _nss, keyPattern, scanDirection); + if (!result.isOK()) { + if (result != ErrorCodes::NoSuchKey) { + LOG(1) << "Popping oplog entries from OplogBufferCollection failed with: " + << result.getStatus(); } - stdx::lock_guard<stdx::mutex> lk(_mutex); - _count--; - _size -= value->objsize(); - return true; - } catch (const DBException& e) { - LOG(1) << "Popping oplog entries from OplogBufferCollection failed with: " << e; return false; - }; + } + *value = extractEmbeddedOplogDocument(result.getValue()).getOwned(); + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_count > 0); + invariant(_size >= std::size_t(value->objsize())); + _count--; + _size -= value->objsize(); + return true; } OplogBuffer::Value OplogBufferCollection::blockingPop(OperationContext* txn) { @@ -223,52 +191,27 @@ boost::optional<OplogBuffer::Value> OplogBufferCollection::lastObjectPushed( } bool OplogBufferCollection::_peekOneSide(OperationContext* txn, Value* value, bool front) const { - int asc = front ? 1 : -1; - // TODO: use storage interface for to find document. - try { - auto query = Query(); - query.sort("_id", asc); - - DBDirectClient client(txn); - BSONObj response = client.findOne(_nss.ns(), query); - if (response.isEmpty()) { - return false; - } - *value = extractEmbeddedOplogDocument(response).getOwned(); - return true; - } catch (const DBException& e) { - LOG(1) << "Peeking oplog entries from OplogBufferCollection failed with: " << e; + auto keyPattern = BSON("_id" << 1); + auto scanDirection = front ? StorageInterface::ScanDirection::kForward + : StorageInterface::ScanDirection::kBackward; + auto result = _storageInterface->findOne(txn, _nss, keyPattern, scanDirection); + if (!result.isOK()) { + LOG(1) << "Peeking oplog entries from OplogBufferCollection failed with: " + << result.getStatus(); return false; } + *value = extractEmbeddedOplogDocument(result.getValue()).getOwned(); + return true; } void OplogBufferCollection::_createCollection(OperationContext* txn) { - // TODO: use storage interface to create collection. - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - AutoGetOrCreateDb databaseWriteGuard(txn, _nss.db(), MODE_X); - auto db = databaseWriteGuard.getDb(); - invariant(db); - WriteUnitOfWork wuow(txn); - CollectionOptions options; - options.temp = true; - auto coll = db->createCollection(txn, _nss.ns(), options); - invariant(coll); - wuow.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "OplogBufferCollection::_createCollection", _nss.ns()); + CollectionOptions options; + options.temp = true; + fassert(40154, _storageInterface->createCollection(txn, _nss, options)); } void OplogBufferCollection::_dropCollection(OperationContext* txn) { - // TODO: use storage interface to drop collection. - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - AutoGetOrCreateDb databaseWriteGuard(txn, _nss.db(), MODE_X); - auto db = databaseWriteGuard.getDb(); - invariant(db); - WriteUnitOfWork wuow(txn); - db->dropCollection(txn, _nss.ns()); - wuow.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "OplogBufferCollection::_dropCollection", _nss.ns()); + fassert(40155, _storageInterface->dropCollection(txn, _nss)); } } // namespace repl diff --git a/src/mongo/db/repl/oplog_buffer_collection.h b/src/mongo/db/repl/oplog_buffer_collection.h index bf0e31c4f7d..1a50a25bc71 100644 --- a/src/mongo/db/repl/oplog_buffer_collection.h +++ b/src/mongo/db/repl/oplog_buffer_collection.h @@ -36,6 +36,8 @@ namespace mongo { namespace repl { +class StorageInterface; + /** * Oplog buffer backed by a temporary collection. This collection is created in startup() and * removed in shutdown(). The documents will be popped and peeked in timestamp order. @@ -59,8 +61,8 @@ public: */ static BSONObj addIdToDocument(const BSONObj& orig); - OplogBufferCollection(); - OplogBufferCollection(const NamespaceString& nss); + explicit OplogBufferCollection(StorageInterface* storageInterface); + OplogBufferCollection(StorageInterface* storageInterface, const NamespaceString& nss); /** * Returns the namespace string of the collection used by this oplog buffer. @@ -105,6 +107,9 @@ private: */ bool _peekOneSide(OperationContext* txn, Value* value, bool front) const; + // Storage interface used to perform storage engine level functions on the collection. + StorageInterface* _storageInterface; + // The namespace for the oplog buffer collection. const NamespaceString _nss; diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp index 76a5e0e38e4..0eb88d17280 100644 --- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp @@ -57,6 +57,7 @@ protected: protected: ServiceContext::UniqueOperationContext makeOperationContext() const; + StorageInterface* _storageInterface = nullptr; ServiceContext::UniqueOperationContext _txn; private: @@ -79,7 +80,9 @@ void OplogBufferCollectionTest::setUp() { ReplicationCoordinator::set(serviceContext, stdx::make_unique<ReplicationCoordinatorMock>(replSettings)); - StorageInterface::set(serviceContext, stdx::make_unique<StorageInterfaceImpl>()); + auto storageInterface = stdx::make_unique<StorageInterfaceImpl>(); + _storageInterface = storageInterface.get(); + StorageInterface::set(serviceContext, std::move(storageInterface)); _txn = makeOperationContext(); } @@ -87,6 +90,8 @@ void OplogBufferCollectionTest::setUp() { void OplogBufferCollectionTest::tearDown() { _txn.reset(); + _storageInterface = nullptr; + ServiceContextMongoDTest::tearDown(); } @@ -122,17 +127,17 @@ BSONObj makeOplogEntry(int t) { TEST_F(OplogBufferCollectionTest, DefaultNamespace) { ASSERT_EQUALS(OplogBufferCollection::getDefaultNamespace(), - OplogBufferCollection().getNamespace()); + OplogBufferCollection(_storageInterface).getNamespace()); } TEST_F(OplogBufferCollectionTest, GetNamespace) { auto nss = makeNamespace(_agent); - ASSERT_EQUALS(nss, OplogBufferCollection(nss).getNamespace()); + ASSERT_EQUALS(nss, OplogBufferCollection(_storageInterface, nss).getNamespace()); } TEST_F(OplogBufferCollectionTest, StartupCreatesCollection) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); // Collection should not exist until startup() is called. ASSERT_FALSE(AutoGetCollectionForRead(_txn.get(), nss).getCollection()); @@ -143,7 +148,7 @@ TEST_F(OplogBufferCollectionTest, StartupCreatesCollection) { TEST_F(OplogBufferCollectionTest, ShutdownDropsCollection) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); ASSERT_TRUE(AutoGetCollectionForRead(_txn.get(), nss).getCollection()); @@ -153,7 +158,7 @@ TEST_F(OplogBufferCollectionTest, ShutdownDropsCollection) { TEST_F(OplogBufferCollectionTest, extractEmbeddedOplogDocumentChangesIdToTimestamp) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); const BSONObj expectedOp = makeOplogEntry(1); BSONObj originalOp = BSON("_id" << Timestamp(1, 1) << "entry" << expectedOp); @@ -162,7 +167,7 @@ TEST_F(OplogBufferCollectionTest, extractEmbeddedOplogDocumentChangesIdToTimesta TEST_F(OplogBufferCollectionTest, addIdToDocumentChangesTimestampToId) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); const BSONObj originalOp = makeOplogEntry(1); BSONObj expectedOp = BSON("_id" << Timestamp(1, 1) << "entry" << originalOp); @@ -173,7 +178,7 @@ TEST_F(OplogBufferCollectionTest, addIdToDocumentChangesTimestampToId) { TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAllNonBlockingAddsDocument) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); const std::vector<BSONObj> oplog = {makeOplogEntry(1)}; @@ -193,7 +198,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAllNonBlockingAddsDocum TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAddsDocument) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); BSONObj oplog = makeOplogEntry(1); @@ -213,7 +218,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushAddsDocument) { TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushEvenIfFullAddsDocument) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); BSONObj oplog = makeOplogEntry(1); @@ -233,7 +238,7 @@ TEST_F(OplogBufferCollectionTest, PushOneDocumentWithPushEvenIfFullAddsDocument) TEST_F(OplogBufferCollectionTest, PeekDoesNotRemoveDocument) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); BSONObj oplog = makeOplogEntry(1); @@ -258,7 +263,7 @@ TEST_F(OplogBufferCollectionTest, PeekDoesNotRemoveDocument) { TEST_F(OplogBufferCollectionTest, PopRemovesDocument) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); BSONObj oplog = makeOplogEntry(1); @@ -280,7 +285,7 @@ TEST_F(OplogBufferCollectionTest, PopRemovesDocument) { TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); const std::vector<BSONObj> oplog = { @@ -333,7 +338,7 @@ TEST_F(OplogBufferCollectionTest, PopAndPeekReturnDocumentsInOrder) { TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNewestOplogEntry) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); const std::vector<BSONObj> oplog = { @@ -350,7 +355,7 @@ TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNewestOplogEntry) { TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNoneWithNoEntries) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); @@ -360,7 +365,7 @@ TEST_F(OplogBufferCollectionTest, LastObjectPushedReturnsNoneWithNoEntries) { TEST_F(OplogBufferCollectionTest, IsEmptyReturnsTrueWhenEmptyAndFalseWhenNot) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); BSONObj oplog = makeOplogEntry(1); @@ -371,7 +376,7 @@ TEST_F(OplogBufferCollectionTest, IsEmptyReturnsTrueWhenEmptyAndFalseWhenNot) { TEST_F(OplogBufferCollectionTest, ClearClearsCollection) { auto nss = makeNamespace(_agent); - OplogBufferCollection oplogBuffer(nss); + OplogBufferCollection oplogBuffer(_storageInterface, nss); oplogBuffer.startup(_txn.get()); BSONObj oplog = makeOplogEntry(1); diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 8307ac7b80b..cd04eeb464b 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -293,7 +293,8 @@ public: /** * This function creates an oplog buffer of the type specified at server startup. */ - virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const = 0; + virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer( + OperationContext* txn) const = 0; /** * Creates an oplog buffer suitable for steady state replication. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index e101dc120e7..2a22b2d7057 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -566,10 +566,10 @@ void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( repl::multiInitialSyncApply(ops, &syncTail); } -std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer() - const { +std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer( + OperationContext* txn) const { if (initialSyncOplogBuffer == kCollectionOplogBufferName) { - return stdx::make_unique<OplogBufferCollection>(); + return stdx::make_unique<OplogBufferCollection>(StorageInterface::get(txn)); } else { return stdx::make_unique<OplogBufferBlockingQueue>(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 8ab9d47f31e..fd89ca1f80a 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -97,7 +97,8 @@ public: virtual void multiSyncApply(const MultiApplier::Operations& ops) override; virtual void multiInitialSyncApply(const MultiApplier::Operations& ops, const HostAndPort& source) override; - virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const override; + virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer( + OperationContext* txn) const override; virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer() const override; virtual bool shouldUseDataReplicatorInitialSync() const override; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index f1f2770677e..5ef6e476c65 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -256,8 +256,8 @@ void ReplicationCoordinatorExternalStateMock::multiSyncApply(const MultiApplier: void ReplicationCoordinatorExternalStateMock::multiInitialSyncApply( const MultiApplier::Operations& ops, const HostAndPort& source) {} -std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateMock::makeInitialSyncOplogBuffer() - const { +std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateMock::makeInitialSyncOplogBuffer( + OperationContext* txn) const { return stdx::make_unique<OplogBufferBlockingQueue>(); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 87583512504..70cdd0f9f06 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -97,7 +97,8 @@ public: virtual void multiSyncApply(const MultiApplier::Operations& ops) override; virtual void multiInitialSyncApply(const MultiApplier::Operations& ops, const HostAndPort& source) override; - virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer() const override; + virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer( + OperationContext* txn) const override; virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer() const override; virtual bool shouldUseDataReplicatorInitialSync() const override; diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 1a45d84021e..e7321f03d87 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -100,22 +100,6 @@ class StorageInterface { MONGO_DISALLOW_COPYING(StorageInterface); public: - // Used for testing. - - using CreateCollectionFn = stdx::function<StatusWith<std::unique_ptr<CollectionBulkLoader>>( - const NamespaceString& nss, - const CollectionOptions& options, - const BSONObj idIndexSpec, - const std::vector<BSONObj>& secondaryIndexSpecs)>; - using InsertDocumentFn = stdx::function<Status( - OperationContext* txn, const NamespaceString& nss, const BSONObj& doc)>; - using InsertOplogDocumentsFn = stdx::function<StatusWith<OpTime>( - OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& ops)>; - using DropUserDatabasesFn = stdx::function<Status(OperationContext* txn)>; - using CreateOplogFn = stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>; - using DropCollectionFn = - stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>; - // Operation Context binding. static StorageInterface* get(ServiceContext* service); static StorageInterface* get(ServiceContext& service); @@ -209,17 +193,25 @@ public: const BSONObj& doc) = 0; /** - * Inserts the given documents into the oplog, returning the last written OpTime. + * Inserts the given documents into the collection. */ - virtual StatusWith<OpTime> insertOplogDocuments(OperationContext* txn, - const NamespaceString& nss, - const std::vector<BSONObj>& ops) = 0; + virtual Status insertDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& docs) = 0; + /** * Creates the initial oplog, errors if it exists. */ virtual Status createOplog(OperationContext* txn, const NamespaceString& nss) = 0; /** + * Creates a collection. + */ + virtual Status createCollection(OperationContext* txn, + const NamespaceString& nss, + const CollectionOptions& options) = 0; + + /** * Drops a collection, like the oplog. */ virtual Status dropCollection(OperationContext* txn, const NamespaceString& nss) = 0; @@ -233,6 +225,28 @@ public: * Validates that the admin database is valid during initial sync. */ virtual Status isAdminDbValid(OperationContext* txn) = 0; + + /** + * Finds the first document returned by an index scan on the collection in the requested + * direction. + */ + enum class ScanDirection { + kForward = 1, + kBackward = -1, + }; + virtual StatusWith<BSONObj> findOne(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) = 0; + + /** + * Deletes the first document returned by an index scan on the collection in the requested + * direction. Returns deleted document on success. + */ + virtual StatusWith<BSONObj> deleteOne(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 3acbad55326..1f1319611a6 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -33,6 +33,7 @@ #include "mongo/db/repl/storage_interface_impl.h" #include <algorithm> +#include <utility> #include "mongo/base/status.h" #include "mongo/base/status_with.h" @@ -48,8 +49,11 @@ #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/exec/delete.h" #include "mongo/db/jsobj.h" +#include "mongo/db/keypattern.h" #include "mongo/db/operation_context.h" +#include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/collection_bulk_loader_impl.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator.h" @@ -245,24 +249,10 @@ StatusWith<OpTime> StorageInterfaceImpl::writeOpsToOplog( auto toBSON = [](const OplogEntry& entry) { return entry.raw; }; std::transform(operations.begin(), operations.end(), ops.begin(), toBSON); - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - AutoGetCollection collectionWriteGuard(txn, nss, MODE_X); - auto collection = collectionWriteGuard.getCollection(); - if (!collection) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "unable to write operations to oplog at " << nss.ns() - << ": collection not found. Did you drop it?"}; - } - - WriteUnitOfWork wunit(txn); - OpDebug* const nullOpDebug = nullptr; - auto status = collection->insertDocuments(txn, ops.begin(), ops.end(), nullOpDebug, false); - if (!status.isOK()) { - return status; - } - wunit.commit(); + auto status = insertDocuments(txn, nss, ops); + if (!status.isOK()) { + return status; } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "writeOpsToOplog", nss.ns()); return operations.back().getOpTime(); } @@ -345,47 +335,33 @@ StorageInterfaceImpl::createCollectionForBulkLoading( Status StorageInterfaceImpl::insertDocument(OperationContext* txn, const NamespaceString& nss, const BSONObj& doc) { - ScopedTransaction transaction(txn, MODE_IX); - AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_IX); - AutoGetCollection autoColl(txn, nss, MODE_IX); - if (!autoColl.getCollection()) { - return {ErrorCodes::NamespaceNotFound, - "The collection must exist before inserting documents."}; - } - WriteUnitOfWork wunit(txn); - const auto status = - autoColl.getCollection()->insertDocument(txn, doc, nullptr /** OpDebug **/, false, false); - if (status.isOK()) { - wunit.commit(); - } - return status; + return insertDocuments(txn, nss, {doc}); } -StatusWith<OpTime> StorageInterfaceImpl::insertOplogDocuments(OperationContext* txn, - const NamespaceString& nss, - const std::vector<BSONObj>& ops) { +Status StorageInterfaceImpl::insertDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& docs) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_IX); AutoGetCollection autoColl(txn, nss, MODE_IX); - if (!autoColl.getCollection()) { - return { - ErrorCodes::NamespaceNotFound, - str::stream() << "The oplog collection must exist before inserting documents, ns:" - << nss.ns()}; - } - if (!autoColl.getCollection()->isCapped()) { + auto collection = autoColl.getCollection(); + if (!collection) { return {ErrorCodes::NamespaceNotFound, - str::stream() << "The oplog collection must be capped, ns:" << nss.ns()}; + str::stream() << "The collection must exist before inserting documents, ns:" + << nss.ns()}; } WriteUnitOfWork wunit(txn); - const auto lastOpTime = repl::writeOpsToOplog(txn, nss.ns(), ops); + OpDebug* const nullOpDebug = nullptr; + auto status = + collection->insertDocuments(txn, docs.begin(), docs.end(), nullOpDebug, false); + if (!status.isOK()) { + return status; + } wunit.commit(); - return lastOpTime; } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "StorageInterfaceImpl::insertOplogDocuments", nss.ns()); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::insertDocuments", nss.ns()); + + return Status::OK(); } Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* txn) { @@ -398,6 +374,26 @@ Status StorageInterfaceImpl::createOplog(OperationContext* txn, const NamespaceS return Status::OK(); } +Status StorageInterfaceImpl::createCollection(OperationContext* txn, + const NamespaceString& nss, + const CollectionOptions& options) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + AutoGetOrCreateDb databaseWriteGuard(txn, nss.db(), MODE_X); + auto db = databaseWriteGuard.getDb(); + invariant(db); + if (db->getCollection(nss)) { + return {ErrorCodes::NamespaceExists, + str::stream() << "Collection " << nss.ns() << " already exists."}; + } + WriteUnitOfWork wuow(txn); + auto coll = db->createCollection(txn, nss.ns(), options); + invariant(coll); + wuow.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::createCollection", nss.ns()); + return Status::OK(); +} + Status StorageInterfaceImpl::dropCollection(OperationContext* txn, const NamespaceString& nss) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(txn, MODE_IX); @@ -412,6 +408,114 @@ Status StorageInterfaceImpl::dropCollection(OperationContext* txn, const Namespa MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::dropCollection", nss.ns()); } +namespace { + +/** + * Returns DeleteStageParams for deleteOne with fetch. + */ +DeleteStageParams makeDeleteStageParamsForDeleteOne() { + DeleteStageParams deleteStageParams; + invariant(!deleteStageParams.isMulti); + deleteStageParams.returnDeleted = true; + return deleteStageParams; +} + +/** + * Shared implementation between findOne and deleteOne. + */ +enum class FindDeleteMode { kFind, kDelete }; +StatusWith<BSONObj> _findOrDeleteOne(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + StorageInterface::ScanDirection scanDirection, + FindDeleteMode mode) { + auto isFind = mode == FindDeleteMode::kFind; + auto opStr = isFind ? "StorageInterfaceImpl::findOne" : "StorageInterfaceImpl::deleteOne"; + + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + auto collectionAccessMode = isFind ? MODE_IS : MODE_IX; + AutoGetCollection collectionGuard(txn, nss, collectionAccessMode); + auto collection = collectionGuard.getCollection(); + if (!collection) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "Collection not found, ns:" << nss.ns()}; + } + auto indexCatalog = collection->getIndexCatalog(); + invariant(indexCatalog); + bool includeUnfinishedIndexes = false; + auto indexDescriptor = + indexCatalog->findIndexByKeyPattern(txn, indexKeyPattern, includeUnfinishedIndexes); + if (!indexDescriptor) { + return {ErrorCodes::IndexNotFound, + str::stream() << "Index not found, ns:" << nss.ns() << ", index: " + << indexKeyPattern}; + } + if (indexDescriptor->isPartial()) { + return {ErrorCodes::IndexOptionsConflict, + str::stream() << "Partial index is not allowed for this operation, ns:" + << nss.ns() + << ", index: " + << indexKeyPattern}; + } + + KeyPattern keyPattern(indexKeyPattern); + auto minKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, false)); + auto maxKey = Helpers::toKeyFormat(keyPattern.extendRangeBound({}, true)); + auto isForward = scanDirection == StorageInterface::ScanDirection::kForward; + auto bounds = isForward ? std::make_pair(minKey, maxKey) : std::make_pair(maxKey, minKey); + bool endKeyInclusive = false; + auto direction = isForward ? InternalPlanner::FORWARD : InternalPlanner::BACKWARD; + + std::unique_ptr<PlanExecutor> planExecutor = isFind + ? InternalPlanner::indexScan(txn, + collection, + indexDescriptor, + bounds.first, + bounds.second, + endKeyInclusive, + PlanExecutor::YIELD_MANUAL, + direction, + InternalPlanner::IXSCAN_FETCH) + : InternalPlanner::deleteWithIndexScan(txn, + collection, + makeDeleteStageParamsForDeleteOne(), + indexDescriptor, + bounds.first, + bounds.second, + endKeyInclusive, + PlanExecutor::YIELD_MANUAL, + direction); + + BSONObj doc; + auto state = planExecutor->getNext(&doc, nullptr); + if (PlanExecutor::IS_EOF == state) { + return {ErrorCodes::NoSuchKey, + str::stream() << "Collection is empty, ns: " << nss.ns() << ", index: " + << indexKeyPattern}; + } + invariant(PlanExecutor::ADVANCED == state); + return doc; + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, opStr, nss.ns()); + MONGO_UNREACHABLE; +} + +} // namespace + +StatusWith<BSONObj> StorageInterfaceImpl::findOne(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) { + return _findOrDeleteOne(txn, nss, indexKeyPattern, scanDirection, FindDeleteMode::kFind); +} + +StatusWith<BSONObj> StorageInterfaceImpl::deleteOne(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) { + return _findOrDeleteOne(txn, nss, indexKeyPattern, scanDirection, FindDeleteMode::kDelete); +} + Status StorageInterfaceImpl::isAdminDbValid(OperationContext* txn) { log() << "StorageInterfaceImpl::isAdminDbValid called."; // TODO: plumb through operation context from caller, for now run on ioThread with runner. diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 9f05270ef1f..0eb2f47e406 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -94,16 +94,30 @@ public: const NamespaceString& nss, const BSONObj& doc) override; - StatusWith<OpTime> insertOplogDocuments(OperationContext* txn, - const NamespaceString& nss, - const std::vector<BSONObj>& ops) override; + Status insertDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& docs) override; Status dropReplicatedDatabases(OperationContext* txn) override; Status createOplog(OperationContext* txn, const NamespaceString& nss) override; + Status createCollection(OperationContext* txn, + const NamespaceString& nss, + const CollectionOptions& options) override; + Status dropCollection(OperationContext* txn, const NamespaceString& nss) override; + StatusWith<BSONObj> findOne(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) override; + + StatusWith<BSONObj> deleteOne(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) override; + Status isAdminDbValid(OperationContext* txn) override; private: diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index 31a6a814209..662daf84274 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -51,6 +51,7 @@ #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" +#include "mongo/util/mongoutils/str.h" namespace { @@ -58,6 +59,14 @@ using namespace mongo; using namespace mongo::repl; /** + * Generates a unique namespace from the test registration agent. + */ +template <typename T> +NamespaceString makeNamespace(const T& t, const char* suffix = "") { + return NamespaceString("local." + t.getSuiteName() + "_" + t.getTestName() + suffix); +} + +/** * Returns min valid document. */ BSONObj getMinValidDocument(OperationContext* txn, const NamespaceString& minValidNss) { @@ -374,7 +383,7 @@ TEST_F(StorageInterfaceImplTest, auto txn = getClient()->makeOperationContext(); auto status = storageInterface.writeOpsToOplog(txn.get(), nss, {op}).getStatus(); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); - ASSERT_STRING_CONTAINS(status.reason(), "collection not found"); + ASSERT_STRING_CONTAINS(status.reason(), "The collection must exist before inserting documents"); } TEST_F(StorageInterfaceImplWithReplCoordTest, InsertMissingDocWorksOnExistingCappedCollection) { @@ -463,6 +472,26 @@ TEST_F(StorageInterfaceImplWithReplCoordTest, CreateOplogCreateCappedCollection) } } +TEST_F(StorageInterfaceImplWithReplCoordTest, CreateCollectionFailsIfCollectionExists) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + { + AutoGetCollectionForRead autoColl(txn, nss); + ASSERT_FALSE(autoColl.getCollection()); + } + ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions())); + { + AutoGetCollectionForRead autoColl(txn, nss); + ASSERT_TRUE(autoColl.getCollection()); + ASSERT_EQ(nss.toString(), autoColl.getCollection()->ns().toString()); + } + auto status = storage.createCollection(txn, nss, CollectionOptions()); + ASSERT_EQUALS(ErrorCodes::NamespaceExists, status); + ASSERT_STRING_CONTAINS(status.reason(), + str::stream() << "Collection " << nss.ns() << " already exists"); +} + TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithExistingWithDataCollection) { auto txn = getOperationContext(); StorageInterfaceImpl storage; @@ -491,4 +520,173 @@ TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithMissingColl ASSERT_FALSE(autoColl.getCollection()); } +TEST_F(StorageInterfaceImplWithReplCoordTest, FindOneReturnsInvalidNamespaceIfCollectionIsMissing) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("_id" << 1); + ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, + storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward)); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, FindOneReturnsIndexNotFoundIfIndexIsMissing) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("x" << 1); + ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions())); + ASSERT_EQUALS(ErrorCodes::IndexNotFound, + storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward)); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, + FindOneReturnsIndexOptionsConflictIfIndexIsAPartialIndex) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + storage.startup(); + auto nss = makeNamespace(_agent); + std::vector<BSONObj> indexes = {BSON("v" << 1 << "key" << BSON("x" << 1) << "name" + << "x_1" + << "ns" + << nss.ns() + << "partialFilterExpression" + << BSON("y" << 1))}; + auto loader = unittest::assertGet( + storage.createCollectionForBulkLoading(nss, CollectionOptions(), {}, indexes)); + std::vector<BSONObj> docs = {BSON("_id" << 1), BSON("_id" << 1), BSON("_id" << 2)}; + ASSERT_OK(loader->insertDocuments(docs.begin(), docs.end())); + ASSERT_OK(loader->commit()); + auto keyPattern = BSON("x" << 1); + ASSERT_EQUALS(ErrorCodes::IndexOptionsConflict, + storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward)); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, FindOneReturnsNoSuchKeyIfCollectionIsEmpty) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("_id" << 1); + ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions())); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, + storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward)); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, + FindOneReturnsDocumentWithLowestKeyValueIfScanDirectionIsForward) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("_id" << 1); + ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions())); + ASSERT_OK( + storage.insertDocuments(txn, nss, {BSON("_id" << 0), BSON("_id" << 1), BSON("_id" << 2)})); + ASSERT_EQUALS(BSON("_id" << 0), + storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward)); + + // Check collection contents. OplogInterface returns documents in reverse natural order. + OplogInterfaceLocal oplog(txn, nss.ns()); + auto iter = oplog.makeIterator(); + ASSERT_EQUALS(BSON("_id" << 2), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(BSON("_id" << 1), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(BSON("_id" << 0), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus()); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, + FindOneReturnsDocumentWithHighestKeyValueIfScanDirectionIsBackward) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("_id" << 1); + ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions())); + ASSERT_OK( + storage.insertDocuments(txn, nss, {BSON("_id" << 0), BSON("_id" << 1), BSON("_id" << 2)})); + ASSERT_EQUALS( + BSON("_id" << 2), + storage.findOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kBackward)); + + // Check collection contents. OplogInterface returns documents in reverse natural order. + OplogInterfaceLocal oplog(txn, nss.ns()); + auto iter = oplog.makeIterator(); + ASSERT_EQUALS(BSON("_id" << 2), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(BSON("_id" << 1), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(BSON("_id" << 0), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus()); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, + DeleteOneReturnsInvalidNamespaceIfCollectionIsMissing) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("_id" << 1); + ASSERT_EQUALS( + ErrorCodes::NamespaceNotFound, + storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward)); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, DeleteOneReturnsIndexNotFoundIfIndexIsMissing) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("x" << 1); + ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions())); + ASSERT_EQUALS( + ErrorCodes::IndexNotFound, + storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward)); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, DeleteOneReturnsNoSuchKeyIfCollectionIsEmpty) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("_id" << 1); + ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions())); + ASSERT_EQUALS( + ErrorCodes::NoSuchKey, + storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward)); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, + DeleteOneReturnsDocumentWithLowestKeyValueIfScanDirectionIsForward) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("_id" << 1); + ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions())); + ASSERT_OK( + storage.insertDocuments(txn, nss, {BSON("_id" << 0), BSON("_id" << 1), BSON("_id" << 2)})); + ASSERT_EQUALS( + BSON("_id" << 0), + storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kForward)); + + // Check collection contents. OplogInterface returns documents in reverse natural order. + OplogInterfaceLocal oplog(txn, nss.ns()); + auto iter = oplog.makeIterator(); + ASSERT_EQUALS(BSON("_id" << 2), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(BSON("_id" << 1), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus()); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, + DeleteOneReturnsDocumentWithHighestKeyValueIfScanDirectionIsBackward) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + auto nss = makeNamespace(_agent); + auto keyPattern = BSON("_id" << 1); + ASSERT_OK(storage.createCollection(txn, nss, CollectionOptions())); + ASSERT_OK( + storage.insertDocuments(txn, nss, {BSON("_id" << 0), BSON("_id" << 1), BSON("_id" << 2)})); + ASSERT_EQUALS( + BSON("_id" << 2), + storage.deleteOne(txn, nss, keyPattern, StorageInterface::ScanDirection::kBackward)); + + // Check collection contents. OplogInterface returns documents in reverse natural order. + OplogInterfaceLocal oplog(txn, nss.ns()); + auto iter = oplog.makeIterator(); + ASSERT_EQUALS(BSON("_id" << 1), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(BSON("_id" << 0), unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus()); +} + } // namespace diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 143d5d367cd..b25d91f8978 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -87,6 +87,33 @@ class StorageInterfaceMock : public StorageInterface { MONGO_DISALLOW_COPYING(StorageInterfaceMock); public: + // Used for testing. + + using CreateCollectionForBulkFn = + stdx::function<StatusWith<std::unique_ptr<CollectionBulkLoader>>( + const NamespaceString& nss, + const CollectionOptions& options, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& secondaryIndexSpecs)>; + using InsertDocumentFn = stdx::function<Status( + OperationContext* txn, const NamespaceString& nss, const BSONObj& doc)>; + using InsertDocumentsFn = stdx::function<Status( + OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& docs)>; + using DropUserDatabasesFn = stdx::function<Status(OperationContext* txn)>; + using CreateOplogFn = stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>; + using CreateCollectionFn = stdx::function<Status( + OperationContext* txn, const NamespaceString& nss, const CollectionOptions& options)>; + using DropCollectionFn = + stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>; + using FindOneFn = stdx::function<StatusWith<BSONObj>(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection)>; + using DeleteOneFn = stdx::function<StatusWith<BSONObj>(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection)>; + StorageInterfaceMock() = default; void startup() override; @@ -113,7 +140,7 @@ public: const CollectionOptions& options, const BSONObj idIndexSpec, const std::vector<BSONObj>& secondaryIndexSpecs) override { - return createCollectionFn(nss, options, idIndexSpec, secondaryIndexSpecs); + return createCollectionForBulkFn(nss, options, idIndexSpec, secondaryIndexSpecs); }; Status insertDocument(OperationContext* txn, @@ -122,10 +149,10 @@ public: return insertDocumentFn(txn, nss, doc); }; - StatusWith<OpTime> insertOplogDocuments(OperationContext* txn, - const NamespaceString& nss, - const std::vector<BSONObj>& ops) override { - return insertOplogDocumentsFn(txn, nss, ops); + Status insertDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& docs) override { + return insertDocumentsFn(txn, nss, docs); } Status dropReplicatedDatabases(OperationContext* txn) override { @@ -136,31 +163,51 @@ public: return createOplogFn(txn, nss); }; + Status createCollection(OperationContext* txn, + const NamespaceString& nss, + const CollectionOptions& options) override { + return createCollFn(txn, nss, options); + } + Status dropCollection(OperationContext* txn, const NamespaceString& nss) override { return dropCollFn(txn, nss); }; + StatusWith<BSONObj> findOne(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) override { + return findOneFn(txn, nss, indexKeyPattern, scanDirection); + } + + StatusWith<BSONObj> deleteOne(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) override { + return deleteOneFn(txn, nss, indexKeyPattern, scanDirection); + } + Status isAdminDbValid(OperationContext* txn) override { return Status::OK(); }; // Testing functions. - CreateCollectionFn createCollectionFn = [](const NamespaceString& nss, - const CollectionOptions& options, - const BSONObj idIndexSpec, - const std::vector<BSONObj>& secondaryIndexSpecs) - -> StatusWith<std::unique_ptr<CollectionBulkLoader>> { - return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."}; - }; + CreateCollectionForBulkFn createCollectionForBulkFn = + [](const NamespaceString& nss, + const CollectionOptions& options, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& + secondaryIndexSpecs) -> StatusWith<std::unique_ptr<CollectionBulkLoader>> { + return Status{ErrorCodes::IllegalOperation, "CreateCollectionForBulkFn not implemented."}; + }; InsertDocumentFn insertDocumentFn = [](OperationContext* txn, const NamespaceString& nss, const BSONObj& doc) { return Status{ErrorCodes::IllegalOperation, "InsertDocumentFn not implemented."}; }; - InsertOplogDocumentsFn insertOplogDocumentsFn = - [](OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& ops) { - return StatusWith<OpTime>( - Status{ErrorCodes::IllegalOperation, "InsertOplogDocumentsFn not implemented."}); + InsertDocumentsFn insertDocumentsFn = + [](OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + return Status{ErrorCodes::IllegalOperation, "InsertDocumentsFn not implemented."}; }; DropUserDatabasesFn dropUserDBsFn = [](OperationContext* txn) { return Status{ErrorCodes::IllegalOperation, "DropUserDatabasesFn not implemented."}; @@ -168,9 +215,25 @@ public: CreateOplogFn createOplogFn = [](OperationContext* txn, const NamespaceString& nss) { return Status{ErrorCodes::IllegalOperation, "CreateOplogFn not implemented."}; }; + CreateCollectionFn createCollFn = + [](OperationContext* txn, const NamespaceString& nss, const CollectionOptions& options) { + return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."}; + }; DropCollectionFn dropCollFn = [](OperationContext* txn, const NamespaceString& nss) { return Status{ErrorCodes::IllegalOperation, "DropCollectionFn not implemented."}; }; + FindOneFn findOneFn = [](OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) { + return Status{ErrorCodes::IllegalOperation, "FindOneFn not implemented."}; + }; + DeleteOneFn deleteOneFn = [](OperationContext* txn, + const NamespaceString& nss, + const BSONObj& indexKeyPattern, + ScanDirection scanDirection) { + return Status{ErrorCodes::IllegalOperation, "DeleteOneFn not implemented."}; + }; private: mutable stdx::mutex _initialSyncFlagMutex; |