diff options
author | Sulabh Mahajan <sulabh.mahajan@mongodb.com> | 2018-07-29 16:48:48 +1000 |
---|---|---|
committer | Sulabh Mahajan <sulabh.mahajan@mongodb.com> | 2018-07-29 16:48:48 +1000 |
commit | b807f350638757c8833d82fec14a90d7b3a03051 (patch) | |
tree | a91eeb824d4d90a578bf05af5103c2c29389246c /src/mongo/db/storage/mobile | |
parent | afca554216dfa32cbf92374a828451cdb1e04b8a (diff) | |
download | mongo-b807f350638757c8833d82fec14a90d7b3a03051.tar.gz |
SERVER-35473 Fix writeconflict issues with cursor restarts
Diffstat (limited to 'src/mongo/db/storage/mobile')
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_index.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_record_store.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_recovery_unit.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/storage/mobile/mobile_sqlite_statement.h | 11 |
5 files changed, 61 insertions, 18 deletions
diff --git a/src/mongo/db/storage/mobile/mobile_index.cpp b/src/mongo/db/storage/mobile/mobile_index.cpp index 416d39a0027..bfd190cb4fd 100644 --- a/src/mongo/db/storage/mobile/mobile_index.cpp +++ b/src/mongo/db/storage/mobile/mobile_index.cpp @@ -466,7 +466,9 @@ public: // All work is done in restore(). void save() override { - _resetStatement(); + // SQLite acquires implicit locks over the snapshot this cursor is using. It is important + // to finalize the corresponding statement to release these locks. + _stmt->finalize(); } void saveUnpositioned() override { @@ -478,6 +480,12 @@ public: return; } + // Obtaining a session starts a read transaction if not done already. + MobileSession* session = MobileRecoveryUnit::get(_opCtx)->getSession(_opCtx); + // save() finalized this cursor's SQLite statement. We need to prepare a new statement, + // before re-positioning it at the saved state. + _stmt->prepare(*session); + _startPosition.resetFromBuffer(_savedKey.getBuffer(), _savedKey.getSize()); bool isExactMatch = _doSeek(); diff --git a/src/mongo/db/storage/mobile/mobile_record_store.cpp b/src/mongo/db/storage/mobile/mobile_record_store.cpp index fd307aa790a..0111abb67e3 100644 --- a/src/mongo/db/storage/mobile/mobile_record_store.cpp +++ b/src/mongo/db/storage/mobile/mobile_record_store.cpp @@ -116,7 +116,9 @@ public: } void save() final { - _resetStatement(); + // SQLite acquires implicit locks over the snapshot this cursor is using. It is important + // to finalize the corresponding statement to release these locks. + _stmt->finalize(); } void saveUnpositioned() final { @@ -129,7 +131,12 @@ public: return true; } - _resetStatement(); + // Obtaining a session starts a read transaction if not done already. + MobileSession* session = MobileRecoveryUnit::get(_opCtx)->getSession(_opCtx); + // save() finalized this cursor's SQLite statement. We need to prepare a new statement, + // before re-positioning it at the saved state. + _stmt->prepare(*session); + _stmt->bindInt(0, _savedId.repr()); return true; } diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp index a36a7092f9c..8bc16acb377 100644 --- a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp +++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp @@ -51,13 +51,13 @@ MobileRecoveryUnit::MobileRecoveryUnit(MobileSessionPool* sessionPool) // Increment the global instance count and assign this instance an id. _id = _nextID.addAndFetch(1); - RECOVERY_UNIT_TRACE() << " Created."; + RECOVERY_UNIT_TRACE() << "Created."; } MobileRecoveryUnit::~MobileRecoveryUnit() { invariant(!_inUnitOfWork); _abort(); - RECOVERY_UNIT_TRACE() << " Destroyed."; + RECOVERY_UNIT_TRACE() << "Destroyed."; } void MobileRecoveryUnit::_commit() { @@ -94,7 +94,7 @@ void MobileRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { invariant(!_areWriteUnitOfWorksBanned); invariant(!_inUnitOfWork); - RECOVERY_UNIT_TRACE() << " Unit of work Active."; + RECOVERY_UNIT_TRACE() << "Unit of work Active."; if (_active) { // Confirm a write transaction is not running @@ -110,7 +110,7 @@ void MobileRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { void MobileRecoveryUnit::commitUnitOfWork() { invariant(_inUnitOfWork); - RECOVERY_UNIT_TRACE() << " Unit of work commited, marked inactive."; + RECOVERY_UNIT_TRACE() << "Unit of work commited, marked inactive."; _inUnitOfWork = false; _commit(); @@ -119,7 +119,7 @@ void MobileRecoveryUnit::commitUnitOfWork() { void MobileRecoveryUnit::abortUnitOfWork() { invariant(_inUnitOfWork); - RECOVERY_UNIT_TRACE() << " Unit of work aborted, marked inactive."; + RECOVERY_UNIT_TRACE() << "Unit of work aborted, marked inactive."; _inUnitOfWork = false; _abort(); @@ -140,7 +140,7 @@ void MobileRecoveryUnit::registerChange(Change* change) { } MobileSession* MobileRecoveryUnit::getSession(OperationContext* opCtx, bool readOnly) { - RECOVERY_UNIT_TRACE() << " getSession called with readOnly:" << (readOnly ? "TRUE" : "FALSE"); + RECOVERY_UNIT_TRACE() << "getSession called with readOnly:" << (readOnly ? "TRUE" : "FALSE"); invariant(_inUnitOfWork || readOnly); if (!_active) { @@ -160,7 +160,7 @@ void MobileRecoveryUnit::assertInActiveTxn() const { } void MobileRecoveryUnit::_ensureSession(OperationContext* opCtx) { - RECOVERY_UNIT_TRACE() << " Creating new session:" << (_session ? "NO" : "YES"); + RECOVERY_UNIT_TRACE() << "Creating new session:" << (_session ? "NO" : "YES"); if (!_session) { _session = _sessionPool->getSession(opCtx); } @@ -168,7 +168,7 @@ void MobileRecoveryUnit::_ensureSession(OperationContext* opCtx) { void MobileRecoveryUnit::_txnOpen(OperationContext* opCtx, bool readOnly) { invariant(!_active); - RECOVERY_UNIT_TRACE() << " _txnOpen called with readOnly:" << (readOnly ? "TRUE" : "FALSE"); + RECOVERY_UNIT_TRACE() << "_txnOpen called with readOnly:" << (readOnly ? "TRUE" : "FALSE"); _ensureSession(opCtx); /* @@ -206,7 +206,7 @@ void MobileRecoveryUnit::_txnOpen(OperationContext* opCtx, bool readOnly) { void MobileRecoveryUnit::_txnClose(bool commit) { invariant(_active); - RECOVERY_UNIT_TRACE() << " _txnClose called with " << (commit ? "commit " : "rollback "); + RECOVERY_UNIT_TRACE() << "_txnClose called with " << (commit ? "commit " : "rollback "); if (commit) { SqliteStatement::execQuery(_session.get(), "COMMIT"); diff --git a/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp b/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp index 31601a79207..e5ff02c8b97 100644 --- a/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp +++ b/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp @@ -49,15 +49,33 @@ AtomicInt64 SqliteStatement::_nextID(0); SqliteStatement::SqliteStatement(const MobileSession& session, const std::string& sqlQuery) { // Increment the global instance count and assign this instance an id. _id = _nextID.addAndFetch(1); - SQLITE_STMT_TRACE() << "Preparing: " << sqlQuery; + _sqlQuery = sqlQuery; + + prepare(session); +} + +void SqliteStatement::finalize() { + if (!_stmt) { + return; + } + SQLITE_STMT_TRACE() << "Finalize: " << _sqlQuery; + + int status = sqlite3_finalize(_stmt); + fassert(37053, status == _exceptionStatus); + _stmt = NULL; +} + +void SqliteStatement::prepare(const MobileSession& session) { + SQLITE_STMT_TRACE() << "Preparing: " << _sqlQuery; + int status = sqlite3_prepare_v2( - session.getSession(), sqlQuery.c_str(), sqlQuery.length() + 1, &_stmt, NULL); + session.getSession(), _sqlQuery.c_str(), _sqlQuery.length() + 1, &_stmt, NULL); if (status == SQLITE_BUSY) { SQLITE_STMT_TRACE() << "Throwing writeConflictException, " - << "SQLITE_BUSY while preparing: " << sqlQuery; + << "SQLITE_BUSY while preparing: " << _sqlQuery; throw WriteConflictException(); } else if (status != SQLITE_OK) { - SQLITE_STMT_TRACE() << "Error while preparing: " << sqlQuery; + SQLITE_STMT_TRACE() << "Error while preparing: " << _sqlQuery; std::string errMsg = "sqlite3_prepare_v2 failed: "; errMsg += sqlite3_errstr(status); uasserted(ErrorCodes::UnknownError, errMsg); @@ -65,8 +83,7 @@ SqliteStatement::SqliteStatement(const MobileSession& session, const std::string } SqliteStatement::~SqliteStatement() { - int status = sqlite3_finalize(_stmt); - fassert(37053, status == _exceptionStatus); + finalize(); } void SqliteStatement::bindInt(int paramIndex, int64_t intValue) { diff --git a/src/mongo/db/storage/mobile/mobile_sqlite_statement.h b/src/mongo/db/storage/mobile/mobile_sqlite_statement.h index 095e4c94205..096d9978a12 100644 --- a/src/mongo/db/storage/mobile/mobile_sqlite_statement.h +++ b/src/mongo/db/storage/mobile/mobile_sqlite_statement.h @@ -116,11 +116,22 @@ public: */ static void execQuery(MobileSession* session, const std::string& query); + /** + * Finalizes a prepared statement. + */ + void finalize(); + + /** + * Prepare a statement with the given mobile session. + */ + void prepare(const MobileSession& session); + uint64_t _id; private: static AtomicInt64 _nextID; sqlite3_stmt* _stmt; + std::string _sqlQuery; // If the most recent call to sqlite3_step on this statement returned an error, the error is // returned again when the statement is finalized. This is used to verify that the last error |