author     Benety Goh <benety@mongodb.com>  2017-08-30 22:52:03 -0400
committer  Benety Goh <benety@mongodb.com>  2017-09-05 15:23:18 -0400
commit     42bfda31eae322ac190e0c8cd831ca73f6e78f18 (patch)
tree       cf072595ef0004320012e186922ae1f915aa5cd9
parent     4db291425761c1b557f10b643242ed08eb542df7 (diff)
download   mongo-42bfda31eae322ac190e0c8cd831ca73f6e78f18.tar.gz
SERVER-30371 renaming collection across databases logs individual oplog entries
-rw-r--r--  jstests/noPassthrough/atomic_rename_collection.js  |  22
-rw-r--r--  src/mongo/db/catalog/rename_collection.cpp          | 134
-rw-r--r--  src/mongo/db/catalog/rename_collection_test.cpp     | 246
3 files changed, 321 insertions, 81 deletions
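
With this change, a cross-database rename is no longer replicated as a single renameCollection oplog entry. Based on the updated jstest and unit tests below, it logs one entry per step: creating a temporary collection in the target database, a createIndexes entry for each non-_id index that is copied, the copied inserts, an in-database rename of the temporary collection to the target name, and a drop of the source collection. The shell sketch below is illustrative only and not part of the commit; it assumes a mongo shell connected to a replica-set primary, and the database names "first" and "second" are placeholders.

// Illustrative sketch (not part of this commit): observe the individual oplog
// entries produced by a cross-database rename on a replica-set primary.
const source = db.getSiblingDB("first").x;
const target = db.getSiblingDB("second").x;
source.drop();
target.drop();
assert.writeOK(source.insert({_id: 0}));

const local = db.getSiblingDB("local");
const lastTs = local.oplog.rs.find().sort({$natural: -1}).limit(1).next().ts;

assert.commandWorked(db.adminCommand({renameCollection: "first.x", to: "second.x"}));

// Expect one entry per step: create the temporary collection in "second",
// insert the copied document, rename the temporary collection to "second.x",
// and drop "first.x".
local.oplog.rs.find({ts: {$gt: lastTs}}).sort({$natural: 1}).forEach(function(entry) {
    print(entry.op + " " + entry.ns + " " + tojson(entry.o));
});
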
diff --git a/jstests/noPassthrough/atomic_rename_collection.js b/jstests/noPassthrough/atomic_rename_collection.js
index 8a1b5526a91..d63e08ff6ce 100644
--- a/jstests/noPassthrough/atomic_rename_collection.js
+++ b/jstests/noPassthrough/atomic_rename_collection.js
@@ -12,7 +12,19 @@
let local = prim.getDB("local");
// Test renames both within a database and across databases.
- [{source: first.x, target: first.y}, {source: first.x, target: second.x}].forEach((test) => {
+ const tests = [
+ {
+ source: first.x,
+ target: first.y,
+ expectedOplogEntries: 1,
+ },
+ {
+ source: first.x,
+ target: second.x,
+ expectedOplogEntries: 4,
+ }
+ ];
+ tests.forEach((test) => {
test.source.drop();
assert.writeOK(test.source.insert({}));
assert.writeOK(test.target.insert({}));
@@ -25,9 +37,9 @@
};
assert.commandWorked(local.adminCommand(cmd), tojson(cmd));
ops = local.oplog.rs.find({ts: {$gt: ts}}).sort({$natural: 1}).toArray();
- assert.eq(
- ops.length,
- 1,
- "renameCollection was supposed to only generate a single oplog entry: " + tojson(ops));
+ assert.eq(ops.length,
+ test.expectedOplogEntries,
+ "renameCollection was supposed to only generate " + test.expectedOplogEntries +
+ " oplog entries: " + tojson(ops));
});
})();
diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp
index 30a1ac28d9c..f751accff49 100644
--- a/src/mongo/db/catalog/rename_collection.cpp
+++ b/src/mongo/db/catalog/rename_collection.cpp
@@ -57,13 +57,6 @@
namespace mongo {
namespace {
-void dropCollection(OperationContext* opCtx, Database* db, StringData collName) {
- WriteUnitOfWork wunit(opCtx);
- if (db->dropCollection(opCtx, collName).isOK()) {
- // ignoring failure case
- wunit.commit();
- }
-}
NamespaceString getNamespaceFromUUID(OperationContext* opCtx, const BSONElement& ui) {
if (ui.eoo())
@@ -98,7 +91,8 @@ Status renameCollectionCommon(OperationContext* opCtx,
globalWriteLock.emplace(opCtx);
// We stay in source context the whole time. This is mostly to set the CurOp namespace.
- OldClientContext ctx(opCtx, source.ns());
+ boost::optional<OldClientContext> ctx;
+ ctx.emplace(opCtx, source.ns());
bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() &&
!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, source);
@@ -253,14 +247,11 @@ Status renameCollectionCommon(OperationContext* opCtx,
Collection* tmpColl = nullptr;
OptionalCollectionUUID newUUID;
- bool isSourceCollectionTemporary = false;
{
auto collectionOptions = sourceColl->getCatalogEntry()->getCollectionOptions(opCtx);
- isSourceCollectionTemporary = collectionOptions.temp;
// Renaming across databases will result in a new UUID, as otherwise we'd require
// two collections with the same uuid (temporarily).
- collectionOptions.temp = true;
if (targetUUID)
newUUID = targetUUID;
else if (collectionOptions.uuid && enableCollectionUUIDs)
@@ -270,32 +261,43 @@ Status renameCollectionCommon(OperationContext* opCtx,
writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] {
WriteUnitOfWork wunit(opCtx);
-
- // No logOp necessary because the entire renameCollection command is one logOp.
- repl::UnreplicatedWritesBlock uwb(opCtx);
- tmpColl = targetDB->createCollection(opCtx,
- tmpName.ns(),
- collectionOptions,
- false); // _id index build with others later.
-
+ tmpColl = targetDB->createCollection(opCtx, tmpName.ns(), collectionOptions);
wunit.commit();
});
}
// Dismissed on success
- ScopeGuard tmpCollectionDropper = MakeGuard(dropCollection, opCtx, targetDB, tmpName.ns());
-
- MultiIndexBlock indexer(opCtx, tmpColl);
- indexer.allowInterruption();
- std::vector<MultiIndexBlock*> indexers{&indexer};
+ auto tmpCollectionDropper = MakeGuard([&] {
+ BSONObjBuilder unusedResult;
+ auto status =
+ dropCollection(opCtx,
+ tmpName,
+ unusedResult,
+ renameOpTimeFromApplyOps,
+ DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
+ if (!status.isOK()) {
+ // Ignoring failure case when dropping the temporary collection during cleanup because
+ // the rename operation has already failed for another reason.
+ log() << "Unable to drop temporary collection " << tmpName << " while renaming from "
+ << source << " to " << target << ": " << status;
+ }
+ });
// Copy the index descriptions from the source collection, adjusting the ns field.
{
+ MultiIndexBlock indexer(opCtx, tmpColl);
+ indexer.allowInterruption();
+
std::vector<BSONObj> indexesToCopy;
IndexCatalog::IndexIterator sourceIndIt =
sourceColl->getIndexCatalog()->getIndexIterator(opCtx, true);
while (sourceIndIt.more()) {
- const BSONObj currIndex = sourceIndIt.next()->infoObj();
+ auto descriptor = sourceIndIt.next();
+ if (descriptor->isIdIndex()) {
+ continue;
+ }
+
+ const BSONObj currIndex = descriptor->infoObj();
// Process the source index, adding fields in the same order as they were originally.
BSONObjBuilder newIndex;
@@ -308,14 +310,30 @@ Status renameCollectionCommon(OperationContext* opCtx,
}
indexesToCopy.push_back(newIndex.obj());
}
+
status = indexer.init(indexesToCopy).getStatus();
if (!status.isOK()) {
return status;
}
+
+ status = indexer.doneInserting();
+ if (!status.isOK()) {
+ return status;
+ }
+
+ writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] {
+ WriteUnitOfWork wunit(opCtx);
+ indexer.commit();
+ for (auto&& infoObj : indexesToCopy) {
+ getGlobalServiceContext()->getOpObserver()->onCreateIndex(
+ opCtx, tmpName, newUUID, infoObj, false);
+ }
+ wunit.commit();
+ });
}
{
- // Copy over all the data from source collection to target collection.
+ // Copy over all the data from source collection to temporary collection.
auto cursor = sourceColl->getCursor(opCtx);
while (auto record = cursor->next()) {
opCtx->checkForInterrupt();
@@ -324,9 +342,9 @@ Status renameCollectionCommon(OperationContext* opCtx,
status = writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] {
WriteUnitOfWork wunit(opCtx);
- // No logOp necessary because the entire renameCollection command is one logOp.
- repl::UnreplicatedWritesBlock uwb(opCtx);
- Status status = tmpColl->insertDocument(opCtx, obj, indexers, true);
+ const InsertStatement stmt(obj);
+ OpDebug* const opDebug = nullptr;
+ auto status = tmpColl->insertDocument(opCtx, stmt, opDebug, true);
if (!status.isOK())
return status;
wunit.commit();
@@ -339,60 +357,24 @@ Status renameCollectionCommon(OperationContext* opCtx,
}
}
- status = indexer.doneInserting();
- if (!status.isOK()) {
- return status;
- }
-
// Getting here means we successfully built the target copy. We now do the final
// in-place rename and remove the source collection.
- status = writeConflictRetry(opCtx, "renameCollection", tmpName.ns(), [&] {
- WriteUnitOfWork wunit(opCtx);
- indexer.commit();
- OptionalCollectionUUID dropTargetUUID;
- {
- repl::UnreplicatedWritesBlock uwb(opCtx);
- Status status = Status::OK();
- if (targetColl) {
- dropTargetUUID = targetColl->uuid();
- status = targetDB->dropCollection(opCtx, target.ns());
- }
- if (status.isOK()) {
- // When renaming the temporary collection in the target database, we have to take
- // into account the CollectionOptions.temp value of the source collection and the
- // 'stayTemp' option requested by the caller.
- // If the source collection is not temporary, the resulting target collection must
- // not be temporary.
- auto stayTemp = isSourceCollectionTemporary && options.stayTemp;
- status = targetDB->renameCollection(opCtx, tmpName.ns(), target.ns(), stayTemp);
- }
- if (status.isOK())
- status = sourceDB->dropCollection(opCtx, source.ns());
-
- if (!status.isOK())
- return status;
- }
-
- getGlobalServiceContext()->getOpObserver()->onRenameCollection(opCtx,
- source,
- target,
- newUUID,
- options.dropTarget,
- dropTargetUUID,
- sourceUUID,
- options.stayTemp);
-
- wunit.commit();
- return Status::OK();
- });
-
+ invariant(tmpName.db() == target.db());
+ status = renameCollectionCommon(
+ opCtx, tmpName, target, targetUUID, renameOpTimeFromApplyOps, options);
if (!status.isOK()) {
return status;
}
-
tmpCollectionDropper.Dismiss();
- return Status::OK();
+
+ BSONObjBuilder unusedResult;
+ return dropCollection(opCtx,
+ source,
+ unusedResult,
+ renameOpTimeFromApplyOps,
+ DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
}
+
} // namespace
Status renameCollection(OperationContext* opCtx,
diff --git a/src/mongo/db/catalog/rename_collection_test.cpp b/src/mongo/db/catalog/rename_collection_test.cpp
index fa32ebaac77..9e8f5ba2ac6 100644
--- a/src/mongo/db/catalog/rename_collection_test.cpp
+++ b/src/mongo/db/catalog/rename_collection_test.cpp
@@ -29,6 +29,8 @@
#include "mongo/platform/basic.h"
#include <set>
+#include <string>
+#include <vector>
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/collection_options.h"
@@ -54,6 +56,7 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/stringutils.h"
namespace {
@@ -66,6 +69,29 @@ using namespace mongo;
*/
class OpObserverMock : public OpObserverNoop {
public:
+ void onCreateIndex(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ BSONObj indexDoc,
+ bool fromMigrate) override;
+
+ void onInserts(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) override;
+
+ void onCreateCollection(OperationContext* opCtx,
+ Collection* coll,
+ const NamespaceString& collectionName,
+ const CollectionOptions& options,
+ const BSONObj& idIndex) override;
+
+ repl::OpTime onDropCollection(OperationContext* opCtx,
+ const NamespaceString& collectionName,
+ OptionalCollectionUUID uuid) override;
+
repl::OpTime onRenameCollection(OperationContext* opCtx,
const NamespaceString& fromCollection,
const NamespaceString& toCollection,
@@ -75,10 +101,63 @@ public:
OptionalCollectionUUID dropSourceUUID,
bool stayTemp) override;
+ // Operations written to the oplog. These are operations for which
+ // ReplicationCoordinator::isOplogDisabledFor() returns false.
+ std::vector<std::string> oplogEntries;
+
+ bool onInsertsThrows = false;
+
bool onRenameCollectionCalled = false;
repl::OpTime renameOpTime = {Timestamp(Seconds(100), 1U), 1LL};
+
+private:
+ /**
+ * Pushes 'operationName' into 'oplogEntries' if we can write to the oplog for this namespace.
+ */
+ void _logOp(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::string& operationName);
};
+void OpObserverMock::onCreateIndex(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ BSONObj indexDoc,
+ bool fromMigrate) {
+ _logOp(opCtx, nss, "index");
+ OpObserverNoop::onCreateIndex(opCtx, nss, uuid, indexDoc, fromMigrate);
+}
+
+void OpObserverMock::onInserts(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ std::vector<InsertStatement>::const_iterator begin,
+ std::vector<InsertStatement>::const_iterator end,
+ bool fromMigrate) {
+ if (onInsertsThrows) {
+ uasserted(ErrorCodes::OperationFailed, "insert failed");
+ }
+
+ _logOp(opCtx, nss, "inserts");
+ OpObserverNoop::onInserts(opCtx, nss, uuid, begin, end, fromMigrate);
+}
+
+void OpObserverMock::onCreateCollection(OperationContext* opCtx,
+ Collection* coll,
+ const NamespaceString& collectionName,
+ const CollectionOptions& options,
+ const BSONObj& idIndex) {
+ _logOp(opCtx, collectionName, "create");
+ OpObserverNoop::onCreateCollection(opCtx, coll, collectionName, options, idIndex);
+}
+
+repl::OpTime OpObserverMock::onDropCollection(OperationContext* opCtx,
+ const NamespaceString& collectionName,
+ OptionalCollectionUUID uuid) {
+ _logOp(opCtx, collectionName, "drop");
+ return OpObserverNoop::onDropCollection(opCtx, collectionName, uuid);
+}
+
repl::OpTime OpObserverMock::onRenameCollection(OperationContext* opCtx,
const NamespaceString& fromCollection,
const NamespaceString& toCollection,
@@ -87,10 +166,28 @@ repl::OpTime OpObserverMock::onRenameCollection(OperationContext* opCtx,
OptionalCollectionUUID dropTargetUUID,
OptionalCollectionUUID dropSourceUUID,
bool stayTemp) {
+ _logOp(opCtx, fromCollection, "rename");
+ OpObserverNoop::onRenameCollection(opCtx,
+ fromCollection,
+ toCollection,
+ uuid,
+ dropTarget,
+ dropTargetUUID,
+ dropSourceUUID,
+ stayTemp);
onRenameCollectionCalled = true;
return renameOpTime;
}
+void OpObserverMock::_logOp(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::string& operationName) {
+ if (repl::ReplicationCoordinator::get(opCtx)->isOplogDisabledFor(opCtx, nss)) {
+ return;
+ }
+ oplogEntries.push_back(operationName);
+}
+
class RenameCollectionTest : public ServiceContextMongoDTest {
public:
static ServiceContext::UniqueOperationContext makeOpCtx();
@@ -258,6 +355,24 @@ void _createIndex(OperationContext* opCtx,
ASSERT_TRUE(AutoGetCollectionForRead(opCtx, nss).getCollection());
}
+/**
+ * Inserts a single document into a collection.
+ */
+void _insertDocument(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) {
+ writeConflictRetry(opCtx, "_insertDocument", nss.ns(), [=] {
+ AutoGetCollection autoColl(opCtx, nss, MODE_X);
+ auto collection = autoColl.getCollection();
+ ASSERT_TRUE(collection) << "Cannot insert document " << doc << " into collection " << nss
+ << " because collection " << nss.ns() << " does not exist.";
+
+ WriteUnitOfWork wuow(opCtx);
+ OpDebug* const opDebug = nullptr;
+ bool enforceQuota = true;
+ ASSERT_OK(collection->insertDocument(opCtx, InsertStatement(doc), opDebug, enforceQuota));
+ wuow.commit();
+ });
+}
+
TEST_F(RenameCollectionTest, RenameCollectionReturnsNamespaceNotFoundIfDatabaseDoesNotExist) {
ASSERT_FALSE(AutoGetDb(_opCtx.get(), _sourceNss.db(), MODE_X).getDb());
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound,
@@ -522,4 +637,135 @@ TEST_F(RenameCollectionTest, RenameDifferentDatabaseStayTempTrueSourceNotTempora
_testRenameCollectionStayTemp(_opCtx.get(), _sourceNss, _targetNssDifferentDb, true, false);
}
+/**
+ * Checks the oplog entries written by the OpObserver against the expected entries.
+ */
+void _checkOplogEntries(const std::vector<std::string>& actualOplogEntries,
+ const std::vector<std::string>& expectedOplogEntries) {
+ std::string actualOplogEntriesStr;
+ joinStringDelim(actualOplogEntries, &actualOplogEntriesStr, ',');
+ std::string expectedOplogEntriesStr;
+ joinStringDelim(expectedOplogEntries, &expectedOplogEntriesStr, ',');
+ ASSERT_EQUALS(expectedOplogEntries.size(), actualOplogEntries.size())
+ << str::stream()
+ << "Incorrect number of oplog entries written to oplog. Actual: " << actualOplogEntriesStr
+ << ". Expected: " << expectedOplogEntriesStr;
+ std::vector<std::string>::size_type i = 0;
+ for (const auto& actualOplogEntry : actualOplogEntries) {
+ const auto& expectedOplogEntry = expectedOplogEntries[i++];
+ ASSERT_EQUALS(expectedOplogEntry, actualOplogEntry)
+ << str::stream() << "Mismatch in oplog entry at index " << i
+ << ". Actual: " << actualOplogEntriesStr << ". Expected: " << expectedOplogEntriesStr;
+ }
+}
+
+/**
+ * Runs a cross-database rename operation and checks the oplog entries written by the OpObserver.
+ */
+void _testRenameCollectionAcrossDatabaseOplogEntries(
+ OperationContext* opCtx,
+ const NamespaceString& sourceNss,
+ const NamespaceString& targetNss,
+ std::vector<std::string>* oplogEntries,
+ bool forApplyOps,
+ const std::vector<std::string>& expectedOplogEntries) {
+ ASSERT_NOT_EQUALS(sourceNss.db(), targetNss.db());
+ _createCollection(opCtx, sourceNss);
+ _createIndex(opCtx, sourceNss, "a_1");
+ _insertDocument(opCtx, sourceNss, BSON("_id" << 0));
+ oplogEntries->clear();
+ if (forApplyOps) {
+ auto cmd = BSON(
+ "renameCollection" << sourceNss.ns() << "to" << targetNss.ns() << "dropTarget" << true);
+ ASSERT_OK(renameCollectionForApplyOps(opCtx, sourceNss.db().toString(), {}, cmd, {}));
+ } else {
+ RenameCollectionOptions options;
+ options.dropTarget = true;
+ ASSERT_OK(renameCollection(opCtx, sourceNss, targetNss, options));
+ }
+ _checkOplogEntries(*oplogEntries, expectedOplogEntries);
+}
+
+TEST_F(RenameCollectionTest, RenameCollectionAcrossDatabaseOplogEntries) {
+ bool forApplyOps = false;
+ _testRenameCollectionAcrossDatabaseOplogEntries(
+ _opCtx.get(),
+ _sourceNss,
+ _targetNssDifferentDb,
+ &_opObserver->oplogEntries,
+ forApplyOps,
+ {"create", "index", "inserts", "rename", "drop"});
+}
+
+TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsAcrossDatabaseOplogEntries) {
+ bool forApplyOps = true;
+ _testRenameCollectionAcrossDatabaseOplogEntries(
+ _opCtx.get(),
+ _sourceNss,
+ _targetNssDifferentDb,
+ &_opObserver->oplogEntries,
+ forApplyOps,
+ {"create", "index", "inserts", "rename", "drop"});
+}
+
+TEST_F(RenameCollectionTest, RenameCollectionAcrossDatabaseOplogEntriesDropTarget) {
+ _createCollection(_opCtx.get(), _targetNssDifferentDb);
+ bool forApplyOps = false;
+ _testRenameCollectionAcrossDatabaseOplogEntries(
+ _opCtx.get(),
+ _sourceNss,
+ _targetNssDifferentDb,
+ &_opObserver->oplogEntries,
+ forApplyOps,
+ {"create", "index", "inserts", "rename", "drop"});
+}
+
+TEST_F(RenameCollectionTest, RenameCollectionForApplyOpsAcrossDatabaseOplogEntriesDropTarget) {
+ _createCollection(_opCtx.get(), _targetNssDifferentDb);
+ bool forApplyOps = true;
+ _testRenameCollectionAcrossDatabaseOplogEntries(
+ _opCtx.get(),
+ _sourceNss,
+ _targetNssDifferentDb,
+ &_opObserver->oplogEntries,
+ forApplyOps,
+ {"create", "index", "inserts", "rename", "drop"});
+}
+
+TEST_F(RenameCollectionTest, RenameCollectionAcrossDatabaseOplogEntriesWritesNotReplicated) {
+ repl::UnreplicatedWritesBlock uwb(_opCtx.get());
+ bool forApplyOps = false;
+ _testRenameCollectionAcrossDatabaseOplogEntries(_opCtx.get(),
+ _sourceNss,
+ _targetNssDifferentDb,
+ &_opObserver->oplogEntries,
+ forApplyOps,
+ {});
+}
+
+TEST_F(RenameCollectionTest,
+ RenameCollectionForApplyOpsAcrossDatabaseOplogEntriesWritesNotReplicated) {
+ repl::UnreplicatedWritesBlock uwb(_opCtx.get());
+ bool forApplyOps = true;
+ _testRenameCollectionAcrossDatabaseOplogEntries(_opCtx.get(),
+ _sourceNss,
+ _targetNssDifferentDb,
+ &_opObserver->oplogEntries,
+ forApplyOps,
+ {});
+}
+
+TEST_F(RenameCollectionTest, RenameCollectionAcrossDatabaseDropsTemporaryCollectionOnException) {
+ _createCollection(_opCtx.get(), _sourceNss);
+ _createIndex(_opCtx.get(), _sourceNss, "a_1");
+ _insertDocument(_opCtx.get(), _sourceNss, BSON("_id" << 0));
+ _opObserver->onInsertsThrows = true;
+ _opObserver->oplogEntries.clear();
+ ASSERT_THROWS_CODE(
+ renameCollection(_opCtx.get(), _sourceNss, _targetNssDifferentDb, {}).ignore(),
+ AssertionException,
+ ErrorCodes::OperationFailed);
+ _checkOplogEntries(_opObserver->oplogEntries, {"create", "index", "drop"});
+}
+
} // namespace