diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-10-20 09:50:00 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-10-20 15:46:57 -0400 |
commit | cb72da029c5ed5605977483524f803775e3b8953 (patch) | |
tree | 4e6df112aae4ca5b7e461ecd4de12b194a40ff35 | |
parent | 8abd58df5ab9799bd5d3930d6e240928e266fad5 (diff) | |
download | mongo-cb72da029c5ed5605977483524f803775e3b8953.tar.gz |
SERVER-31281 Combine computeLatestTransactionTableRecords and fillWriterVectorsAndLastestSessionRecords
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 113 | ||||
-rw-r--r-- | src/mongo/db/session_txn_record.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/session_txn_record.h | 3 |
4 files changed, 60 insertions, 106 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 7448f3373da..c0919865dc0 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1616,7 +1616,6 @@ env.Library( 'ops/write_ops_retryability.cpp', 'session.cpp', 'session_catalog.cpp', - 'session_txn_record.cpp', 'transaction_history_iterator.cpp', env.Idlc('ops/single_write_result.idl')[0], env.Idlc('session_txn_record.idl')[0], diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index f117263b219..89f4ee11448 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -87,12 +87,13 @@ namespace repl { AtomicInt32 SyncTail::replBatchLimitOperations{50 * 1000}; +namespace { + /** - * This variable determines the number of writer threads SyncTail will have. It has a default - * value, which varies based on architecture and can be overridden using the - * "replWriterThreadCount" server parameter. + * This variable determines the number of writer threads SyncTail will have. It has a default value, + * which varies based on architecture and can be overridden using the "replWriterThreadCount" server + * parameter. */ -namespace { #if defined(MONGO_PLATFORM_64) int replWriterThreadCount = 16; #elif defined(MONGO_PLATFORM_32) @@ -149,6 +150,7 @@ ServerStatusMetricField<Counter64> displayAttemptsToBecomeSecondary( // Number and time of each ApplyOps worker pool round TimerStats applyBatchStats; ServerStatusMetricField<TimerStats> displayOpBatchesApplied("repl.apply.batches", &applyBatchStats); + void initializePrefetchThread() { if (!Client::getCurrent()) { Client::initThreadIfNotAlready(); @@ -270,13 +272,16 @@ NamespaceString parseUUIDOrNs(OperationContext* opCtx, const BSONObj& o) { if (!statusWithUUID.isOK()) { return NamespaceString(o.getStringField("ns")); } - auto uuid = statusWithUUID.getValue(); + + const auto& uuid = statusWithUUID.getValue(); auto& catalog = UUIDCatalog::get(opCtx); auto nss = catalog.lookupNSSByUUID(uuid); - uassert( - ErrorCodes::NamespaceNotFound, "No namespace with UUID " + uuid.toString(), !nss.isEmpty()); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "No namespace with UUID " << uuid.toString(), + !nss.isEmpty()); return nss; } + } // namespace SyncTail::SyncTail(BackgroundSync* q, MultiSyncApplyFunc func) @@ -529,36 +534,6 @@ void scheduleWritesToOplog(OperationContext* opCtx, using SessionRecordMap = stdx::unordered_map<LogicalSessionId, SessionTxnRecord, LogicalSessionIdHash>; -// Returns a map of the "latest" transaction table records for each logical session id present in -// the given operations. Each record represents the final state of the transaction table entry for -// that session id after the operations are applied. -SessionRecordMap computeLatestTransactionTableRecords(const MultiApplier::Operations& ops) { - SessionRecordMap latestRecords; - for (const auto& op : ops) { - auto sessionInfo = op.getOperationSessionInfo(); - if (!sessionInfo.getTxnNumber()) { - continue; - } - - invariant(sessionInfo.getSessionId()); - LogicalSessionId lsid(*sessionInfo.getSessionId()); - - auto txnNumber = *sessionInfo.getTxnNumber(); - auto opTime = op.getOpTime(); - - auto it = latestRecords.find(lsid); - if (it != latestRecords.end()) { - auto record = makeSessionTxnRecord(lsid, txnNumber, opTime); - if (record > it->second) { - latestRecords[lsid] = std::move(record); - } - } else { - latestRecords.emplace(lsid, makeSessionTxnRecord(lsid, txnNumber, opTime)); - } - } - return latestRecords; -} - void scheduleTxnTableUpdates(OperationContext* opCtx, OldThreadPool* threadPool, const SessionRecordMap& latestRecords) { @@ -622,13 +597,22 @@ private: StringMap<CollectionProperties> _cache; }; -// This only modifies the isForCappedCollection field on each op. It does not alter the ops vector -// in any other way. -void fillWriterVectors(OperationContext* opCtx, - MultiApplier::Operations* ops, - std::vector<MultiApplier::OperationPtrs>* writerVectors) { - const bool supportsDocLocking = - getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking(); +/** + * ops - This only modifies the isForCappedCollection field on each op. It does not alter the ops + * vector in any other way. + * writerVectors - Set of operations for each worker thread to apply. + * latestSessionRecords - Populated map of the "latest" transaction table records for each logical + * session id present in the given operations. Each record represents the final state of the + * transaction table entry for that session id after the operations are applied. + */ +void fillWriterVectorsAndLastestSessionRecords( + OperationContext* opCtx, + MultiApplier::Operations* ops, + std::vector<MultiApplier::OperationPtrs>* writerVectors, + SessionRecordMap* latestSessionRecords) { + const auto serviceContext = opCtx->getServiceContext(); + const auto storageEngine = serviceContext->getGlobalStorageEngine(); + const bool supportsDocLocking = storageEngine->supportsDocLocking(); const uint32_t numWriters = writerVectors->size(); CachedCollectionProperties collPropertiesCache; @@ -660,9 +644,27 @@ void fillWriterVectors(OperationContext* opCtx, } } + const auto& sessionInfo = op.getOperationSessionInfo(); + if (sessionInfo.getTxnNumber()) { + const auto& lsid = *sessionInfo.getSessionId(); + + SessionTxnRecord record; + record.setSessionId(lsid); + record.setTxnNum(*sessionInfo.getTxnNumber()); + record.setLastWriteOpTime(op.getOpTime()); + + auto it = latestSessionRecords->find(lsid); + if (it == latestSessionRecords->end()) { + latestSessionRecords->emplace(lsid, std::move(record)); + } else if (record > it->second) { + (*latestSessionRecords)[lsid] = std::move(record); + } + } + auto& writer = (*writerVectors)[hash % numWriters]; - if (writer.empty()) - writer.reserve(8); // skip a few growth rounds. + if (writer.empty()) { + writer.reserve(8); // Skip a few growth rounds + } writer.push_back(&op); } } @@ -1413,7 +1415,8 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx, return {ErrorCodes::BadValue, "invalid apply operation function"}; } - if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) { + const auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine(); + if (storageEngine->isMmapV1()) { // Use a ThreadPool to prefetch all the operations in a batch. prefetchOps(ops, workerPool); } @@ -1432,18 +1435,20 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx, "attempting to replicate ops while primary"}; } - auto latestTxnRecords = computeLatestTransactionTableRecords(ops); std::vector<Status> statusVector(workerPool->getNumThreads(), Status::OK()); { // We must wait for the all work we've dispatched to complete before leaving this block - // because the spawned threads refer to objects on our stack, including writerVectors. - std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads()); + // because the spawned threads refer to objects on the stack ON_BLOCK_EXIT([&] { workerPool->join(); }); // Write batch of ops into oplog. consistencyMarkers->setOplogTruncateAfterPoint(opCtx, ops.front().getTimestamp()); scheduleWritesToOplog(opCtx, workerPool, ops); - fillWriterVectors(opCtx, &ops, &writerVectors); + + std::vector<MultiApplier::OperationPtrs> writerVectors(workerPool->getNumThreads()); + SessionRecordMap latestSessionRecords; + fillWriterVectorsAndLastestSessionRecords( + opCtx, &ops, &writerVectors, &latestSessionRecords); // Wait for writes to finish before applying ops. workerPool->join(); @@ -1456,13 +1461,15 @@ StatusWith<OpTime> multiApply(OperationContext* opCtx, workerPool->join(); // Update the transaction table to point to the latest oplog entries for each session id. - scheduleTxnTableUpdates(opCtx, workerPool, latestTxnRecords); + scheduleTxnTableUpdates(opCtx, workerPool, latestSessionRecords); + workerPool->join(); // Notify the storage engine that a replication batch has completed. // This means that all the writes associated with the oplog entries in the batch are // finished and no new writes with timestamps associated with those oplog entries will show // up in the future. - getGlobalServiceContext()->getGlobalStorageEngine()->replicationBatchIsComplete(); + const auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine(); + storageEngine->replicationBatchIsComplete(); } // If any of the statuses is not ok, return error. diff --git a/src/mongo/db/session_txn_record.cpp b/src/mongo/db/session_txn_record.cpp deleted file mode 100644 index 89b627e7dce..00000000000 --- a/src/mongo/db/session_txn_record.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/bson/timestamp.h" -#include "mongo/db/logical_session_id.h" -#include "mongo/db/session_txn_record.h" - -namespace mongo { - -SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, - TxnNumber txnNum, - repl::OpTime opTime) { - SessionTxnRecord record; - - record.setSessionId(lsid); - record.setTxnNum(txnNum); - record.setLastWriteOpTime(std::move(opTime)); - - return record; -} - -} // namespace mongo diff --git a/src/mongo/db/session_txn_record.h b/src/mongo/db/session_txn_record.h index b738e9b1dc3..332f95c5ef5 100644 --- a/src/mongo/db/session_txn_record.h +++ b/src/mongo/db/session_txn_record.h @@ -28,7 +28,6 @@ #pragma once -#include "mongo/bson/timestamp.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/repl/optime.h" #include "mongo/db/session_txn_record_gen.h" @@ -56,6 +55,4 @@ inline bool operator>(const SessionTxnRecord& lhs, const SessionTxnRecord& rhs) (lhs.getTxnNum() == rhs.getTxnNum() && lhs.getLastWriteOpTime() > rhs.getLastWriteOpTime()); } -SessionTxnRecord makeSessionTxnRecord(LogicalSessionId lsid, TxnNumber txnNum, repl::OpTime opTime); - } // namespace mongo |