/** * Copyright 2017 (C) MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/repl/idempotency_test_fixture.h" #include #include #include #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/jsobj.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/drop_pending_collection_reaper.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/util/md5.hpp" namespace mongo { namespace repl { namespace { /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. */ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, repl::OpTypeEnum opType, NamespaceString nss, BSONObj object, boost::optional object2 = boost::none, OperationSessionInfo sessionInfo = {}, boost::optional wallClockTime = boost::none, boost::optional stmtId = boost::none) { return repl::OplogEntry(opTime, // optime 1LL, // hash opType, // opType nss, // namespace boost::none, // uuid boost::none, // fromMigrate repl::OplogEntry::kOplogVersion, // version object, // o object2, // o2 sessionInfo, // sessionInfo boost::none, // upsert wallClockTime, // wall clock time stmtId, // statement id boost::none, // optime of previous write within same transaction boost::none, // pre-image optime boost::none); // post-image optime } } // namespace /** * Compares BSON objects (BSONObj) in two sets of BSON objects (BSONObjSet) to see if the two * sets are equivalent. * * Two sets are equivalent if and only if their sizes are the same and all of their elements * that share the same index position are also equivalent in value. */ bool CollectionState::cmpIndexSpecs(const BSONObjSet& otherSpecs) const { if (indexSpecs.size() != otherSpecs.size()) { return false; } auto thisIt = this->indexSpecs.begin(); auto otherIt = otherSpecs.begin(); // thisIt and otherIt cannot possibly be out of sync in terms of progression through // their respective sets because we ensured earlier that their sizes are equal and we // increment both by 1 on each iteration. We can avoid checking both iterator positions and // only check one (thisIt). for (; thisIt != this->indexSpecs.end(); ++thisIt, ++otherIt) { // Since these are ordered sets, we expect that in the case of equivalent index specs, // each copy will be in the same order in both sets, therefore each loop step should be // true. if (!thisIt->binaryEqual(*otherIt)) { return false; } } return true; } /** * Returns a std::string representation of the CollectionState struct of which this is a member * function. Returns out its representation in the form: * * Collection options: {...}; Index options: [...]; MD5 hash: */ std::string CollectionState::toString() const { if (!this->exists) { return "Collection does not exist."; } BSONObj collectionOptionsBSON = this->collectionOptions.toBSON(); StringBuilder sb; sb << "Collection options: " << collectionOptionsBSON.toString() << "; "; sb << "Index specs: [ "; bool firstIter = true; for (auto indexSpec : this->indexSpecs) { if (!firstIter) { sb << ", "; } else { firstIter = false; } sb << indexSpec.toString(); } sb << " ]; "; sb << "MD5 Hash: "; // Be more explicit about CollectionState structs without a supplied MD5 hash string. sb << (this->dataHash.length() != 0 ? this->dataHash : "No hash"); return sb.str(); } CollectionState::CollectionState(CollectionOptions collectionOptions_, BSONObjSet indexSpecs_, std::string dataHash_) : collectionOptions(std::move(collectionOptions_)), indexSpecs(std::move(indexSpecs_)), dataHash(std::move(dataHash_)), exists(true){}; bool operator==(const CollectionState& lhs, const CollectionState& rhs) { if (!lhs.exists || !rhs.exists) { return lhs.exists == rhs.exists; } BSONObj lhsCollectionOptionsBSON = lhs.collectionOptions.toBSON(); BSONObj rhsCollectionOptionsBSON = rhs.collectionOptions.toBSON(); // Since collection options uses deferred comparison, we opt to binary compare its BSON // representations. bool collectionOptionsEqual = lhsCollectionOptionsBSON.binaryEqual(rhsCollectionOptionsBSON); bool indexSpecsEqual = lhs.cmpIndexSpecs(rhs.indexSpecs); bool dataHashEqual = lhs.dataHash == rhs.dataHash; bool existsEqual = lhs.exists == rhs.exists; return collectionOptionsEqual && indexSpecsEqual && dataHashEqual && existsEqual; } bool operator!=(const CollectionState& lhs, const CollectionState& rhs) { return !(lhs == rhs); } std::ostream& operator<<(std::ostream& stream, const CollectionState& state) { return stream << state.toString(); } StringBuilderImpl& operator<<(StringBuilderImpl& sb, const CollectionState& state) { return sb << state.toString(); } const auto kCollectionDoesNotExist = CollectionState(); /** * Creates a command oplog entry with given optime and namespace. */ OplogEntry makeCommandOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& command) { return makeOplogEntry(opTime, OpTypeEnum::kCommand, nss.getCommandNS(), command); } /** * Creates a create collection oplog entry with given optime. */ OplogEntry makeCreateCollectionOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& options) { BSONObjBuilder bob; bob.append("create", nss.coll()); bob.appendElements(options); return makeCommandOplogEntry(opTime, nss, bob.obj()); } /** * Creates an insert oplog entry with given optime and namespace. */ OplogEntry makeInsertDocumentOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& documentToInsert) { return makeOplogEntry(opTime, // optime OpTypeEnum::kInsert, // op type nss, // namespace documentToInsert, // o boost::none, // o2 {}, // session info Date_t::now()); // wall clock time } /** * Creates a delete oplog entry with given optime and namespace. */ OplogEntry makeDeleteDocumentOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& documentToDelete) { return makeOplogEntry(opTime, // optime OpTypeEnum::kDelete, // op type nss, // namespace documentToDelete, // o boost::none, // o2 {}, // session info Date_t::now()); // wall clock time } /** * Creates an update oplog entry with given optime and namespace. */ OplogEntry makeUpdateDocumentOplogEntry(OpTime opTime, const NamespaceString& nss, const BSONObj& documentToUpdate, const BSONObj& updatedDocument) { return makeOplogEntry(opTime, // optime OpTypeEnum::kUpdate, // op type nss, // namespace updatedDocument, // o documentToUpdate, // o2 {}, // session info Date_t::now()); // wall clock time } /** * Creates an index creation entry with given optime and namespace. */ OplogEntry makeCreateIndexOplogEntry(OpTime opTime, const NamespaceString& nss, const std::string& indexName, const BSONObj& keyPattern) { BSONObjBuilder indexInfoBob; indexInfoBob.append("v", 2); indexInfoBob.append("key", keyPattern); indexInfoBob.append("name", indexName); indexInfoBob.append("ns", nss.ns()); return makeInsertDocumentOplogEntry( opTime, NamespaceString(nss.getSystemIndexesCollection()), indexInfoBob.obj()); } /** * Creates an insert oplog entry with given optime, namespace and session info. */ OplogEntry makeInsertDocumentOplogEntryWithSessionInfo(OpTime opTime, const NamespaceString& nss, const BSONObj& documentToInsert, OperationSessionInfo info) { return makeOplogEntry(opTime, // optime OpTypeEnum::kInsert, // op type nss, // namespace documentToInsert, // o boost::none, // o2 info, // session info Date_t::now()); // wall clock time } OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(OpTime opTime, const NamespaceString& nss, const BSONObj& documentToInsert, LogicalSessionId lsid, TxnNumber txnNum, StmtId stmtId) { OperationSessionInfo info; info.setSessionId(lsid); info.setTxnNumber(txnNum); return makeOplogEntry(opTime, // optime OpTypeEnum::kInsert, // op type nss, // namespace documentToInsert, // o boost::none, // o2 info, // session info Date_t::now(), // wall clock time stmtId); // statement id } Status IdempotencyTest::resetState() { return Status::OK(); } void IdempotencyTest::testOpsAreIdempotent(std::vector ops, SequenceType sequenceType) { ASSERT_OK(resetState()); ASSERT_OK(runOpsInitialSync(ops)); auto state1 = validate(); auto iterations = sequenceType == SequenceType::kEntireSequence ? 1 : ops.size(); for (std::size_t i = 0; i < iterations; i++) { ASSERT_OK(resetState()); std::vector fullSequence; if (sequenceType == SequenceType::kEntireSequence) { ASSERT_OK(runOpsInitialSync(ops)); fullSequence.insert(fullSequence.end(), ops.begin(), ops.end()); } else if (sequenceType == SequenceType::kAnyPrefix || sequenceType == SequenceType::kAnyPrefixOrSuffix) { std::vector prefix(ops.begin(), ops.begin() + i + 1); ASSERT_OK(runOpsInitialSync(prefix)); fullSequence.insert(fullSequence.end(), prefix.begin(), prefix.end()); } ASSERT_OK(runOpsInitialSync(ops)); fullSequence.insert(fullSequence.end(), ops.begin(), ops.end()); if (sequenceType == SequenceType::kAnySuffix || sequenceType == SequenceType::kAnyPrefixOrSuffix) { std::vector suffix(ops.begin() + i, ops.end()); ASSERT_OK(runOpsInitialSync(suffix)); fullSequence.insert(fullSequence.end(), suffix.begin(), suffix.end()); } auto state2 = validate(); if (state1 != state2) { FAIL(getStateString(state1, state2, fullSequence)); } } } OplogEntry IdempotencyTest::createCollection(CollectionUUID uuid) { return makeCreateCollectionOplogEntry(nextOpTime(), nss, BSON("uuid" << uuid)); } OplogEntry IdempotencyTest::dropCollection() { return makeCommandOplogEntry(nextOpTime(), nss, BSON("drop" << nss.coll())); } OplogEntry IdempotencyTest::insert(const BSONObj& obj) { return makeInsertDocumentOplogEntry(nextOpTime(), nss, obj); } template OplogEntry IdempotencyTest::update(IdType _id, const BSONObj& obj) { return makeUpdateDocumentOplogEntry(nextOpTime(), nss, BSON("_id" << _id), obj); } OplogEntry IdempotencyTest::buildIndex(const BSONObj& indexSpec, const BSONObj& options) { BSONObjBuilder bob; bob.append("v", 2); bob.append("key", indexSpec); bob.append("name", std::string(indexSpec.firstElementFieldName()) + "_index"); bob.append("ns", nss.ns()); bob.appendElementsUnique(options); return makeInsertDocumentOplogEntry(nextOpTime(), nssIndex, bob.obj()); } OplogEntry IdempotencyTest::dropIndex(const std::string& indexName) { auto cmd = BSON("deleteIndexes" << nss.coll() << "index" << indexName); return makeCommandOplogEntry(nextOpTime(), nss, cmd); } std::string IdempotencyTest::computeDataHash(Collection* collection) { IndexDescriptor* desc = collection->getIndexCatalog()->findIdIndex(_opCtx.get()); ASSERT_TRUE(desc); auto exec = InternalPlanner::indexScan(_opCtx.get(), collection, desc, BSONObj(), BSONObj(), BoundInclusion::kIncludeStartKeyOnly, PlanExecutor::NO_YIELD, InternalPlanner::FORWARD, InternalPlanner::IXSCAN_FETCH); ASSERT(NULL != exec.get()); md5_state_t st; md5_init(&st); PlanExecutor::ExecState state; BSONObj obj; while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { obj = this->canonicalizeDocumentForDataHash(obj); md5_append(&st, (const md5_byte_t*)obj.objdata(), obj.objsize()); } ASSERT_EQUALS(PlanExecutor::IS_EOF, state); md5digest d; md5_finish(&st, d); return digestToString(d); } CollectionState IdempotencyTest::validate() { AutoGetCollectionForReadCommand autoColl(_opCtx.get(), nss); auto collection = autoColl.getCollection(); if (!collection) { // Return a mostly default initialized CollectionState struct with exists set to false to // indicate an unfound Collection (or a view). return kCollectionDoesNotExist; } ValidateResults validateResults; BSONObjBuilder bob; Lock::DBLock lk(_opCtx.get(), nss.db(), MODE_IX); auto lock = stdx::make_unique(_opCtx->lockState(), nss.ns(), MODE_X); ASSERT_OK(collection->validate( _opCtx.get(), kValidateFull, false, std::move(lock), &validateResults, &bob)); ASSERT_TRUE(validateResults.valid); std::string dataHash = computeDataHash(collection); auto collectionCatalog = collection->getCatalogEntry(); auto collectionOptions = collectionCatalog->getCollectionOptions(_opCtx.get()); std::vector allIndexes; BSONObjSet indexSpecs = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); collectionCatalog->getAllIndexes(_opCtx.get(), &allIndexes); for (auto const& index : allIndexes) { indexSpecs.insert(collectionCatalog->getIndexSpec(_opCtx.get(), index)); } ASSERT_EQUALS(indexSpecs.size(), allIndexes.size()); CollectionState collectionState(collectionOptions, indexSpecs, dataHash); return collectionState; } std::string IdempotencyTest::getStateString(const CollectionState& state1, const CollectionState& state2, const std::vector& ops) { StringBuilder sb; sb << "The state: " << state1 << " does not match with the state: " << state2 << " found after applying the operations a second time, therefore breaking idempotency."; return sb.str(); } template OplogEntry IdempotencyTest::update(int _id, const BSONObj& obj); template OplogEntry IdempotencyTest::update(char const* _id, const BSONObj& obj); } // namespace repl } // namespace mongo