diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-10-18 16:52:40 +0200 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-10-23 06:08:41 -0400 |
commit | 14c224a6437ff476b8ff49b2e2bd2aa7758d2862 (patch) | |
tree | d967c43f3fb379d6f2a8d671829b24a7196ad034 | |
parent | 00b88359c7c1440c519c0d06afbcd3c181fa0bbb (diff) | |
download | mongo-14c224a6437ff476b8ff49b2e2bd2aa7758d2862.tar.gz |
SERVER-37657 Report the offending oplog entries if a batch contains non-increasing transaction numbers
(cherry picked from commit 826c8b47c283749bb6f751d57729c6c3ac160a75)
5 files changed, 50 insertions, 43 deletions
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp index c0baf92db3a..9988a3832b9 100644 --- a/src/mongo/db/repl/session_update_tracker.cpp +++ b/src/mongo/db/repl/session_update_tracker.cpp @@ -28,6 +28,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + #include "mongo/platform/basic.h" #include "mongo/db/repl/session_update_tracker.h" @@ -35,16 +37,18 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/session.h" #include "mongo/util/assert_util.h" +#include "mongo/util/log.h" namespace mongo { namespace repl { boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush( const OplogEntry& entry) { - auto ns = entry.getNamespace(); + const auto& ns = entry.getNamespace(); + if (ns == NamespaceString::kSessionTransactionsTableNamespace || (ns.isConfigDB() && ns.isCommand())) { - return flush(entry); + return _flush(entry); } _updateSessionInfo(entry); @@ -52,14 +56,14 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush( } void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) { - auto sessionInfo = entry.getOperationSessionInfo(); + const auto& sessionInfo = entry.getOperationSessionInfo(); if (!sessionInfo.getTxnNumber()) { return; } - auto lsid = sessionInfo.getSessionId(); - fassert(50842, lsid.is_initialized()); + const auto& lsid = sessionInfo.getSessionId(); + invariant(lsid); auto iter = _sessionsToUpdate.find(lsid->getId()); if (iter == _sessionsToUpdate.end()) { @@ -67,12 +71,21 @@ void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) { return; } - auto existingSessionInfo = iter->second.getOperationSessionInfo(); - fassert(50843, *sessionInfo.getTxnNumber() >= *existingSessionInfo.getTxnNumber()); - iter->second = entry; + const auto& existingSessionInfo = iter->second.getOperationSessionInfo(); + if (*sessionInfo.getTxnNumber() >= *existingSessionInfo.getTxnNumber()) { + iter->second = entry; + return; + } + + severe() << "Entry for session " << lsid->getId() << " has txnNumber " + << *sessionInfo.getTxnNumber() << " < " << *existingSessionInfo.getTxnNumber(); + severe() << "New oplog entry: " << redact(entry.toString()); + severe() << "Existing oplog entry: " << redact(iter->second.toString()); + + fassertFailedNoTrace(50843); } -std::vector<OplogEntry> SessionUpdateTracker::flush(const OplogEntry& entry) { +std::vector<OplogEntry> SessionUpdateTracker::_flush(const OplogEntry& entry) { switch (entry.getOpType()) { case OpTypeEnum::kInsert: case OpTypeEnum::kNoop: diff --git a/src/mongo/db/repl/session_update_tracker.h b/src/mongo/db/repl/session_update_tracker.h index de89d22bc1c..e4288324337 100644 --- a/src/mongo/db/repl/session_update_tracker.h +++ b/src/mongo/db/repl/session_update_tracker.h @@ -57,14 +57,6 @@ public: boost::optional<std::vector<OplogEntry>> updateOrFlush(const OplogEntry& entry); /** - * Analyzes the given oplog entry and determines which transactions stored so far needs to be - * converted to oplog writes. - * - * Note: should only be called when oplog entry's ns target config.transactions or config.$cmd. - */ - std::vector<OplogEntry> flush(const OplogEntry& entry); - - /** * Converts all stored transaction infos to oplog writes to config.transactions. * Can return an empty vector if there is nothing to flush. */ @@ -72,6 +64,14 @@ public: private: /** + * Analyzes the given oplog entry and determines which transactions stored so far needs to be + * converted to oplog writes. + * + * Note: should only be called when oplog entry's ns target config.transactions or config.$cmd. + */ + std::vector<OplogEntry> _flush(const OplogEntry& entry); + + /** * Converts stored transaction infos that has a matching transcation id with the given * query predicate. Can return an empty vector if there is nothing to flush. */ diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 46603ec3a67..a5dfbf3ebb4 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -671,28 +671,6 @@ void fillWriterVectors(OperationContext* opCtx, } } -} // namespace - -/** - * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then using - * a set of threads to apply the operations. If the batch application is successful, returns the - * optime of the last op applied, which should be the last op in the batch. To provide crash - * resilience, this function will advance the persistent value of 'minValid' to at least the - * last optime of the batch. If 'minValid' is already greater than or equal to the last optime of - * this batch, it will not be updated. - */ -OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { - auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status { - _applyFunc(ops, this); - // This function is used by 3.2 initial sync and steady state data replication. - // _applyFunc() will throw or abort on error, so we return OK here. - return Status::OK(); - }; - return fassertStatusOK( - 34437, repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation)); -} - -namespace { void tryToGoLiveAsASecondary(OperationContext* opCtx, ReplicationCoordinator* replCoord, OpTime minValid) { @@ -739,6 +717,26 @@ void tryToGoLiveAsASecondary(OperationContext* opCtx, << ". Current state: " << replCoord->getMemberState() << causedBy(status); } } + +} // namespace + +/** + * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then using + * a set of threads to apply the operations. If the batch application is successful, returns the + * optime of the last op applied, which should be the last op in the batch. To provide crash + * resilience, this function will advance the persistent value of 'minValid' to at least the + * last optime of the batch. If 'minValid' is already greater than or equal to the last optime of + * this batch, it will not be updated. + */ +OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { + auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status { + _applyFunc(ops, this); + // This function is used by 3.2 initial sync and steady state data replication. + // _applyFunc() will throw or abort on error, so we return OK here. + return Status::OK(); + }; + return fassertStatusOK( + 34437, repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation)); } class SyncTail::OpQueueBatcher { diff --git a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp index 44d6a3fd079..4952c9a5aa1 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp @@ -74,8 +74,6 @@ using std::set; namespace { -const Seconds kDefaultFindHostMaxWaitTime(20); - const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); diff --git a/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp index 54e1350e0fa..87d9d8d165e 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp @@ -85,8 +85,6 @@ using CallbackArgs = executor::TaskExecutor::CallbackArgs; using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn; -const Seconds kDefaultFindHostMaxWaitTime(20); - const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); |