summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-10-18 16:52:40 +0200
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-10-23 06:08:41 -0400
commit14c224a6437ff476b8ff49b2e2bd2aa7758d2862 (patch)
treed967c43f3fb379d6f2a8d671829b24a7196ad034
parent00b88359c7c1440c519c0d06afbcd3c181fa0bbb (diff)
downloadmongo-14c224a6437ff476b8ff49b2e2bd2aa7758d2862.tar.gz
SERVER-37657 Report the offending oplog entries if a batch contains non-increasing transaction numbers
(cherry picked from commit 826c8b47c283749bb6f751d57729c6c3ac160a75)
-rw-r--r--src/mongo/db/repl/session_update_tracker.cpp31
-rw-r--r--src/mongo/db/repl/session_update_tracker.h16
-rw-r--r--src/mongo/db/repl/sync_tail.cpp42
-rw-r--r--src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp2
-rw-r--r--src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp2
5 files changed, 50 insertions, 43 deletions
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp
index c0baf92db3a..9988a3832b9 100644
--- a/src/mongo/db/repl/session_update_tracker.cpp
+++ b/src/mongo/db/repl/session_update_tracker.cpp
@@ -28,6 +28,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
#include "mongo/platform/basic.h"
#include "mongo/db/repl/session_update_tracker.h"
@@ -35,16 +37,18 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/session.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
namespace mongo {
namespace repl {
boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush(
const OplogEntry& entry) {
- auto ns = entry.getNamespace();
+ const auto& ns = entry.getNamespace();
+
if (ns == NamespaceString::kSessionTransactionsTableNamespace ||
(ns.isConfigDB() && ns.isCommand())) {
- return flush(entry);
+ return _flush(entry);
}
_updateSessionInfo(entry);
@@ -52,14 +56,14 @@ boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush(
}
void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) {
- auto sessionInfo = entry.getOperationSessionInfo();
+ const auto& sessionInfo = entry.getOperationSessionInfo();
if (!sessionInfo.getTxnNumber()) {
return;
}
- auto lsid = sessionInfo.getSessionId();
- fassert(50842, lsid.is_initialized());
+ const auto& lsid = sessionInfo.getSessionId();
+ invariant(lsid);
auto iter = _sessionsToUpdate.find(lsid->getId());
if (iter == _sessionsToUpdate.end()) {
@@ -67,12 +71,21 @@ void SessionUpdateTracker::_updateSessionInfo(const OplogEntry& entry) {
return;
}
- auto existingSessionInfo = iter->second.getOperationSessionInfo();
- fassert(50843, *sessionInfo.getTxnNumber() >= *existingSessionInfo.getTxnNumber());
- iter->second = entry;
+ const auto& existingSessionInfo = iter->second.getOperationSessionInfo();
+ if (*sessionInfo.getTxnNumber() >= *existingSessionInfo.getTxnNumber()) {
+ iter->second = entry;
+ return;
+ }
+
+ severe() << "Entry for session " << lsid->getId() << " has txnNumber "
+ << *sessionInfo.getTxnNumber() << " < " << *existingSessionInfo.getTxnNumber();
+ severe() << "New oplog entry: " << redact(entry.toString());
+ severe() << "Existing oplog entry: " << redact(iter->second.toString());
+
+ fassertFailedNoTrace(50843);
}
-std::vector<OplogEntry> SessionUpdateTracker::flush(const OplogEntry& entry) {
+std::vector<OplogEntry> SessionUpdateTracker::_flush(const OplogEntry& entry) {
switch (entry.getOpType()) {
case OpTypeEnum::kInsert:
case OpTypeEnum::kNoop:
diff --git a/src/mongo/db/repl/session_update_tracker.h b/src/mongo/db/repl/session_update_tracker.h
index de89d22bc1c..e4288324337 100644
--- a/src/mongo/db/repl/session_update_tracker.h
+++ b/src/mongo/db/repl/session_update_tracker.h
@@ -57,14 +57,6 @@ public:
boost::optional<std::vector<OplogEntry>> updateOrFlush(const OplogEntry& entry);
/**
- * Analyzes the given oplog entry and determines which transactions stored so far needs to be
- * converted to oplog writes.
- *
- * Note: should only be called when oplog entry's ns target config.transactions or config.$cmd.
- */
- std::vector<OplogEntry> flush(const OplogEntry& entry);
-
- /**
* Converts all stored transaction infos to oplog writes to config.transactions.
* Can return an empty vector if there is nothing to flush.
*/
@@ -72,6 +64,14 @@ public:
private:
/**
+ * Analyzes the given oplog entry and determines which transactions stored so far needs to be
+ * converted to oplog writes.
+ *
+ * Note: should only be called when oplog entry's ns target config.transactions or config.$cmd.
+ */
+ std::vector<OplogEntry> _flush(const OplogEntry& entry);
+
+ /**
* Converts stored transaction infos that has a matching transcation id with the given
* query predicate. Can return an empty vector if there is nothing to flush.
*/
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 46603ec3a67..a5dfbf3ebb4 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -671,28 +671,6 @@ void fillWriterVectors(OperationContext* opCtx,
}
}
-} // namespace
-
-/**
- * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then using
- * a set of threads to apply the operations. If the batch application is successful, returns the
- * optime of the last op applied, which should be the last op in the batch. To provide crash
- * resilience, this function will advance the persistent value of 'minValid' to at least the
- * last optime of the batch. If 'minValid' is already greater than or equal to the last optime of
- * this batch, it will not be updated.
- */
-OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
- auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status {
- _applyFunc(ops, this);
- // This function is used by 3.2 initial sync and steady state data replication.
- // _applyFunc() will throw or abort on error, so we return OK here.
- return Status::OK();
- };
- return fassertStatusOK(
- 34437, repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation));
-}
-
-namespace {
void tryToGoLiveAsASecondary(OperationContext* opCtx,
ReplicationCoordinator* replCoord,
OpTime minValid) {
@@ -739,6 +717,26 @@ void tryToGoLiveAsASecondary(OperationContext* opCtx,
<< ". Current state: " << replCoord->getMemberState() << causedBy(status);
}
}
+
+} // namespace
+
+/**
+ * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then using
+ * a set of threads to apply the operations. If the batch application is successful, returns the
+ * optime of the last op applied, which should be the last op in the batch. To provide crash
+ * resilience, this function will advance the persistent value of 'minValid' to at least the
+ * last optime of the batch. If 'minValid' is already greater than or equal to the last optime of
+ * this batch, it will not be updated.
+ */
+OpTime SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
+ auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status {
+ _applyFunc(ops, this);
+ // This function is used by 3.2 initial sync and steady state data replication.
+ // _applyFunc() will throw or abort on error, so we return OK here.
+ return Status::OK();
+ };
+ return fassertStatusOK(
+ 34437, repl::multiApply(opCtx, _writerPool.get(), std::move(ops), applyOperation));
}
class SyncTail::OpQueueBatcher {
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp
index 44d6a3fd079..4952c9a5aa1 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_collection_operations.cpp
@@ -74,8 +74,6 @@ using std::set;
namespace {
-const Seconds kDefaultFindHostMaxWaitTime(20);
-
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp
index 54e1350e0fa..87d9d8d165e 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations.cpp
@@ -85,8 +85,6 @@ using CallbackArgs = executor::TaskExecutor::CallbackArgs;
using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn;
-const Seconds kDefaultFindHostMaxWaitTime(20);
-
const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));