diff options
author | Sulabh Mahajan <sulabh.mahajan@mongodb.com> | 2018-08-07 10:53:58 +1000 |
---|---|---|
committer | Sulabh Mahajan <sulabh.mahajan@mongodb.com> | 2018-08-07 10:53:58 +1000 |
commit | a37441baa3d5d3dc6f70c7b0d7825f9dbbc90b3f (patch) | |
tree | 079ae902372b777a4293283f0b171f0ec1e69e5f | |
parent | f848cceb5f9ae4e369ad6577987d354361773a96 (diff) | |
download | mongo-a37441baa3d5d3dc6f70c7b0d7825f9dbbc90b3f.tar.gz |
SERVER-35473 Fix writeconflict issues with cursor restarts
(cherry picked from commit b807f350638757c8833d82fec14a90d7b3a03051)
-rw-r--r-- | etc/evergreen.yml | 8 | ||||
-rw-r--r-- | jstests/concurrency/fsm_workloads/compact.js | 2 | ||||
-rw-r--r-- | jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 83 | ||||
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_index.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_record_store.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_recovery_unit.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_sqlite_statement.h | 11 |
9 files changed, 114 insertions, 60 deletions
diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 3ade60be4e0..f0f52c8f8bb 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -4483,7 +4483,7 @@ tasks: - func: "do setup" - func: "run tests" vars: - resmoke_args: --suites=core --mongod=./mongoed --excludeWithAnyTags=requires_scripting,requires_auth,requires_sharding,does_not_support_stepdowns,requires_eval_command,requires_background_index,incompatible_with_embedded,incompatible_with_embedded_todo_investigate,requires_replication,requires_capped,requires_profiling + resmoke_args: --suites=core --mongod=./mongoed --excludeWithAnyTags=requires_scripting,requires_auth,requires_sharding,does_not_support_stepdowns,requires_eval_command,requires_background_index,incompatible_with_embedded,incompatible_with_embedded_todo_investigate,requires_replication,requires_capped,requires_profiling,requires_compact run_multiple_jobs: true - <<: *task_template @@ -12074,7 +12074,7 @@ buildvariants: # mobile storage engine. test_flags: >- --storageEngine=mobile - --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling + --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact --excludeWithAnyTags=SERVER-32709,SERVER-32869,SERVER-32993 compile_flags: >- -j$(grep -c ^processor /proc/cpuinfo) @@ -12132,7 +12132,7 @@ buildvariants: expansions: test_flags: >- --storageEngine=mobile - --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling + --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact --excludeWithAnyTags=SERVER-32709,SERVER-32869,SERVER-32993 compile_flags: >- -j$(grep -c ^processor /proc/cpuinfo) @@ -12216,7 +12216,7 @@ buildvariants: # mobile storage engine. test_flags: >- --storageEngine=mobile - --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling + --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact --excludeWithAnyTags=SERVER-32709,SERVER-32869,SERVER-32993 compile_env: DEVELOPER_DIR=/Applications/Xcode8.3.app compile_flags: >- diff --git a/jstests/concurrency/fsm_workloads/compact.js b/jstests/concurrency/fsm_workloads/compact.js index 99017b136d1..bf38ec947c2 100644 --- a/jstests/concurrency/fsm_workloads/compact.js +++ b/jstests/concurrency/fsm_workloads/compact.js @@ -11,7 +11,7 @@ * with wiredTiger LSM variants. Bypass this command for the wiredTiger LSM variant * until a fix is available for WT-2523. * - * @tags: [does_not_support_wiredtiger_lsm] + * @tags: [does_not_support_wiredtiger_lsm, requires_compact] */ load('jstests/concurrency/fsm_workload_helpers/server_types.js'); // for isEphemeral diff --git a/jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js b/jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js index b1991cd3b8c..07c89bfc20d 100644 --- a/jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js +++ b/jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js @@ -6,6 +6,8 @@ * Bulk inserts 1000 documents and builds indexes. Then alternates between compacting the * collection and verifying the number of documents and indexes. Operates on a single collection * for all threads. Uses paddingBytes as a parameter for compact. + * + * @tags: [requires_compact] */ load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 4f69f3f5545..63d9bf89373 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1138,53 +1138,62 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold Collection* coll = getCollectionOrUassert(opCtx, ctx->getDb(), _config.incLong); invariant(coll); - auto exec = uassertStatusOK(getExecutor( - _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN)); - - // Make sure the PlanExecutor is destroyed while holding a collection lock. - ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] { - if (!ctx) { - AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS); - exec.reset(); - } - }); + // The following anonymous block makes sure to destroy the executor prior to the + // finalReduce(all) call. This is important to clear the cursors being held by the + // storage engine. + { + auto exec = uassertStatusOK(getExecutor(_opCtx, + coll, + std::move(cq), + PlanExecutor::YIELD_AUTO, + QueryPlannerParams::NO_TABLE_SCAN)); + + // Make sure the PlanExecutor is destroyed while holding a collection lock. + ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] { + if (!ctx) { + AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS); + exec.reset(); + } + }); - // iterate over all sorted objects - BSONObj o; - PlanExecutor::ExecState state; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, NULL))) { - o = o.getOwned(); // we will be accessing outside of the lock - pm.hit(); + // iterate over all sorted objects + BSONObj o; + PlanExecutor::ExecState state; + while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, NULL))) { + o = o.getOwned(); // we will be accessing outside of the lock + pm.hit(); - if (dps::compareObjectsAccordingToSort(o, prev, sortKey) == 0) { - // object is same as previous, add to array - all.push_back(o); - if (pm->hits() % 100 == 0) { - _opCtx->checkForInterrupt(); + if (dps::compareObjectsAccordingToSort(o, prev, sortKey) == 0) { + // object is same as previous, add to array + all.push_back(o); + if (pm->hits() % 100 == 0) { + _opCtx->checkForInterrupt(); + } + continue; } - continue; - } - exec->saveState(); + exec->saveState(); - ctx.reset(); + ctx.reset(); - // reduce a finalize array - finalReduce(all); + // reduce a finalize array + finalReduce(all); - ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong)); + ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong)); - all.clear(); - prev = o; - all.push_back(o); + all.clear(); + prev = o; + all.push_back(o); - _opCtx->checkForInterrupt(); - uassertStatusOK(exec->restoreState()); - } + _opCtx->checkForInterrupt(); + uassertStatusOK(exec->restoreState()); + } - uassert(34428, - "Plan executor error during mapReduce command: " + WorkingSetCommon::toStatusString(o), - PlanExecutor::IS_EOF == state); + uassert(34428, + "Plan executor error during mapReduce command: " + + WorkingSetCommon::toStatusString(o), + PlanExecutor::IS_EOF == state); + } ctx.reset(); // reduce and finalize last array diff --git a/src/mongo/db/storage/mobile/mobile_index.cpp b/src/mongo/db/storage/mobile/mobile_index.cpp index 416d39a0027..bfd190cb4fd 100644 --- a/src/mongo/db/storage/mobile/mobile_index.cpp +++ b/src/mongo/db/storage/mobile/mobile_index.cpp @@ -466,7 +466,9 @@ public: // All work is done in restore(). void save() override { - _resetStatement(); + // SQLite acquires implicit locks over the snapshot this cursor is using. It is important + // to finalize the corresponding statement to release these locks. + _stmt->finalize(); } void saveUnpositioned() override { @@ -478,6 +480,12 @@ public: return; } + // Obtaining a session starts a read transaction if not done already. + MobileSession* session = MobileRecoveryUnit::get(_opCtx)->getSession(_opCtx); + // save() finalized this cursor's SQLite statement. We need to prepare a new statement, + // before re-positioning it at the saved state. + _stmt->prepare(*session); + _startPosition.resetFromBuffer(_savedKey.getBuffer(), _savedKey.getSize()); bool isExactMatch = _doSeek(); diff --git a/src/mongo/db/storage/mobile/mobile_record_store.cpp b/src/mongo/db/storage/mobile/mobile_record_store.cpp index e016b604b7e..99b2b045a42 100644 --- a/src/mongo/db/storage/mobile/mobile_record_store.cpp +++ b/src/mongo/db/storage/mobile/mobile_record_store.cpp @@ -116,7 +116,9 @@ public: } void save() final { - _resetStatement(); + // SQLite acquires implicit locks over the snapshot this cursor is using. It is important + // to finalize the corresponding statement to release these locks. + _stmt->finalize(); } void saveUnpositioned() final { @@ -129,7 +131,12 @@ public: return true; } - _resetStatement(); + // Obtaining a session starts a read transaction if not done already. + MobileSession* session = MobileRecoveryUnit::get(_opCtx)->getSession(_opCtx); + // save() finalized this cursor's SQLite statement. We need to prepare a new statement, + // before re-positioning it at the saved state. + _stmt->prepare(*session); + _stmt->bindInt(0, _savedId.repr()); return true; } diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp index a36a7092f9c..8bc16acb377 100644 --- a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp +++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp @@ -51,13 +51,13 @@ MobileRecoveryUnit::MobileRecoveryUnit(MobileSessionPool* sessionPool) // Increment the global instance count and assign this instance an id. _id = _nextID.addAndFetch(1); - RECOVERY_UNIT_TRACE() << " Created."; + RECOVERY_UNIT_TRACE() << "Created."; } MobileRecoveryUnit::~MobileRecoveryUnit() { invariant(!_inUnitOfWork); _abort(); - RECOVERY_UNIT_TRACE() << " Destroyed."; + RECOVERY_UNIT_TRACE() << "Destroyed."; } void MobileRecoveryUnit::_commit() { @@ -94,7 +94,7 @@ void MobileRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { invariant(!_areWriteUnitOfWorksBanned); invariant(!_inUnitOfWork); - RECOVERY_UNIT_TRACE() << " Unit of work Active."; + RECOVERY_UNIT_TRACE() << "Unit of work Active."; if (_active) { // Confirm a write transaction is not running @@ -110,7 +110,7 @@ void MobileRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { void MobileRecoveryUnit::commitUnitOfWork() { invariant(_inUnitOfWork); - RECOVERY_UNIT_TRACE() << " Unit of work commited, marked inactive."; + RECOVERY_UNIT_TRACE() << "Unit of work commited, marked inactive."; _inUnitOfWork = false; _commit(); @@ -119,7 +119,7 @@ void MobileRecoveryUnit::commitUnitOfWork() { void MobileRecoveryUnit::abortUnitOfWork() { invariant(_inUnitOfWork); - RECOVERY_UNIT_TRACE() << " Unit of work aborted, marked inactive."; + RECOVERY_UNIT_TRACE() << "Unit of work aborted, marked inactive."; _inUnitOfWork = false; _abort(); @@ -140,7 +140,7 @@ void MobileRecoveryUnit::registerChange(Change* change) { } MobileSession* MobileRecoveryUnit::getSession(OperationContext* opCtx, bool readOnly) { - RECOVERY_UNIT_TRACE() << " getSession called with readOnly:" << (readOnly ? "TRUE" : "FALSE"); + RECOVERY_UNIT_TRACE() << "getSession called with readOnly:" << (readOnly ? "TRUE" : "FALSE"); invariant(_inUnitOfWork || readOnly); if (!_active) { @@ -160,7 +160,7 @@ void MobileRecoveryUnit::assertInActiveTxn() const { } void MobileRecoveryUnit::_ensureSession(OperationContext* opCtx) { - RECOVERY_UNIT_TRACE() << " Creating new session:" << (_session ? "NO" : "YES"); + RECOVERY_UNIT_TRACE() << "Creating new session:" << (_session ? "NO" : "YES"); if (!_session) { _session = _sessionPool->getSession(opCtx); } @@ -168,7 +168,7 @@ void MobileRecoveryUnit::_ensureSession(OperationContext* opCtx) { void MobileRecoveryUnit::_txnOpen(OperationContext* opCtx, bool readOnly) { invariant(!_active); - RECOVERY_UNIT_TRACE() << " _txnOpen called with readOnly:" << (readOnly ? "TRUE" : "FALSE"); + RECOVERY_UNIT_TRACE() << "_txnOpen called with readOnly:" << (readOnly ? "TRUE" : "FALSE"); _ensureSession(opCtx); /* @@ -206,7 +206,7 @@ void MobileRecoveryUnit::_txnOpen(OperationContext* opCtx, bool readOnly) { void MobileRecoveryUnit::_txnClose(bool commit) { invariant(_active); - RECOVERY_UNIT_TRACE() << " _txnClose called with " << (commit ? "commit " : "rollback "); + RECOVERY_UNIT_TRACE() << "_txnClose called with " << (commit ? "commit " : "rollback "); if (commit) { SqliteStatement::execQuery(_session.get(), "COMMIT"); diff --git a/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp b/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp index 31601a79207..e5ff02c8b97 100644 --- a/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp +++ b/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp @@ -49,15 +49,33 @@ AtomicInt64 SqliteStatement::_nextID(0); SqliteStatement::SqliteStatement(const MobileSession& session, const std::string& sqlQuery) { // Increment the global instance count and assign this instance an id. _id = _nextID.addAndFetch(1); - SQLITE_STMT_TRACE() << "Preparing: " << sqlQuery; + _sqlQuery = sqlQuery; + + prepare(session); +} + +void SqliteStatement::finalize() { + if (!_stmt) { + return; + } + SQLITE_STMT_TRACE() << "Finalize: " << _sqlQuery; + + int status = sqlite3_finalize(_stmt); + fassert(37053, status == _exceptionStatus); + _stmt = NULL; +} + +void SqliteStatement::prepare(const MobileSession& session) { + SQLITE_STMT_TRACE() << "Preparing: " << _sqlQuery; + int status = sqlite3_prepare_v2( - session.getSession(), sqlQuery.c_str(), sqlQuery.length() + 1, &_stmt, NULL); + session.getSession(), _sqlQuery.c_str(), _sqlQuery.length() + 1, &_stmt, NULL); if (status == SQLITE_BUSY) { SQLITE_STMT_TRACE() << "Throwing writeConflictException, " - << "SQLITE_BUSY while preparing: " << sqlQuery; + << "SQLITE_BUSY while preparing: " << _sqlQuery; throw WriteConflictException(); } else if (status != SQLITE_OK) { - SQLITE_STMT_TRACE() << "Error while preparing: " << sqlQuery; + SQLITE_STMT_TRACE() << "Error while preparing: " << _sqlQuery; std::string errMsg = "sqlite3_prepare_v2 failed: "; errMsg += sqlite3_errstr(status); uasserted(ErrorCodes::UnknownError, errMsg); @@ -65,8 +83,7 @@ SqliteStatement::SqliteStatement(const MobileSession& session, const std::string } SqliteStatement::~SqliteStatement() { - int status = sqlite3_finalize(_stmt); - fassert(37053, status == _exceptionStatus); + finalize(); } void SqliteStatement::bindInt(int paramIndex, int64_t intValue) { diff --git a/src/mongo/db/storage/mobile/mobile_sqlite_statement.h b/src/mongo/db/storage/mobile/mobile_sqlite_statement.h index 095e4c94205..096d9978a12 100644 --- a/src/mongo/db/storage/mobile/mobile_sqlite_statement.h +++ b/src/mongo/db/storage/mobile/mobile_sqlite_statement.h @@ -116,11 +116,22 @@ public: */ static void execQuery(MobileSession* session, const std::string& query); + /** + * Finalizes a prepared statement. + */ + void finalize(); + + /** + * Prepare a statement with the given mobile session. + */ + void prepare(const MobileSession& session); + uint64_t _id; private: static AtomicInt64 _nextID; sqlite3_stmt* _stmt; + std::string _sqlQuery; // If the most recent call to sqlite3_step on this statement returned an error, the error is // returned again when the statement is finalized. This is used to verify that the last error |