/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/op_observer_impl.h"
#include "keys_collection_client_sharded.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/locker_noop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/keys_collection_manager.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog.h"
#include "mongo/s/config_server_test_fixture.h"
#include "mongo/unittest/death_test.h"
#include "mongo/util/clock_source_mock.h"
namespace mongo {
namespace {
class OpObserverTest : public ServiceContextMongoDTest {
public:
void setUp() override {
// Set up mongod.
ServiceContextMongoDTest::setUp();
auto service = getServiceContext();
auto opCtx = cc().makeOperationContext();
// Set up ReplicationCoordinator and create oplog.
repl::ReplicationCoordinator::set(
service, stdx::make_unique(service));
repl::setOplogCollectionName(service);
repl::createOplog(opCtx.get());
// Ensure that we are primary.
auto replCoord = repl::ReplicationCoordinator::get(opCtx.get());
ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
}
protected:
// Assert that oplog only has a single entry and return that oplog entry.
BSONObj getSingleOplogEntry(OperationContext* opCtx) {
repl::OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns());
auto oplogIter = oplogInterface.makeIterator();
auto opEntry = unittest::assertGet(oplogIter->next());
ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus());
return opEntry.first;
}
};
TEST_F(OpObserverTest, CollModWithCollectionOptionsAndTTLInfo) {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
auto uuid = CollectionUUID::gen();
// Create 'collMod' command.
NamespaceString nss("test.coll");
BSONObj collModCmd = BSON("collMod" << nss.coll() << "validationLevel"
<< "off"
<< "validationAction"
<< "warn"
// We verify that 'onCollMod' ignores this field.
<< "index"
<< "indexData");
CollectionOptions oldCollOpts;
oldCollOpts.validationLevel = "strict";
oldCollOpts.validationAction = "error";
oldCollOpts.flags = 2;
oldCollOpts.flagsSet = true;
TTLCollModInfo ttlInfo;
ttlInfo.expireAfterSeconds = Seconds(10);
ttlInfo.oldExpireAfterSeconds = Seconds(5);
ttlInfo.indexName = "name_of_index";
// Write to the oplog.
{
AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
opObserver.onCollMod(opCtx.get(), nss, uuid, collModCmd, oldCollOpts, ttlInfo);
wunit.commit();
}
auto oplogEntry = getSingleOplogEntry(opCtx.get());
// Ensure that collMod fields were properly added to the oplog entry.
auto o = oplogEntry.getObjectField("o");
auto oExpected =
BSON("collMod" << nss.coll() << "validationLevel"
<< "off"
<< "validationAction"
<< "warn"
<< "index"
<< BSON("name" << ttlInfo.indexName << "expireAfterSeconds"
<< durationCount(ttlInfo.expireAfterSeconds)));
ASSERT_BSONOBJ_EQ(oExpected, o);
// Ensure that the old collection metadata was saved.
auto o2 = oplogEntry.getObjectField("o2");
auto o2Expected =
BSON("collectionOptions_old" << BSON("flags" << oldCollOpts.flags << "validationLevel"
<< oldCollOpts.validationLevel
<< "validationAction"
<< oldCollOpts.validationAction)
<< "expireAfterSeconds_old"
<< durationCount(ttlInfo.oldExpireAfterSeconds));
ASSERT_BSONOBJ_EQ(o2Expected, o2);
}
TEST_F(OpObserverTest, CollModWithOnlyCollectionOptions) {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
auto uuid = CollectionUUID::gen();
// Create 'collMod' command.
NamespaceString nss("test.coll");
BSONObj collModCmd = BSON("collMod" << nss.coll() << "validationLevel"
<< "off"
<< "validationAction"
<< "warn");
CollectionOptions oldCollOpts;
oldCollOpts.validationLevel = "strict";
oldCollOpts.validationAction = "error";
// Write to the oplog.
{
AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
opObserver.onCollMod(opCtx.get(), nss, uuid, collModCmd, oldCollOpts, boost::none);
wunit.commit();
}
auto oplogEntry = getSingleOplogEntry(opCtx.get());
// Ensure that collMod fields were properly added to oplog entry.
auto o = oplogEntry.getObjectField("o");
auto oExpected = collModCmd;
ASSERT_BSONOBJ_EQ(oExpected, o);
// Ensure that the old collection metadata was saved and that TTL info is not present.
auto o2 = oplogEntry.getObjectField("o2");
auto o2Expected =
BSON("collectionOptions_old"
<< BSON("validationLevel" << oldCollOpts.validationLevel << "validationAction"
<< oldCollOpts.validationAction));
ASSERT_BSONOBJ_EQ(o2Expected, o2);
}
TEST_F(OpObserverTest, OnDropCollectionReturnsDropOpTime) {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
auto uuid = CollectionUUID::gen();
// Create 'drop' command.
NamespaceString nss("test.coll");
auto dropCmd = BSON("drop" << nss.coll());
// Write to the oplog.
repl::OpTime dropOpTime;
{
AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
opObserver.onDropCollection(opCtx.get(), nss, uuid);
dropOpTime = OpObserver::Times::get(opCtx.get()).reservedOpTimes.front();
wunit.commit();
}
auto oplogEntry = getSingleOplogEntry(opCtx.get());
// Ensure that drop fields were properly added to oplog entry.
auto o = oplogEntry.getObjectField("o");
auto oExpected = dropCmd;
ASSERT_BSONOBJ_EQ(oExpected, o);
// Ensure that the drop optime returned is the same as the last optime in the ReplClientInfo.
ASSERT_EQUALS(repl::ReplClientInfo::forClient(&cc()).getLastOp(), dropOpTime);
}
TEST_F(OpObserverTest, OnRenameCollectionReturnsRenameOpTime) {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
// Create 'renameCollection' command.
auto dropTarget = false;
auto stayTemp = false;
NamespaceString sourceNss("test.foo");
NamespaceString targetNss("test.bar");
auto renameCmd = BSON(
"renameCollection" << sourceNss.ns() << "to" << targetNss.ns() << "stayTemp" << stayTemp
<< "dropTarget"
<< dropTarget);
// Write to the oplog.
repl::OpTime renameOpTime;
{
AutoGetDb autoDb(opCtx.get(), sourceNss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
opObserver.onRenameCollection(
opCtx.get(), sourceNss, targetNss, {}, dropTarget, {}, stayTemp);
renameOpTime = OpObserver::Times::get(opCtx.get()).reservedOpTimes.front();
wunit.commit();
}
auto oplogEntry = getSingleOplogEntry(opCtx.get());
// Ensure that renameCollection fields were properly added to oplog entry.
auto o = oplogEntry.getObjectField("o");
auto oExpected = renameCmd;
ASSERT_BSONOBJ_EQ(oExpected, o);
// Ensure that the rename optime returned is the same as the last optime in the ReplClientInfo.
ASSERT_EQUALS(repl::ReplClientInfo::forClient(&cc()).getLastOp(), renameOpTime);
}
/**
* Test fixture for testing OpObserver behavior specific to the SessionCatalog.
*/
class OpObserverSessionCatalogTest : public OpObserverTest {
public:
void setUp() override {
OpObserverTest::setUp();
auto opCtx = cc().makeOperationContext();
SessionCatalog::reset_forTest(getServiceContext());
SessionCatalog::create(getServiceContext());
auto sessionCatalog = SessionCatalog::get(getServiceContext());
sessionCatalog->onStepUp(opCtx.get());
}
/**
* Simulate a new write occurring on given session with the given transaction number and
* statement id.
*/
void simulateSessionWrite(OperationContext* opCtx,
ScopedSession session,
NamespaceString nss,
TxnNumber txnNum,
StmtId stmtId) {
session->beginOrContinueTxn(opCtx, txnNum, boost::none);
{
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
WriteUnitOfWork wuow(opCtx);
auto opTime = repl::OpTime(Timestamp(10, 1), 1); // Dummy timestamp.
session->onWriteOpCompletedOnPrimary(opCtx, txnNum, {stmtId}, opTime, Date_t::now());
wuow.commit();
}
}
};
TEST_F(OpObserverSessionCatalogTest, OnRollbackInvalidatesSessionCatalogIfSessionOpsRolledBack) {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
const NamespaceString nss("testDB", "testColl");
// Create a session.
auto sessionCatalog = SessionCatalog::get(getServiceContext());
auto sessionId = makeLogicalSessionIdForTest();
auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId);
// Simulate a write occurring on that session.
const TxnNumber txnNum = 0;
const StmtId stmtId = 1000;
simulateSessionWrite(opCtx.get(), session, nss, txnNum, stmtId);
// Check that the statement executed.
ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
// The OpObserver should invalidate in-memory session state, so the check after this should
// fail.
OpObserver::RollbackObserverInfo rbInfo;
rbInfo.rollbackSessionIds = {UUID::gen()};
opObserver.onReplicationRollback(opCtx.get(), rbInfo);
ASSERT_THROWS_CODE(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId),
DBException,
ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(OpObserverSessionCatalogTest,
OnRollbackDoesntInvalidateSessionCatalogIfNoSessionOpsRolledBack) {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
const NamespaceString nss("testDB", "testColl");
// Create a session.
auto sessionCatalog = SessionCatalog::get(getServiceContext());
auto sessionId = makeLogicalSessionIdForTest();
auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId);
// Simulate a write occurring on that session.
const TxnNumber txnNum = 0;
const StmtId stmtId = 1000;
simulateSessionWrite(opCtx.get(), session, nss, txnNum, stmtId);
// Check that the statement executed.
ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
// The OpObserver should not invalidate the in-memory session state, so the check after this
// should still succeed.
OpObserver::RollbackObserverInfo rbInfo;
opObserver.onReplicationRollback(opCtx.get(), rbInfo);
ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
}
TEST_F(OpObserverTest, OnRollbackInvalidatesAuthCacheWhenAuthNamespaceRolledBack) {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
auto authMgr = AuthorizationManager::get(getServiceContext());
auto initCacheGen = authMgr->getCacheGeneration();
// Verify that the rollback op observer invalidates the user cache for each auth namespace by
// checking that the cache generation changes after a call to the rollback observer method.
auto nss = AuthorizationManager::rolesCollectionNamespace;
OpObserver::RollbackObserverInfo rbInfo;
rbInfo.rollbackNamespaces = {AuthorizationManager::rolesCollectionNamespace};
opObserver.onReplicationRollback(opCtx.get(), rbInfo);
ASSERT_NE(initCacheGen, authMgr->getCacheGeneration());
initCacheGen = authMgr->getCacheGeneration();
rbInfo.rollbackNamespaces = {AuthorizationManager::usersCollectionNamespace};
opObserver.onReplicationRollback(opCtx.get(), rbInfo);
ASSERT_NE(initCacheGen, authMgr->getCacheGeneration());
initCacheGen = authMgr->getCacheGeneration();
rbInfo.rollbackNamespaces = {AuthorizationManager::versionCollectionNamespace};
opObserver.onReplicationRollback(opCtx.get(), rbInfo);
ASSERT_NE(initCacheGen, authMgr->getCacheGeneration());
}
TEST_F(OpObserverTest, OnRollbackDoesntInvalidateAuthCacheWhenNoAuthNamespaceRolledBack) {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
auto authMgr = AuthorizationManager::get(getServiceContext());
auto initCacheGen = authMgr->getCacheGeneration();
// Verify that the rollback op observer doesn't invalidate the user cache.
auto nss = AuthorizationManager::rolesCollectionNamespace;
OpObserver::RollbackObserverInfo rbInfo;
opObserver.onReplicationRollback(opCtx.get(), rbInfo);
auto newCacheGen = authMgr->getCacheGeneration();
ASSERT_EQ(newCacheGen, initCacheGen);
}
TEST_F(OpObserverTest, MultipleAboutToDeleteAndOnDelete) {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
NamespaceString nss = {"test", "coll"};
AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
WriteUnitOfWork wunit(opCtx.get());
opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1));
opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
opObserver.aboutToDelete(opCtx.get(), nss, BSON("_id" << 1));
opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
}
DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
opCtx->swapLockState(stdx::make_unique());
NamespaceString nss = {"test", "coll"};
opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
}
DEATH_TEST_F(OpObserverTest, EachOnDeleteRequiresAboutToDelete, "invariant") {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
opCtx->swapLockState(stdx::make_unique());
NamespaceString nss = {"test", "coll"};
opObserver.aboutToDelete(opCtx.get(), nss, {});
opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
opObserver.onDelete(opCtx.get(), nss, {}, {}, false, {});
}
DEATH_TEST_F(OpObserverTest,
NodeCrashesIfShardIdentityDocumentRolledBack,
"Fatal Assertion 50712") {
OpObserverImpl opObserver;
auto opCtx = cc().makeOperationContext();
OpObserver::RollbackObserverInfo rbInfo;
rbInfo.shardIdentityRolledBack = true;
opObserver.onReplicationRollback(opCtx.get(), rbInfo);
}
} // namespace
} // namespace mongo