diff options
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl_test.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 24 |
10 files changed, 50 insertions, 85 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 40d8cde4504..f1123991b41 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -56,7 +56,6 @@ env.Library( 'storage_interface.cpp', ], LIBDEPS=[ - 'oplog_entry', 'optime', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/util/decorable', diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 696e699b984..4a3f3e721a1 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -367,10 +367,10 @@ Status _initialSync(BackgroundSync* bgsync) { log() << "initial sync data copy, starting syncup"; // prime oplog, but don't need to actually apply the op as the cloned data already reflects it. - OplogEntry lastOplogEntry(lastOp); - OpTime lastOptime = fassertStatusOK(40142, - StorageInterface::get(&txn)->writeOpsToOplog( - &txn, NamespaceString(rsOplogName), {lastOplogEntry})); + fassertStatusOK( + 40142, + StorageInterface::get(&txn)->insertDocument(&txn, NamespaceString(rsOplogName), lastOp)); + OpTime lastOptime = OplogEntry(lastOp).getOpTime(); ReplClientInfo::forClient(txn.getClient()).setLastOp(lastOptime); replCoord->setMyLastAppliedOpTime(lastOptime); setNewTimestamp(lastOptime.getTimestamp()); diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index e7321f03d87..5168a8d7fda 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -35,7 +35,6 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_bulk_loader.h" -#include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/optime.h" #include "mongo/db/service_context.h" @@ -160,16 +159,6 @@ public: */ virtual void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) = 0; - /** - * Writes operations into the replica set oplog at "nss". - * Used internally by replication secondaries. - * - * Returns the optime for the last operation inserted on success. - */ - virtual StatusWith<OpTime> writeOpsToOplog(OperationContext* txn, - const NamespaceString& nss, - const MultiApplier::Operations& operations) = 0; - // Collection creation and population for initial sync. /** * Creates a collection with the provided indexes. @@ -194,6 +183,7 @@ public: /** * Inserts the given documents into the collection. + * It is an error to call this function with an empty set of documents. */ virtual Status insertDocuments(OperationContext* txn, const NamespaceString& nss, diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 1f1319611a6..f28d2e79643 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -238,25 +238,6 @@ void StorageInterfaceImpl::setMinValid(OperationContext* txn, const BatchBoundar << boundaries.end.toBSON() << ")"; } -StatusWith<OpTime> StorageInterfaceImpl::writeOpsToOplog( - OperationContext* txn, const NamespaceString& nss, const MultiApplier::Operations& operations) { - if (operations.empty()) { - return {ErrorCodes::EmptyArrayOperation, - "unable to write operations to oplog - no operations provided"}; - } - - std::vector<BSONObj> ops(operations.size()); - auto toBSON = [](const OplogEntry& entry) { return entry.raw; }; - std::transform(operations.begin(), operations.end(), ops.begin(), toBSON); - - auto status = insertDocuments(txn, nss, ops); - if (!status.isOK()) { - return status; - } - - return operations.back().getOpTime(); -} - StatusWith<std::unique_ptr<CollectionBulkLoader>> StorageInterfaceImpl::createCollectionForBulkLoading( const NamespaceString& nss, @@ -341,6 +322,11 @@ Status StorageInterfaceImpl::insertDocument(OperationContext* txn, Status StorageInterfaceImpl::insertDocuments(OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + if (docs.empty()) { + return {ErrorCodes::EmptyArrayOperation, + str::stream() << "unable to insert documents into " << nss.ns() + << " - no documents provided"}; + } MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { AutoGetCollection autoColl(txn, nss, MODE_IX); auto collection = autoColl.getCollection(); diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 0eb2f47e406..32181d78584 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -77,10 +77,6 @@ public: void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries) override; - StatusWith<OpTime> writeOpsToOplog(OperationContext* txn, - const NamespaceString& nss, - const MultiApplier::Operations& operations) override; - /** * Allocates a new TaskRunner for use by the passed in collection. */ diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index 662daf84274..fa9692c8619 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -118,13 +118,13 @@ void createCollection(OperationContext* txn, /** * Creates an oplog entry with given optime. */ -OplogEntry makeOplogEntry(OpTime opTime) { +BSONObj makeOplogEntry(OpTime opTime) { BSONObjBuilder bob; bob.appendElements(opTime.toBSON()); bob.append("h", 1LL); bob.append("op", "c"); bob.append("ns", "test.t"); - return OplogEntry(bob.obj()); + return bob.obj(); } /** @@ -328,16 +328,16 @@ TEST_F(StorageInterfaceImplTest, SnapshotNotSupported) { } TEST_F(StorageInterfaceImplTest, - WriteOpsToOplogReturnsEmptyArrayOperationWhenNoOperationsAreGiven) { + InsertDocumentsReturnsEmptyArrayOperationWhenNoOperationsAreGiven) { NamespaceString nss("local." + _agent.getTestName()); StorageInterfaceImpl storageInterface(nss); auto txn = getClient()->makeOperationContext(); ASSERT_EQUALS(ErrorCodes::EmptyArrayOperation, - storageInterface.writeOpsToOplog(txn.get(), nss, {})); + storageInterface.insertDocuments(txn.get(), nss, {})); } TEST_F(StorageInterfaceImplTest, - WriteOpsToOplogReturnsInternalErrorWhenSavingOperationToNonOplogCollection) { + InsertDocumentsReturnsInternalErrorWhenSavingOperationToNonOplogCollection) { // Create fake non-oplog collection to ensure saving oplog entries (without _id field) will // fail. auto txn = getClient()->makeOperationContext(); @@ -347,12 +347,12 @@ TEST_F(StorageInterfaceImplTest, // Non-oplog collection will enforce mandatory _id field requirement on insertion. StorageInterfaceImpl storageInterface(nss); auto op = makeOplogEntry({Timestamp(Seconds(1), 0), 1LL}); - auto status = storageInterface.writeOpsToOplog(txn.get(), nss, {op}).getStatus(); + auto status = storageInterface.insertDocuments(txn.get(), nss, {op}); ASSERT_EQUALS(ErrorCodes::InternalError, status); ASSERT_STRING_CONTAINS(status.reason(), "Collection::insertDocument got document without _id"); } -TEST_F(StorageInterfaceImplTest, WriteOpsToOplogSavesOperationsReturnsOpTimeOfLastOperation) { +TEST_F(StorageInterfaceImplTest, InsertDocumentsSavesOperationsReturnsOpTimeOfLastOperation) { // Create fake oplog collection to hold operations. auto txn = getClient()->makeOperationContext(); NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); @@ -363,25 +363,23 @@ TEST_F(StorageInterfaceImplTest, WriteOpsToOplogSavesOperationsReturnsOpTimeOfLa StorageInterfaceImpl storageInterface(nss); auto op1 = makeOplogEntry({Timestamp(Seconds(1), 0), 1LL}); auto op2 = makeOplogEntry({Timestamp(Seconds(1), 0), 1LL}); - ASSERT_EQUALS( - op2.getOpTime(), - unittest::assertGet(storageInterface.writeOpsToOplog(txn.get(), nss, {op1, op2}))); + ASSERT_OK(storageInterface.insertDocuments(txn.get(), nss, {op1, op2})); // Check contents of oplog. OplogInterface iterates over oplog collection in reverse. repl::OplogInterfaceLocal oplog(txn.get(), nss.ns()); auto iter = oplog.makeIterator(); - ASSERT_EQUALS(op2.raw, unittest::assertGet(iter->next()).first); - ASSERT_EQUALS(op1.raw, unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(op2, unittest::assertGet(iter->next()).first); + ASSERT_EQUALS(op1, unittest::assertGet(iter->next()).first); ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus()); } TEST_F(StorageInterfaceImplTest, - WriteOpsToOplogReturnsNamespaceNotFoundIfOplogCollectionDoesNotExist) { + InsertDocumentsReturnsNamespaceNotFoundIfOplogCollectionDoesNotExist) { auto op = makeOplogEntry({Timestamp(Seconds(1), 0), 1LL}); NamespaceString nss("local.nosuchcollection"); StorageInterfaceImpl storageInterface(nss); auto txn = getClient()->makeOperationContext(); - auto status = storageInterface.writeOpsToOplog(txn.get(), nss, {op}).getStatus(); + auto status = storageInterface.insertDocuments(txn.get(), nss, {op}); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); ASSERT_STRING_CONTAINS(status.reason(), "The collection must exist before inserting documents"); } diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp index 57daebdb60c..96b40deba0d 100644 --- a/src/mongo/db/repl/storage_interface_mock.cpp +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -72,21 +72,6 @@ void StorageInterfaceMock::setMinValid(OperationContext* txn, const BatchBoundar _minValidBoundaries = boundaries; } -StatusWith<OpTime> StorageInterfaceMock::writeOpsToOplog( - OperationContext* txn, const NamespaceString& nss, const MultiApplier::Operations& operations) { - invariant(!operations.empty()); - stdx::lock_guard<stdx::mutex> lock(_operationsWrittenToOplogMutex); - for (const auto& oplogEntry : operations) { - _operationsWrittenToOplog.push_back(oplogEntry.getOwned()); - } - return operations.back().getOpTime(); -} - -MultiApplier::Operations StorageInterfaceMock::getOperationsWrittenToOplog() const { - stdx::lock_guard<stdx::mutex> lock(_operationsWrittenToOplogMutex); - return _operationsWrittenToOplog; -} - Status CollectionBulkLoaderMock::init(OperationContext* txn, Collection* coll, const std::vector<BSONObj>& secondaryIndexSpecs) { diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index b25d91f8978..4b19194832d 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -129,12 +129,6 @@ public: const DurableRequirement durReq) override; void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) override; - StatusWith<OpTime> writeOpsToOplog(OperationContext* txn, - const NamespaceString& nss, - const MultiApplier::Operations& operations) override; - - MultiApplier::Operations getOperationsWrittenToOplog() const; - StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading( const NamespaceString& nss, const CollectionOptions& options, @@ -241,9 +235,6 @@ private: mutable stdx::mutex _minValidBoundariesMutex; BatchBoundaries _minValidBoundaries = {OpTime(), OpTime()}; - - mutable stdx::mutex _operationsWrittenToOplogMutex; - MultiApplier::Operations _operationsWrittenToOplog; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index a4514386671..84871d24249 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -1226,12 +1226,16 @@ StatusWith<OpTime> multiApply(OperationContext* txn, applyOps(writerVectors, workerPool, applyOperation); - OpTime lastOpTime; { ON_BLOCK_EXIT([&] { workerPool->join(); }); - lastOpTime = fassertStatusOK( + + std::vector<BSONObj> docs(ops.size()); + auto toBSON = [](const OplogEntry& entry) { return entry.raw; }; + std::transform(ops.begin(), ops.end(), docs.begin(), toBSON); + + fassertStatusOK( 40141, - StorageInterface::get(txn)->writeOpsToOplog(txn, NamespaceString(rsOplogName), ops)); + StorageInterface::get(txn)->insertDocuments(txn, NamespaceString(rsOplogName), docs)); } if (inShutdownStrict()) { @@ -1240,7 +1244,7 @@ StatusWith<OpTime> multiApply(OperationContext* txn, "Cannot apply operations due to shutdown in progress"}; } // We have now written all database writes and updated the oplog to match. - return lastOpTime; + return ops.back().getOpTime(); } } // namespace repl diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 81e4cc51ac1..2b1ff21d1c7 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -30,6 +30,7 @@ #include <algorithm> #include <memory> +#include <utility> #include <vector> #include "mongo/db/catalog/collection_options.h" @@ -43,6 +44,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/bgsync.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_mock.h" @@ -107,6 +109,9 @@ void SyncTailTest::setUp() { stdx::make_unique<ReplicationCoordinatorMock>(replSettings)); auto storageInterface = stdx::make_unique<StorageInterfaceMock>(); _storageInterface = storageInterface.get(); + storageInterface->insertDocumentsFn = [](OperationContext*, + const NamespaceString&, + const std::vector<BSONObj>&) { return Status::OK(); }; StorageInterface::set(serviceContext, std::move(storageInterface)); Client::initThreadIfNotAlready(); @@ -556,6 +561,16 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1, BSON("x" << 1)); auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2, BSON("x" << 2)); + NamespaceString nssForInsert; + std::vector<BSONObj> operationsWrittenToOplog; + _storageInterface->insertDocumentsFn = [&mutex, &nssForInsert, &operationsWrittenToOplog]( + OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& docs) { + stdx::lock_guard<stdx::mutex> lock(mutex); + nssForInsert = nss; + operationsWrittenToOplog = docs; + return Status::OK(); + }; + auto lastOpTime = unittest::assertGet(multiApply(_txn.get(), &writerPool, {op1, op2}, applyOperationFn)); ASSERT_EQUALS(op2.getOpTime(), lastOpTime); @@ -576,10 +591,11 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH } // Check ops in oplog. - auto operationsWritternToOplog = _storageInterface->getOperationsWrittenToOplog(); - ASSERT_EQUALS(2U, operationsWritternToOplog.size()); - ASSERT_EQUALS(op1, operationsWritternToOplog[0]); - ASSERT_EQUALS(op2, operationsWritternToOplog[1]); + stdx::lock_guard<stdx::mutex> lock(mutex); + ASSERT_EQUALS(2U, operationsWrittenToOplog.size()); + ASSERT_EQUALS(NamespaceString(rsOplogName), nssForInsert); + ASSERT_EQUALS(op1.raw, operationsWrittenToOplog[0]); + ASSERT_EQUALS(op2.raw, operationsWrittenToOplog[1]); } TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) { |