summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp8
-rw-r--r--src/mongo/db/repl/storage_interface.h12
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp24
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h4
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp26
-rw-r--r--src/mongo/db/repl/storage_interface_mock.cpp15
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h9
-rw-r--r--src/mongo/db/repl/sync_tail.cpp12
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp24
10 files changed, 50 insertions, 85 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 40d8cde4504..f1123991b41 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -56,7 +56,6 @@ env.Library(
'storage_interface.cpp',
],
LIBDEPS=[
- 'oplog_entry',
'optime',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/util/decorable',
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 696e699b984..4a3f3e721a1 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -367,10 +367,10 @@ Status _initialSync(BackgroundSync* bgsync) {
log() << "initial sync data copy, starting syncup";
// prime oplog, but don't need to actually apply the op as the cloned data already reflects it.
- OplogEntry lastOplogEntry(lastOp);
- OpTime lastOptime = fassertStatusOK(40142,
- StorageInterface::get(&txn)->writeOpsToOplog(
- &txn, NamespaceString(rsOplogName), {lastOplogEntry}));
+ fassertStatusOK(
+ 40142,
+ StorageInterface::get(&txn)->insertDocument(&txn, NamespaceString(rsOplogName), lastOp));
+ OpTime lastOptime = OplogEntry(lastOp).getOpTime();
ReplClientInfo::forClient(txn.getClient()).setLastOp(lastOptime);
replCoord->setMyLastAppliedOpTime(lastOptime);
setNewTimestamp(lastOptime.getTimestamp());
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index e7321f03d87..5168a8d7fda 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -35,7 +35,6 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/collection_bulk_loader.h"
-#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/service_context.h"
@@ -160,16 +159,6 @@ public:
*/
virtual void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) = 0;
- /**
- * Writes operations into the replica set oplog at "nss".
- * Used internally by replication secondaries.
- *
- * Returns the optime for the last operation inserted on success.
- */
- virtual StatusWith<OpTime> writeOpsToOplog(OperationContext* txn,
- const NamespaceString& nss,
- const MultiApplier::Operations& operations) = 0;
-
// Collection creation and population for initial sync.
/**
* Creates a collection with the provided indexes.
@@ -194,6 +183,7 @@ public:
/**
* Inserts the given documents into the collection.
+ * It is an error to call this function with an empty set of documents.
*/
virtual Status insertDocuments(OperationContext* txn,
const NamespaceString& nss,
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 1f1319611a6..f28d2e79643 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -238,25 +238,6 @@ void StorageInterfaceImpl::setMinValid(OperationContext* txn, const BatchBoundar
<< boundaries.end.toBSON() << ")";
}
-StatusWith<OpTime> StorageInterfaceImpl::writeOpsToOplog(
- OperationContext* txn, const NamespaceString& nss, const MultiApplier::Operations& operations) {
- if (operations.empty()) {
- return {ErrorCodes::EmptyArrayOperation,
- "unable to write operations to oplog - no operations provided"};
- }
-
- std::vector<BSONObj> ops(operations.size());
- auto toBSON = [](const OplogEntry& entry) { return entry.raw; };
- std::transform(operations.begin(), operations.end(), ops.begin(), toBSON);
-
- auto status = insertDocuments(txn, nss, ops);
- if (!status.isOK()) {
- return status;
- }
-
- return operations.back().getOpTime();
-}
-
StatusWith<std::unique_ptr<CollectionBulkLoader>>
StorageInterfaceImpl::createCollectionForBulkLoading(
const NamespaceString& nss,
@@ -341,6 +322,11 @@ Status StorageInterfaceImpl::insertDocument(OperationContext* txn,
Status StorageInterfaceImpl::insertDocuments(OperationContext* txn,
const NamespaceString& nss,
const std::vector<BSONObj>& docs) {
+ if (docs.empty()) {
+ return {ErrorCodes::EmptyArrayOperation,
+ str::stream() << "unable to insert documents into " << nss.ns()
+ << " - no documents provided"};
+ }
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
AutoGetCollection autoColl(txn, nss, MODE_IX);
auto collection = autoColl.getCollection();
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 0eb2f47e406..32181d78584 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -77,10 +77,6 @@ public:
void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries) override;
- StatusWith<OpTime> writeOpsToOplog(OperationContext* txn,
- const NamespaceString& nss,
- const MultiApplier::Operations& operations) override;
-
/**
* Allocates a new TaskRunner for use by the passed in collection.
*/
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 662daf84274..fa9692c8619 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -118,13 +118,13 @@ void createCollection(OperationContext* txn,
/**
* Creates an oplog entry with given optime.
*/
-OplogEntry makeOplogEntry(OpTime opTime) {
+BSONObj makeOplogEntry(OpTime opTime) {
BSONObjBuilder bob;
bob.appendElements(opTime.toBSON());
bob.append("h", 1LL);
bob.append("op", "c");
bob.append("ns", "test.t");
- return OplogEntry(bob.obj());
+ return bob.obj();
}
/**
@@ -328,16 +328,16 @@ TEST_F(StorageInterfaceImplTest, SnapshotNotSupported) {
}
TEST_F(StorageInterfaceImplTest,
- WriteOpsToOplogReturnsEmptyArrayOperationWhenNoOperationsAreGiven) {
+ InsertDocumentsReturnsEmptyArrayOperationWhenNoOperationsAreGiven) {
NamespaceString nss("local." + _agent.getTestName());
StorageInterfaceImpl storageInterface(nss);
auto txn = getClient()->makeOperationContext();
ASSERT_EQUALS(ErrorCodes::EmptyArrayOperation,
- storageInterface.writeOpsToOplog(txn.get(), nss, {}));
+ storageInterface.insertDocuments(txn.get(), nss, {}));
}
TEST_F(StorageInterfaceImplTest,
- WriteOpsToOplogReturnsInternalErrorWhenSavingOperationToNonOplogCollection) {
+ InsertDocumentsReturnsInternalErrorWhenSavingOperationToNonOplogCollection) {
// Create fake non-oplog collection to ensure saving oplog entries (without _id field) will
// fail.
auto txn = getClient()->makeOperationContext();
@@ -347,12 +347,12 @@ TEST_F(StorageInterfaceImplTest,
// Non-oplog collection will enforce mandatory _id field requirement on insertion.
StorageInterfaceImpl storageInterface(nss);
auto op = makeOplogEntry({Timestamp(Seconds(1), 0), 1LL});
- auto status = storageInterface.writeOpsToOplog(txn.get(), nss, {op}).getStatus();
+ auto status = storageInterface.insertDocuments(txn.get(), nss, {op});
ASSERT_EQUALS(ErrorCodes::InternalError, status);
ASSERT_STRING_CONTAINS(status.reason(), "Collection::insertDocument got document without _id");
}
-TEST_F(StorageInterfaceImplTest, WriteOpsToOplogSavesOperationsReturnsOpTimeOfLastOperation) {
+TEST_F(StorageInterfaceImplTest, InsertDocumentsSavesOperationsReturnsOpTimeOfLastOperation) {
// Create fake oplog collection to hold operations.
auto txn = getClient()->makeOperationContext();
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
@@ -363,25 +363,23 @@ TEST_F(StorageInterfaceImplTest, WriteOpsToOplogSavesOperationsReturnsOpTimeOfLa
StorageInterfaceImpl storageInterface(nss);
auto op1 = makeOplogEntry({Timestamp(Seconds(1), 0), 1LL});
auto op2 = makeOplogEntry({Timestamp(Seconds(1), 0), 1LL});
- ASSERT_EQUALS(
- op2.getOpTime(),
- unittest::assertGet(storageInterface.writeOpsToOplog(txn.get(), nss, {op1, op2})));
+ ASSERT_OK(storageInterface.insertDocuments(txn.get(), nss, {op1, op2}));
// Check contents of oplog. OplogInterface iterates over oplog collection in reverse.
repl::OplogInterfaceLocal oplog(txn.get(), nss.ns());
auto iter = oplog.makeIterator();
- ASSERT_EQUALS(op2.raw, unittest::assertGet(iter->next()).first);
- ASSERT_EQUALS(op1.raw, unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(op2, unittest::assertGet(iter->next()).first);
+ ASSERT_EQUALS(op1, unittest::assertGet(iter->next()).first);
ASSERT_EQUALS(ErrorCodes::NoSuchKey, iter->next().getStatus());
}
TEST_F(StorageInterfaceImplTest,
- WriteOpsToOplogReturnsNamespaceNotFoundIfOplogCollectionDoesNotExist) {
+ InsertDocumentsReturnsNamespaceNotFoundIfOplogCollectionDoesNotExist) {
auto op = makeOplogEntry({Timestamp(Seconds(1), 0), 1LL});
NamespaceString nss("local.nosuchcollection");
StorageInterfaceImpl storageInterface(nss);
auto txn = getClient()->makeOperationContext();
- auto status = storageInterface.writeOpsToOplog(txn.get(), nss, {op}).getStatus();
+ auto status = storageInterface.insertDocuments(txn.get(), nss, {op});
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
ASSERT_STRING_CONTAINS(status.reason(), "The collection must exist before inserting documents");
}
diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp
index 57daebdb60c..96b40deba0d 100644
--- a/src/mongo/db/repl/storage_interface_mock.cpp
+++ b/src/mongo/db/repl/storage_interface_mock.cpp
@@ -72,21 +72,6 @@ void StorageInterfaceMock::setMinValid(OperationContext* txn, const BatchBoundar
_minValidBoundaries = boundaries;
}
-StatusWith<OpTime> StorageInterfaceMock::writeOpsToOplog(
- OperationContext* txn, const NamespaceString& nss, const MultiApplier::Operations& operations) {
- invariant(!operations.empty());
- stdx::lock_guard<stdx::mutex> lock(_operationsWrittenToOplogMutex);
- for (const auto& oplogEntry : operations) {
- _operationsWrittenToOplog.push_back(oplogEntry.getOwned());
- }
- return operations.back().getOpTime();
-}
-
-MultiApplier::Operations StorageInterfaceMock::getOperationsWrittenToOplog() const {
- stdx::lock_guard<stdx::mutex> lock(_operationsWrittenToOplogMutex);
- return _operationsWrittenToOplog;
-}
-
Status CollectionBulkLoaderMock::init(OperationContext* txn,
Collection* coll,
const std::vector<BSONObj>& secondaryIndexSpecs) {
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index b25d91f8978..4b19194832d 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -129,12 +129,6 @@ public:
const DurableRequirement durReq) override;
void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) override;
- StatusWith<OpTime> writeOpsToOplog(OperationContext* txn,
- const NamespaceString& nss,
- const MultiApplier::Operations& operations) override;
-
- MultiApplier::Operations getOperationsWrittenToOplog() const;
-
StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading(
const NamespaceString& nss,
const CollectionOptions& options,
@@ -241,9 +235,6 @@ private:
mutable stdx::mutex _minValidBoundariesMutex;
BatchBoundaries _minValidBoundaries = {OpTime(), OpTime()};
-
- mutable stdx::mutex _operationsWrittenToOplogMutex;
- MultiApplier::Operations _operationsWrittenToOplog;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index a4514386671..84871d24249 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -1226,12 +1226,16 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
applyOps(writerVectors, workerPool, applyOperation);
- OpTime lastOpTime;
{
ON_BLOCK_EXIT([&] { workerPool->join(); });
- lastOpTime = fassertStatusOK(
+
+ std::vector<BSONObj> docs(ops.size());
+ auto toBSON = [](const OplogEntry& entry) { return entry.raw; };
+ std::transform(ops.begin(), ops.end(), docs.begin(), toBSON);
+
+ fassertStatusOK(
40141,
- StorageInterface::get(txn)->writeOpsToOplog(txn, NamespaceString(rsOplogName), ops));
+ StorageInterface::get(txn)->insertDocuments(txn, NamespaceString(rsOplogName), docs));
}
if (inShutdownStrict()) {
@@ -1240,7 +1244,7 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
"Cannot apply operations due to shutdown in progress"};
}
// We have now written all database writes and updated the oplog to match.
- return lastOpTime;
+ return ops.back().getOpTime();
}
} // namespace repl
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 81e4cc51ac1..2b1ff21d1c7 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -30,6 +30,7 @@
#include <algorithm>
#include <memory>
+#include <utility>
#include <vector>
#include "mongo/db/catalog/collection_options.h"
@@ -43,6 +44,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/bgsync.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
@@ -107,6 +109,9 @@ void SyncTailTest::setUp() {
stdx::make_unique<ReplicationCoordinatorMock>(replSettings));
auto storageInterface = stdx::make_unique<StorageInterfaceMock>();
_storageInterface = storageInterface.get();
+ storageInterface->insertDocumentsFn = [](OperationContext*,
+ const NamespaceString&,
+ const std::vector<BSONObj>&) { return Status::OK(); };
StorageInterface::set(serviceContext, std::move(storageInterface));
Client::initThreadIfNotAlready();
@@ -556,6 +561,16 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH
auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1, BSON("x" << 1));
auto op2 = makeInsertDocumentOplogEntry({Timestamp(Seconds(2), 0), 1LL}, nss2, BSON("x" << 2));
+ NamespaceString nssForInsert;
+ std::vector<BSONObj> operationsWrittenToOplog;
+ _storageInterface->insertDocumentsFn = [&mutex, &nssForInsert, &operationsWrittenToOplog](
+ OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ nssForInsert = nss;
+ operationsWrittenToOplog = docs;
+ return Status::OK();
+ };
+
auto lastOpTime =
unittest::assertGet(multiApply(_txn.get(), &writerPool, {op1, op2}, applyOperationFn));
ASSERT_EQUALS(op2.getOpTime(), lastOpTime);
@@ -576,10 +591,11 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH
}
// Check ops in oplog.
- auto operationsWritternToOplog = _storageInterface->getOperationsWrittenToOplog();
- ASSERT_EQUALS(2U, operationsWritternToOplog.size());
- ASSERT_EQUALS(op1, operationsWritternToOplog[0]);
- ASSERT_EQUALS(op2, operationsWritternToOplog[1]);
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ ASSERT_EQUALS(2U, operationsWrittenToOplog.size());
+ ASSERT_EQUALS(NamespaceString(rsOplogName), nssForInsert);
+ ASSERT_EQUALS(op1.raw, operationsWrittenToOplog[0]);
+ ASSERT_EQUALS(op2.raw, operationsWrittenToOplog[1]);
}
TEST_F(SyncTailTest, MultiSyncApplyUsesSyncApplyToApplyOperation) {