summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaria van Keulen <maria.vankeulen@mongodb.com>2020-01-14 21:32:54 +0000
committerevergreen <evergreen@mongodb.com>2020-01-14 21:32:54 +0000
commit846c7aa84ac08dbccc7d727e9068406b7b2de033 (patch)
treeb1b1b2c3e8ca99eb9b594e9ab1724d116eead4f3
parentc04fde3eca954d9bf9243d544484a90997ba9373 (diff)
downloadmongo-846c7aa84ac08dbccc7d727e9068406b7b2de033.tar.gz
SERVER-44852 Handle transactions with commands during oplog application
-rw-r--r--buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn.yml21
-rw-r--r--buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn_ubsan.yml21
-rw-r--r--buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_large_txns_format_jscore_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_txns_passthrough.yml5
-rw-r--r--buildscripts/resmokeconfig/suites/sharded_collections_causally_consistent_jscore_txns_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/sharded_jscore_txns.yml4
-rw-r--r--buildscripts/resmokeconfig/suites/sharded_jscore_txns_sharded_collections.yml3
-rw-r--r--src/mongo/db/repl/SConscript3
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp66
-rw-r--r--src/mongo/db/repl/oplog_batcher.cpp29
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp22
-rw-r--r--src/mongo/db/repl/oplog_entry.h6
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp39
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.h16
22 files changed, 149 insertions, 116 deletions
diff --git a/buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn.yml b/buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn.yml
index a6d0f64a8e2..4906056a9b7 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn.yml
@@ -29,27 +29,6 @@ selector:
# Extracts error code from write error, which is obscured by runInsideTransaction.
- jstests/concurrency/fsm_workloads/access_collection_in_transaction_after_catalog_changes.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/concurrency/fsm_workloads/create_database.js
- - jstests/concurrency/fsm_workloads/create_capped_collection.js
- - jstests/concurrency/fsm_workloads/create_capped_collection_maxdocs.js
- - jstests/concurrency/fsm_workloads/create_collection.js
- - jstests/concurrency/fsm_workloads/drop_collection.js
- - jstests/concurrency/fsm_workloads/drop_database.js
- - jstests/concurrency/fsm_workloads/indexed_insert_base_capped.js
- - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
- - jstests/concurrency/fsm_workloads/map_reduce_replace.js
- - jstests/concurrency/fsm_workloads/rename_capped_collection_chain.js
- - jstests/concurrency/fsm_workloads/rename_capped_collection_dbname_chain.js
- - jstests/concurrency/fsm_workloads/rename_capped_collection_dbname_droptarget.js
- - jstests/concurrency/fsm_workloads/rename_capped_collection_droptarget.js
- - jstests/concurrency/fsm_workloads/rename_collection_chain.js
- - jstests/concurrency/fsm_workloads/rename_collection_dbname_chain.js
- - jstests/concurrency/fsm_workloads/rename_collection_dbname_droptarget.js
- - jstests/concurrency/fsm_workloads/rename_collection_droptarget.js
- - jstests/concurrency/fsm_workloads/update_inc_capped.js
-
exclude_with_any_tags:
- does_not_support_causal_consistency
- requires_sharding
diff --git a/buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn_ubsan.yml b/buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn_ubsan.yml
index e818c9cb040..dc847ce498d 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn_ubsan.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_replication_multi_stmt_txn_ubsan.yml
@@ -29,27 +29,6 @@ selector:
# Extracts error code from write error, which is obscured by runInsideTransaction.
- jstests/concurrency/fsm_workloads/access_collection_in_transaction_after_catalog_changes.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/concurrency/fsm_workloads/create_database.js
- - jstests/concurrency/fsm_workloads/create_capped_collection.js
- - jstests/concurrency/fsm_workloads/create_capped_collection_maxdocs.js
- - jstests/concurrency/fsm_workloads/create_collection.js
- - jstests/concurrency/fsm_workloads/drop_collection.js
- - jstests/concurrency/fsm_workloads/drop_database.js
- - jstests/concurrency/fsm_workloads/indexed_insert_base_capped.js
- - jstests/concurrency/fsm_workloads/map_reduce_reduce.js
- - jstests/concurrency/fsm_workloads/map_reduce_replace.js
- - jstests/concurrency/fsm_workloads/rename_capped_collection_chain.js
- - jstests/concurrency/fsm_workloads/rename_capped_collection_dbname_chain.js
- - jstests/concurrency/fsm_workloads/rename_capped_collection_dbname_droptarget.js
- - jstests/concurrency/fsm_workloads/rename_capped_collection_droptarget.js
- - jstests/concurrency/fsm_workloads/rename_collection_chain.js
- - jstests/concurrency/fsm_workloads/rename_collection_dbname_chain.js
- - jstests/concurrency/fsm_workloads/rename_collection_dbname_droptarget.js
- - jstests/concurrency/fsm_workloads/rename_collection_droptarget.js
- - jstests/concurrency/fsm_workloads/update_inc_capped.js
-
exclude_with_any_tags:
- does_not_support_causal_consistency
- requires_sharding
diff --git a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml
index 85fcca785f2..5a57622d8de 100644
--- a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml
@@ -24,9 +24,6 @@ selector:
# time, the logical session cache refresh thread will flush these sessions to disk, creating more
# opLog entries. To avoid this infinite loop, we will blacklist the test from this suite.
- jstests/core/awaitdata_getmore_cmd.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/core/txns/create_collection.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml
index 12b11fade4c..adb75e752fe 100644
--- a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml
@@ -24,9 +24,6 @@ selector:
# time, the logical session cache refresh thread will flush these sessions to disk, creating more
# opLog entries. To avoid this infinite loop, we will blacklist the test from this suite.
- jstests/core/awaitdata_getmore_cmd.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/core/txns/create_collection.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml
index 1cf5eed6cda..88631341ded 100644
--- a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml
@@ -24,9 +24,6 @@ selector:
# time, the logical session cache refresh thread will flush these sessions to disk, creating more
# opLog entries. To avoid this infinite loop, we will blacklist the test from this suite.
- jstests/core/awaitdata_getmore_cmd.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/core/txns/create_collection.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml
index c398bdd7e06..af137e2c3ca 100644
--- a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml
@@ -24,9 +24,6 @@ selector:
# time, the logical session cache refresh thread will flush these sessions to disk, creating more
# opLog entries. To prevent this infinite loop, we will blacklist the test from this suite.
- jstests/core/awaitdata_getmore_cmd.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/core/txns/create_collection.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
index 19f89a09a42..c68ae4dcb48 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
@@ -36,9 +36,6 @@ selector:
- jstests/core/txns/transactions_profiling.js
# The downstream syncing node affects the top output.
- jstests/core/top.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/core/txns/create_collection.js
run_hook_interval: &run_hook_interval 20
executor:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
index 73b1371baa0..388ef3feb0b 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
@@ -11,9 +11,6 @@ selector:
- jstests/core/capped_update.js
# Having duplicate namespaces is not supported and will cause initial sync to fail.
- jstests/core/views/duplicate_ns.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/core/txns/create_collection.js
run_hook_interval: &run_hook_interval 20
executor:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
index 86fa311169f..107ce408367 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
@@ -19,9 +19,6 @@ selector:
# command multiple times, which may observe the change to the "transactionLifetimeLimitSeconds"
# server parameter.
- jstests/core/set_param1.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/core/txns/create_collection.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
index 03ceae267bc..c3e07f9ade0 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
@@ -24,9 +24,6 @@ selector:
- jstests/core/sortb.js
- jstests/core/sortg.js
- jstests/core/sortj.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/core/txns/create_collection.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_large_txns_format_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_large_txns_format_jscore_passthrough.yml
index 3fe95d2df86..00a177cc35e 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_large_txns_format_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_large_txns_format_jscore_passthrough.yml
@@ -12,9 +12,6 @@ selector:
- jstests/core/txns/abort_expired_transaction.js
- jstests/core/txns/abort_transaction_thread_does_not_block_on_locks.js
- jstests/core/txns/kill_op_on_txn_expiry.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
- - jstests/core/txns/create_collection.js
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_txns_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_txns_passthrough.yml
index fc7e289051b..0951c6549a7 100644
--- a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_txns_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_txns_passthrough.yml
@@ -14,7 +14,7 @@ selector:
# No featureCompatibilityVersion parameter on mongos.
- jstests/core/txns/abort_unprepared_transactions_on_FCV_downgrade.js
- # TODO(SERVER-44852) Implicitly creates a database through a collection rename, which does not
+ # Implicitly creates a database through a collection rename, which does not
# work in a sharded cluster.
- jstests/core/txns/transactions_block_ddl.js
@@ -30,8 +30,7 @@ selector:
# attached to statements in a transaction beyond the first one.
- jstests/core/txns/non_transactional_operations_on_session_with_transaction.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
+ # TODO(SERVER-45368) re-enable once collection creation is permitted in cross-shard transactions.
- jstests/core/txns/create_collection.js
exclude_with_any_tags:
diff --git a/buildscripts/resmokeconfig/suites/sharded_collections_causally_consistent_jscore_txns_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_collections_causally_consistent_jscore_txns_passthrough.yml
index 81ca83d2241..41441209150 100644
--- a/buildscripts/resmokeconfig/suites/sharded_collections_causally_consistent_jscore_txns_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_collections_causally_consistent_jscore_txns_passthrough.yml
@@ -29,8 +29,7 @@ selector:
# attached to statements in a transaction beyond the first one.
- jstests/core/txns/non_transactional_operations_on_session_with_transaction.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
+ # TODO(SERVER-45368) re-enable once collection creation is permitted in cross-shard transactions.
- jstests/core/txns/create_collection.js
exclude_with_any_tags:
diff --git a/buildscripts/resmokeconfig/suites/sharded_jscore_txns.yml b/buildscripts/resmokeconfig/suites/sharded_jscore_txns.yml
index c19f6f442c8..6ef0bc600f9 100644
--- a/buildscripts/resmokeconfig/suites/sharded_jscore_txns.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_jscore_txns.yml
@@ -26,10 +26,10 @@ selector:
# Uses hangAfterCollectionInserts failpoint not available on mongos.
- jstests/core/txns/speculative_snapshot_includes_all_writes.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
+ # TODO(SERVER-45368) re-enable once collection creation is permitted in cross-shard transactions.
- jstests/core/txns/create_collection.js
+
exclude_with_any_tags:
- assumes_against_mongod_not_mongos
# Transactions are not allowed to operate on capped collections.
diff --git a/buildscripts/resmokeconfig/suites/sharded_jscore_txns_sharded_collections.yml b/buildscripts/resmokeconfig/suites/sharded_jscore_txns_sharded_collections.yml
index 77667a4a8a1..b1334268870 100644
--- a/buildscripts/resmokeconfig/suites/sharded_jscore_txns_sharded_collections.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_jscore_txns_sharded_collections.yml
@@ -25,8 +25,7 @@ selector:
# View tests aren't expected to work when collections are implicitly sharded.
- jstests/core/txns/view_reads_in_transaction.js
- # TODO(SERVER-44852) re-enable once secondary oplog application supports collection creation
- # inside multi-document transactions.
+ # TODO(SERVER-45368) re-enable once collection creation is permitted in cross-shard transactions.
- jstests/core/txns/create_collection.js
exclude_with_any_tags:
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c33664e29be..35882e4acd3 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -491,6 +491,9 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/collection_options',
'$BUILD_DIR/mongo/idl/idl_parser',
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/namespace_string',
+ ],
)
env.Library(
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index ab42b32502a..c0a9f7684d1 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -220,21 +220,54 @@ void addToWriterVector(OplogEntry* op,
/**
* Adds a set of derivedOps to writerVectors.
+ * If `serial` is true, assign all derived operations to the writer vector corresponding to the hash
+ * of the first operation in `derivedOps`.
*/
void addDerivedOps(OperationContext* opCtx,
std::vector<OplogEntry>* derivedOps,
std::vector<std::vector<const OplogEntry*>>* writerVectors,
- CachedCollectionProperties* collPropertiesCache) {
+ CachedCollectionProperties* collPropertiesCache,
+ bool serial) {
+
+ boost::optional<uint32_t>
+ serialWriterId; // Used to determine which writer vector to assign serial ops.
+
for (auto&& op : *derivedOps) {
auto hashedNs = StringMapHasher().hashed_key(op.getNss().ns());
uint32_t hash = static_cast<uint32_t>(hashedNs.hash());
+ if (!serialWriterId && serial) {
+ serialWriterId.emplace(hash);
+ }
if (op.isCrudOpType()) {
processCrudOp(opCtx, &op, &hash, &hashedNs, collPropertiesCache);
}
- addToWriterVector(&op, writerVectors, hash);
+ if (serial) {
+ // Serial derived ops go to the writer vector corresponding to the first op of
+ // derivedOps.
+ addToWriterVector(&op, writerVectors, serialWriterId.get());
+ } else {
+ addToWriterVector(&op, writerVectors, hash);
+ }
}
}
+void _addOplogChainOpsToWriterVectors(OperationContext* opCtx,
+ std::vector<OplogEntry*>* partialTxnList,
+ std::vector<std::vector<OplogEntry>>* derivedOps,
+ OplogEntry* op,
+ CachedCollectionProperties* collPropertiesCache,
+ std::vector<std::vector<const OplogEntry*>>* writerVectors) {
+ std::vector<OplogEntry> txnOps;
+ bool shouldSerialize = false;
+ std::tie(txnOps, shouldSerialize) =
+ readTransactionOperationsFromOplogChainAndCheckForCommands(opCtx, *op, *partialTxnList);
+ derivedOps->emplace_back(txnOps);
+ partialTxnList->clear();
+
+ // Transaction entries cannot have different session updates.
+ addDerivedOps(opCtx, &derivedOps->back(), writerVectors, collPropertiesCache, shouldSerialize);
+}
+
void stableSortByNamespace(std::vector<const OplogEntry*>* oplogEntryPointers) {
auto nssComparator = [](const OplogEntry* l, const OplogEntry* r) {
return l->getNss() < r->getNss();
@@ -759,7 +792,11 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
if (sessionUpdateTracker) {
if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) {
derivedOps->emplace_back(std::move(*newOplogWrites));
- addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache);
+ addDerivedOps(opCtx,
+ &derivedOps->back(),
+ writerVectors,
+ &collPropertiesCache,
+ false /*serial*/);
}
}
@@ -797,13 +834,8 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
// oplog and fill writers with those operations.
// Flush partialTxnList operations for current transaction.
auto& partialTxnList = partialTxnOps[*logicalSessionId];
-
- derivedOps->emplace_back(
- readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
- partialTxnList.clear();
-
- // Transaction entries cannot have different session updates.
- addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache);
+ _addOplogChainOpsToWriterVectors(
+ opCtx, &partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors);
} else {
// The applyOps entry was not generated as part of a transaction.
invariant(!op.getPrevWriteOpTimeInTransaction());
@@ -811,7 +843,11 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
derivedOps->emplace_back(ApplyOps::extractOperations(op));
// Nested entries cannot have different session updates.
- addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache);
+ addDerivedOps(opCtx,
+ &derivedOps->back(),
+ writerVectors,
+ &collPropertiesCache,
+ false /*serial*/);
}
continue;
}
@@ -822,12 +858,8 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
if (op.isPreparedCommit() && (getOptions().mode == OplogApplication::Mode::kInitialSync)) {
auto logicalSessionId = op.getSessionId();
auto& partialTxnList = partialTxnOps[*logicalSessionId];
-
- derivedOps->emplace_back(
- readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
- partialTxnList.clear();
-
- addDerivedOps(opCtx, &derivedOps->back(), writerVectors, &collPropertiesCache);
+ _addOplogChainOpsToWriterVectors(
+ opCtx, &partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors);
continue;
}
diff --git a/src/mongo/db/repl/oplog_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp
index f6389b62bd9..da2f7bf049f 100644
--- a/src/mongo/db/repl/oplog_batcher.cpp
+++ b/src/mongo/db/repl/oplog_batcher.cpp
@@ -103,22 +103,16 @@ bool isUnpreparedCommit(const OplogEntry& entry) {
}
/**
- * Returns whether an oplog entry represents an applyOps which doesn't imply prepare.
- * It could be a partial transaction oplog entry, an implicit commit applyOps or an applyOps outside
- * of transaction.
- */
-bool isUnpreparedApplyOps(const OplogEntry& entry) {
- return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare();
-}
-
-/**
* Returns true if this oplog entry must be processed in its own batch and cannot be grouped with
* other entries.
*
- * Commands must be processed one at a time. The exceptions to this are unprepared applyOps, because
- * applyOps oplog entries are effectively containers for CRUD operations, and unprepared
- * commitTransaction, because that also expands to CRUD operations. Therefore, it is safe to batch
- * applyOps commands with CRUD operations when reading from the oplog buffer.
+ * Commands, in most cases, must be processed one at a time. The exceptions to this rule are
+ * unprepared applyOps and unprepared commitTransaction for transactions that only contain CRUD
+ * operations. These two cases expand to CRUD operations, which can be safely batched with other
+ * CRUD operations. All other command oplog entries, including unprepared applyOps/commitTransaction
+ * for transactions that contain commands, must be processed in their own batch.
+ * Note that 'unprepared applyOps' could mean a partial transaction oplog entry, an implicit commit
+ * applyOps oplog entry, or an atomic applyOps oplog entry outside of a transaction.
*
* Oplog entries on 'system.views' should also be processed one at a time. View catalog immediately
* reflects changes for each oplog entry so we can see inconsistent view catalog if multiple oplog
@@ -129,12 +123,13 @@ bool isUnpreparedApplyOps(const OplogEntry& entry) {
*/
bool mustProcessIndividually(const OplogEntry& entry) {
if (entry.isCommand()) {
- if (isUnpreparedCommit(entry)) {
- return false;
- } else if (isUnpreparedApplyOps(entry)) {
+ if (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare() ||
+ entry.isTransactionWithCommand()) {
+ return true;
+ } else {
+ // This branch covers unprepared CRUD applyOps and unprepared CRUD commits.
return false;
}
- return true;
} else if (entry.getNss().isSystemDotViews()) {
return true;
} else if (entry.getNss().isServerConfigurationCollection()) {
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 787e54f0384..e38732f5df0 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -336,6 +336,28 @@ bool OplogEntry::shouldPrepare() const {
getObject()[ApplyOpsCommandInfoBase::kPrepareFieldName].booleanSafe();
}
+bool OplogEntry::isTransactionWithCommand() const {
+ auto applyOps = getObject().getField("applyOps");
+ if (applyOps.eoo()) {
+ return false;
+ }
+ if (!getTxnNumber() || !getSessionId()) {
+ // Only transactions can produce applyOps oplog entries with transaction numbers and
+ // session IDs.
+ return false;
+ }
+ // Iterating through the entire applyOps array is not optimal for performance. A potential
+ // optimization, if necessary, could be to ensure the primary always constructs applyOps oplog
+ // entries with commands at the beginning.
+ for (BSONElement e : applyOps.Array()) {
+ auto ns = e.Obj().getField("ns");
+ if (!ns.eoo() && NamespaceString(ns.String()).isCommand()) {
+ return true;
+ }
+ }
+ return false;
+}
+
BSONElement OplogEntry::getIdElement() const {
invariant(isCrudOpType());
if (getOpType() == OpTypeEnum::kUpdate) {
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index b1678b37267..4181a30c1ad 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -312,6 +312,12 @@ public:
}
/**
+ * Returns whether the oplog entry represents an applyOps with a commnd inside. This will occur
+ * if a multi-document transaction performs a command.
+ */
+ bool isTransactionWithCommand() const;
+
+ /**
* Returns if the oplog entry is for a CRUD operation.
*/
static bool isCrudOpType(OpTypeEnum opType);
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index 986c5664727..2e31f9dc0e5 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -64,8 +64,9 @@ Status _applyOperationsForTransaction(OperationContext* opCtx,
// Apply each the operations via repl::applyOperation.
for (const auto& op : ops) {
try {
+ Status status = Status::OK();
AutoGetCollection coll(opCtx, op.getNss(), MODE_IX);
- auto status = repl::applyOperation_inlock(
+ status = repl::applyOperation_inlock(
opCtx, coll.getDb(), &op, false /*alwaysUpsert*/, oplogApplicationMode);
if (!status.isOK()) {
return status;
@@ -228,10 +229,12 @@ Status applyAbortTransaction(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-std::vector<OplogEntry> readTransactionOperationsFromOplogChain(
+std::pair<std::vector<OplogEntry>, bool> _readTransactionOperationsFromOplogChain(
OperationContext* opCtx,
const OplogEntry& lastEntryInTxn,
- const std::vector<OplogEntry*>& cachedOps) noexcept {
+ const std::vector<OplogEntry*>& cachedOps,
+ const bool checkForCommands) noexcept {
+ bool isTransactionWithCommand = false;
// Traverse the oplog chain with its own snapshot and read timestamp.
ReadSourceScope readSourceScope(opCtx);
@@ -293,7 +296,35 @@ std::vector<OplogEntry> readTransactionOperationsFromOplogChain(
// Reconstruct the operations from the prepare or unprepared commit oplog entry.
repl::ApplyOps::extractOperationsTo(prepareOrUnpreparedCommit, lastEntryInTxnObj, &ops);
- return ops;
+
+ // It is safe to assume that any commands inside `ops` are real commands to be applied, as
+ // opposed to auxiliary commands such as "commit" and "abort".
+ if (checkForCommands) {
+ for (auto&& op : ops) {
+ if (op.isCommand()) {
+ isTransactionWithCommand = true;
+ break;
+ }
+ }
+ }
+ return std::make_pair(ops, isTransactionWithCommand);
+}
+
+std::vector<OplogEntry> readTransactionOperationsFromOplogChain(
+ OperationContext* opCtx,
+ const OplogEntry& lastEntryInTxn,
+ const std::vector<OplogEntry*>& cachedOps) noexcept {
+ auto result = _readTransactionOperationsFromOplogChain(
+ opCtx, lastEntryInTxn, cachedOps, false /*checkForCommands*/);
+ return std::get<0>(result);
+}
+
+std::pair<std::vector<OplogEntry>, bool> readTransactionOperationsFromOplogChainAndCheckForCommands(
+ OperationContext* opCtx,
+ const OplogEntry& lastEntryInTxn,
+ const std::vector<OplogEntry*>& cachedOps) noexcept {
+ return _readTransactionOperationsFromOplogChain(
+ opCtx, lastEntryInTxn, cachedOps, true /*checkForCommands*/);
}
namespace {
diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h
index add871d6dac..6b108447b00 100644
--- a/src/mongo/db/repl/transaction_oplog_application.h
+++ b/src/mongo/db/repl/transaction_oplog_application.h
@@ -66,6 +66,22 @@ std::vector<repl::OplogEntry> readTransactionOperationsFromOplogChain(
const std::vector<repl::OplogEntry*>& cachedOps) noexcept;
/**
+ * Like readTransactionOperationsFromOplogChain, but also returns a boolean representing whether at
+ * least one of the transaction operations is a command.
+ */
+std::pair<std::vector<repl::OplogEntry>, bool>
+readTransactionOperationsFromOplogChainAndCheckForCommands(
+ OperationContext* opCtx,
+ const repl::OplogEntry& lastEntryInTxn,
+ const std::vector<repl::OplogEntry*>& cachedOps) noexcept;
+
+std::pair<std::vector<repl::OplogEntry>, bool> _readTransactionOperationsFromOplogChain(
+ OperationContext* opCtx,
+ const repl::OplogEntry& lastEntryInTxn,
+ const std::vector<repl::OplogEntry*>& cachedOps,
+ const bool checkForCommands) noexcept;
+
+/**
* Apply `prepareTransaction` oplog entry.
*/
Status applyPrepareTransaction(OperationContext* opCtx,