summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-10-20 09:50:00 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-10-20 15:46:57 -0400
commitcb72da029c5ed5605977483524f803775e3b8953 (patch)
tree4e6df112aae4ca5b7e461ecd4de12b194a40ff35
parent8abd58df5ab9799bd5d3930d6e240928e266fad5 (diff)
downloadmongo-cb72da029c5ed5605977483524f803775e3b8953.tar.gz
SERVER-31281 Combine computeLatestTransactionTableRecords and fillWriterVectorsAndLastestSessionRecords
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/repl/sync_tail.cpp113
-rw-r--r--src/mongo/db/session_txn_record.cpp49
-rw-r--r--src/mongo/db/session_txn_record.h3
4 files changed, 60 insertions, 106 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 7448f3373da..c0919865dc0 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1616,7 +1616,6 @@ env.Library(
'ops/write_ops_retryability.cpp',
'session.cpp',
'session_catalog.cpp',
- 'session_txn_record.cpp',
'transaction_history_iterator.cpp',
env.Idlc('ops/single_write_result.idl')[0],
env.Idlc('session_txn_record.idl')[0],
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index f117263b219..89f4ee11448 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -87,12 +87,13 @@ namespace repl {
AtomicInt32 SyncTail::replBatchLimitOperations{50 * 1000};
+namespace {
+
/**
- * This variable determines the number of writer threads SyncTail will have. It has a default
- * value, which varies based on architecture and can be overridden using the
- * "replWriterThreadCount" server parameter.
+ * This variable determines the number of writer threads SyncTail will have. It has a default value,
+ * which varies based on architecture and can be overridden using the "replWriterThreadCount" server
+ * parameter.
*/
-namespace {
#if defined(MONGO_PLATFORM_64)
int replWriterThreadCount = 16;
#elif defined(MONGO_PLATFORM_32)
@@ -149,6 +150,7 @@ ServerStatusMetricField<Counter64> displayAttemptsToBecomeSecondary(
// Number and time of each ApplyOps worker pool round
TimerStats applyBatchStats;
ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats);
+
void initializePrefetchThread() {
if (!Client::getCurrent()) {
Client::initThreadIfNotAlready();
@@ -270,13 +272,16 @@ NamespaceString parseUUIDOrNs(OperationContext* opCtx, const BSONObj& o) {
if (!statusWithUUID.isOK()) {
return NamespaceString(o.getStringField("ns"));
}
- auto uuid = statusWithUUID.getValue();
+
+ const auto& uuid = statusWithUUID.getValue();
auto& catalog = UUIDCatalog::get(opCtx);
auto nss = catalog.lookupNSSByUUID(uuid);
- uassert(
- ErrorCodes::NamespaceNotFound, "No namespace with UUID " + uuid.toString(), !nss.isEmpty());
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "No namespace with UUID " << uuid.toString(),
+ !nss.isEmpty());
return nss;
}
+
} // namespace
SyncTail::SyncTail(BackgroundSync* q, MultiSyncApplyFunc func)
@@ -529,36 +534,6 @@ void scheduleWritesToOplog(OperationContext* opCtx,
using SessionRecordMap =
stdx::unordered_map<LogicalSessionId, SessionTxnRecord, LogicalSessionIdHash>;
-// Returns a map of the "latest" transaction table records for each logical session id present in
-// the given operations. Each record represents the final state of the transaction table entry for
-// that session id after the operations are applied.
-SessionRecordMap computeLatestTransactionTableRecords(const MultiApplier::Operations& ops) {
- SessionRecordMap latestRecords;
- for (const auto& op : ops) {
- auto sessionInfo = op.getOperationSessionInfo();
- if (!sessionInfo.getTxnNumber()) {
- continue;
- }
-
- invariant(sessionInfo.getSessionId());
- LogicalSessionId lsid(*sessionInfo.getSessionId());
-
- auto txnNumber = *sessionInfo.getTxnNumber();
- auto opTime = op.getOpTime();
-
- auto it = latestRecords.find(lsid);
- if (it != latestRecords.end()) {
- auto record = makeSessionTxnRecord(lsid, txnNumber, opTime);
- if (record > it->second) {
- latestRecords[lsid] = std::move(record);
- }
- } else {
- latestRecords.emplace(lsid, makeSessionTxnRecord(lsid, txnNumber, opTime));
- }
- }
- return latestRecords;
-}
-
void scheduleTxnTableUpdates(OperationContext* opCtx,
OldThreadPool* threadPool,
const SessionRecordMap& latestRecords) {
@@ -622,13 +597,22 @@ private:
StringMap<CollectionProperties> _cache;
};
-// This only modifies the isForCappedCollection field on each op. It does not alter the ops vector
-// in any other way.
-void fillWriterVectors(OperationContext* opCtx,
- MultiApplier::Operations* ops,
- std::vector<MultiApplier::OperationPtrs>* writerVectors) {
- const bool supportsDocLocking =
- getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking();
+/**
+ * ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops
+ * vector in any other way.
+ * writerVectors - Set of operations for each worker thread to apply.
+ * latestSessionRecords - Populated map of the "latest" transaction table records for each logical
+ * session id present in the given operations. Each record represents the final state of the
+ * transaction table entry for that session id after the operations are applied.
+ */
+void fillWriterVectorsAndLastestSessionRecords(
+ OperationContext* opCtx,
+ MultiApplier::Operations* ops,
+ std::vector<MultiApplier::OperationPtrs>* writerVectors,
+ SessionRecordMap* latestSessionRecords) {
+ const auto serviceContext = opCtx->getServiceContext();
+ const auto storageEngine = serviceContext->getGlobalStorageEngine();
+ const bool supportsDocLocking = storageEngine->supportsDocLocking();
const uint32_t numWriters = writerVectors->size();
CachedCollectionProperties collPropertiesCache;
@@ -660,9 +644,27 @@ void fillWriterVectors(OperationContext* opCtx,
}
}
+ const auto& sessionInfo = op.getOperationSessionInfo();
+ if (sessionInfo.getTxnNumber()) {
+ const auto& lsid = *sessionInfo.getSessionId();
+
+ SessionTxnRecord record;
+ record.setSessionId(lsid);
+ record.setTxnNum(*sessionInfo.getTxnNumber());
+ record.setLastWriteOpTime(op.getOpTime());
+
+ auto it = latestSessionRecords->find(lsid);
+ if (it == latestSessionRecords->end()) {
+ latestSessionRecords->emplace(lsid, std::move(record));
+ } else if (record > it->second) {
+ (*latestSessionRecords)[lsid] = std::move(record);
+ }
+ }
+
auto& writer = (*writerVectors)[hash % numWriters];
- if (writer.empty())
- writer.reserve(8); // skip a few growth rounds.
+ if (writer.empty()) {
+ writer.reserve(8); // Skip a few growth rounds
+ }
writer.push_back(&op);
}
}
@@ -1413,7 +1415,8 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
return {ErrorCodes::BadValue, "invalid apply operation function"};
}
- if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) {
+ const auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine();
+ if (storageEngine->isMmapV1()) {
// Use a ThreadPool to prefetch all the operations in a batch.
prefetchOps(ops, workerPool);
}
@@ -1432,18 +1435,20 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
"attempting to replicate ops while primary"};
}
- auto latestTxnRecords = computeLatestTransactionTableRecords(ops);
std::vector<Status> statusVector(workerPool->getNumThreads(), Status::OK());
{
// We must wait for the all work we've dispatched to complete before leaving this block
- // because the spawned threads refer to objects on our stack, including writerVectors.
- std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads());
+ // because the spawned threads refer to objects on the stack
ON_BLOCK_EXIT([&] { workerPool->join(); });
// Write batch of ops into oplog.
consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp());
scheduleWritesToOplog(opCtx, workerPool, ops);
- fillWriterVectors(opCtx, &ops, &writerVectors);
+
+ std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads());
+ SessionRecordMap latestSessionRecords;
+ fillWriterVectorsAndLastestSessionRecords(
+ opCtx, &ops, &writerVectors, &latestSessionRecords);
// Wait for writes to finish before applying ops.
workerPool->join();
@@ -1456,13 +1461,15 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx,
workerPool->join();
// Update the transaction table to point to the latest oplog entries for each session id.
- scheduleTxnTableUpdates(opCtx, workerPool, latestTxnRecords);
+ scheduleTxnTableUpdates(opCtx, workerPool, latestSessionRecords);
+ workerPool->join();
// Notify the storage engine that a replication batch has completed.
// This means that all the writes associated with the oplog entries in the batch are
// finished and no new writes with timestamps associated with those oplog entries will show
// up in the future.
- getGlobalServiceContext()->getGlobalStorageEngine()->replicationBatchIsComplete();
+ const auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine();
+ storageEngine->replicationBatchIsComplete();
}
// If any of the statuses is not ok, return error.
diff --git a/src/mongo/db/session_txn_record.cpp b/src/mongo/db/session_txn_record.cpp
deleted file mode 100644
index 89b627e7dce..00000000000
--- a/src/mongo/db/session_txn_record.cpp
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/bson/timestamp.h"
-#include "mongo/db/logical_session_id.h"
-#include "mongo/db/session_txn_record.h"
-
-namespace mongo {
-
-SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid,
- TxnNumber txnNum,
- repl::OpTime opTime) {
- SessionTxnRecord record;
-
- record.setSessionId(lsid);
- record.setTxnNum(txnNum);
- record.setLastWriteOpTime(std::move(opTime));
-
- return record;
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/session_txn_record.h b/src/mongo/db/session_txn_record.h
index b738e9b1dc3..332f95c5ef5 100644
--- a/src/mongo/db/session_txn_record.h
+++ b/src/mongo/db/session_txn_record.h
@@ -28,7 +28,6 @@
#pragma once
-#include "mongo/bson/timestamp.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/session_txn_record_gen.h"
@@ -56,6 +55,4 @@ inline bool operator>(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs)
(lhs.getTxnNum() == rhs.getTxnNum() && lhs.getLastWriteOpTime() > rhs.getLastWriteOpTime());
}
-SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, repl::OpTime opTime);
-
} // namespace mongo