summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSulabh Mahajan <sulabh.mahajan@mongodb.com>2018-08-07 10:53:58 +1000
committerSulabh Mahajan <sulabh.mahajan@mongodb.com>2018-08-07 10:53:58 +1000
commita37441baa3d5d3dc6f70c7b0d7825f9dbbc90b3f (patch)
tree079ae902372b777a4293283f0b171f0ec1e69e5f
parentf848cceb5f9ae4e369ad6577987d354361773a96 (diff)
downloadmongo-a37441baa3d5d3dc6f70c7b0d7825f9dbbc90b3f.tar.gz
SERVER-35473 Fix writeconflict issues with cursor restarts
(cherry picked from commit b807f350638757c8833d82fec14a90d7b3a03051)
-rw-r--r--etc/evergreen.yml8
-rw-r--r--jstests/concurrency/fsm_workloads/compact.js2
-rw-r--r--jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js2
-rw-r--r--src/mongo/db/commands/mr.cpp83
-rw-r--r--src/mongo/db/storage/mobile/mobile_index.cpp10
-rw-r--r--src/mongo/db/storage/mobile/mobile_record_store.cpp11
-rw-r--r--src/mongo/db/storage/mobile/mobile_recovery_unit.cpp18
-rw-r--r--src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp29
-rw-r--r--src/mongo/db/storage/mobile/mobile_sqlite_statement.h11
9 files changed, 114 insertions, 60 deletions
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 3ade60be4e0..f0f52c8f8bb 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -4483,7 +4483,7 @@ tasks:
- func: "do setup"
- func: "run tests"
vars:
- resmoke_args: --suites=core --mongod=./mongoed --excludeWithAnyTags=requires_scripting,requires_auth,requires_sharding,does_not_support_stepdowns,requires_eval_command,requires_background_index,incompatible_with_embedded,incompatible_with_embedded_todo_investigate,requires_replication,requires_capped,requires_profiling
+ resmoke_args: --suites=core --mongod=./mongoed --excludeWithAnyTags=requires_scripting,requires_auth,requires_sharding,does_not_support_stepdowns,requires_eval_command,requires_background_index,incompatible_with_embedded,incompatible_with_embedded_todo_investigate,requires_replication,requires_capped,requires_profiling,requires_compact
run_multiple_jobs: true
- <<: *task_template
@@ -12074,7 +12074,7 @@ buildvariants:
# mobile storage engine.
test_flags: >-
--storageEngine=mobile
- --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling
+ --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact
--excludeWithAnyTags=SERVER-32709,SERVER-32869,SERVER-32993
compile_flags: >-
-j$(grep -c ^processor /proc/cpuinfo)
@@ -12132,7 +12132,7 @@ buildvariants:
expansions:
test_flags: >-
--storageEngine=mobile
- --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling
+ --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact
--excludeWithAnyTags=SERVER-32709,SERVER-32869,SERVER-32993
compile_flags: >-
-j$(grep -c ^processor /proc/cpuinfo)
@@ -12216,7 +12216,7 @@ buildvariants:
# mobile storage engine.
test_flags: >-
--storageEngine=mobile
- --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling
+ --excludeWithAnyTags=requires_mmapv1,requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact
--excludeWithAnyTags=SERVER-32709,SERVER-32869,SERVER-32993
compile_env: DEVELOPER_DIR=/Applications/Xcode8.3.app
compile_flags: >-
diff --git a/jstests/concurrency/fsm_workloads/compact.js b/jstests/concurrency/fsm_workloads/compact.js
index 99017b136d1..bf38ec947c2 100644
--- a/jstests/concurrency/fsm_workloads/compact.js
+++ b/jstests/concurrency/fsm_workloads/compact.js
@@ -11,7 +11,7 @@
* with wiredTiger LSM variants. Bypass this command for the wiredTiger LSM variant
* until a fix is available for WT-2523.
*
- * @tags: [does_not_support_wiredtiger_lsm]
+ * @tags: [does_not_support_wiredtiger_lsm, requires_compact]
*/
load('jstests/concurrency/fsm_workload_helpers/server_types.js'); // for isEphemeral
diff --git a/jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js b/jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js
index b1991cd3b8c..07c89bfc20d 100644
--- a/jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js
+++ b/jstests/concurrency/fsm_workloads/compact_simultaneous_padding_bytes.js
@@ -6,6 +6,8 @@
* Bulk inserts 1000 documents and builds indexes. Then alternates between compacting the
* collection and verifying the number of documents and indexes. Operates on a single collection
* for all threads. Uses paddingBytes as a parameter for compact.
+ *
+ * @tags: [requires_compact]
*/
load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 4f69f3f5545..63d9bf89373 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1138,53 +1138,62 @@ void State::finalReduce(OperationContext* opCtx, CurOp* curOp, ProgressMeterHold
Collection* coll = getCollectionOrUassert(opCtx, ctx->getDb(), _config.incLong);
invariant(coll);
- auto exec = uassertStatusOK(getExecutor(
- _opCtx, coll, std::move(cq), PlanExecutor::YIELD_AUTO, QueryPlannerParams::NO_TABLE_SCAN));
-
- // Make sure the PlanExecutor is destroyed while holding a collection lock.
- ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] {
- if (!ctx) {
- AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS);
- exec.reset();
- }
- });
+ // The following anonymous block makes sure to destroy the executor prior to the
+ // finalReduce(all) call. This is important to clear the cursors being held by the
+ // storage engine.
+ {
+ auto exec = uassertStatusOK(getExecutor(_opCtx,
+ coll,
+ std::move(cq),
+ PlanExecutor::YIELD_AUTO,
+ QueryPlannerParams::NO_TABLE_SCAN));
+
+ // Make sure the PlanExecutor is destroyed while holding a collection lock.
+ ON_BLOCK_EXIT([&exec, &ctx, opCtx, this] {
+ if (!ctx) {
+ AutoGetCollection autoColl(opCtx, _config.incLong, MODE_IS);
+ exec.reset();
+ }
+ });
- // iterate over all sorted objects
- BSONObj o;
- PlanExecutor::ExecState state;
- while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, NULL))) {
- o = o.getOwned(); // we will be accessing outside of the lock
- pm.hit();
+ // iterate over all sorted objects
+ BSONObj o;
+ PlanExecutor::ExecState state;
+ while (PlanExecutor::ADVANCED == (state = exec->getNext(&o, NULL))) {
+ o = o.getOwned(); // we will be accessing outside of the lock
+ pm.hit();
- if (dps::compareObjectsAccordingToSort(o, prev, sortKey) == 0) {
- // object is same as previous, add to array
- all.push_back(o);
- if (pm->hits() % 100 == 0) {
- _opCtx->checkForInterrupt();
+ if (dps::compareObjectsAccordingToSort(o, prev, sortKey) == 0) {
+ // object is same as previous, add to array
+ all.push_back(o);
+ if (pm->hits() % 100 == 0) {
+ _opCtx->checkForInterrupt();
+ }
+ continue;
}
- continue;
- }
- exec->saveState();
+ exec->saveState();
- ctx.reset();
+ ctx.reset();
- // reduce a finalize array
- finalReduce(all);
+ // reduce a finalize array
+ finalReduce(all);
- ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));
+ ctx.reset(new AutoGetCollectionForReadCommand(_opCtx, _config.incLong));
- all.clear();
- prev = o;
- all.push_back(o);
+ all.clear();
+ prev = o;
+ all.push_back(o);
- _opCtx->checkForInterrupt();
- uassertStatusOK(exec->restoreState());
- }
+ _opCtx->checkForInterrupt();
+ uassertStatusOK(exec->restoreState());
+ }
- uassert(34428,
- "Plan executor error during mapReduce command: " + WorkingSetCommon::toStatusString(o),
- PlanExecutor::IS_EOF == state);
+ uassert(34428,
+ "Plan executor error during mapReduce command: " +
+ WorkingSetCommon::toStatusString(o),
+ PlanExecutor::IS_EOF == state);
+ }
ctx.reset();
// reduce and finalize last array
diff --git a/src/mongo/db/storage/mobile/mobile_index.cpp b/src/mongo/db/storage/mobile/mobile_index.cpp
index 416d39a0027..bfd190cb4fd 100644
--- a/src/mongo/db/storage/mobile/mobile_index.cpp
+++ b/src/mongo/db/storage/mobile/mobile_index.cpp
@@ -466,7 +466,9 @@ public:
// All work is done in restore().
void save() override {
- _resetStatement();
+ // SQLite acquires implicit locks over the snapshot this cursor is using. It is important
+ // to finalize the corresponding statement to release these locks.
+ _stmt->finalize();
}
void saveUnpositioned() override {
@@ -478,6 +480,12 @@ public:
return;
}
+ // Obtaining a session starts a read transaction if not done already.
+ MobileSession* session = MobileRecoveryUnit::get(_opCtx)->getSession(_opCtx);
+ // save() finalized this cursor's SQLite statement. We need to prepare a new statement,
+ // before re-positioning it at the saved state.
+ _stmt->prepare(*session);
+
_startPosition.resetFromBuffer(_savedKey.getBuffer(), _savedKey.getSize());
bool isExactMatch = _doSeek();
diff --git a/src/mongo/db/storage/mobile/mobile_record_store.cpp b/src/mongo/db/storage/mobile/mobile_record_store.cpp
index e016b604b7e..99b2b045a42 100644
--- a/src/mongo/db/storage/mobile/mobile_record_store.cpp
+++ b/src/mongo/db/storage/mobile/mobile_record_store.cpp
@@ -116,7 +116,9 @@ public:
}
void save() final {
- _resetStatement();
+ // SQLite acquires implicit locks over the snapshot this cursor is using. It is important
+ // to finalize the corresponding statement to release these locks.
+ _stmt->finalize();
}
void saveUnpositioned() final {
@@ -129,7 +131,12 @@ public:
return true;
}
- _resetStatement();
+ // Obtaining a session starts a read transaction if not done already.
+ MobileSession* session = MobileRecoveryUnit::get(_opCtx)->getSession(_opCtx);
+ // save() finalized this cursor's SQLite statement. We need to prepare a new statement,
+ // before re-positioning it at the saved state.
+ _stmt->prepare(*session);
+
_stmt->bindInt(0, _savedId.repr());
return true;
}
diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
index a36a7092f9c..8bc16acb377 100644
--- a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
+++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
@@ -51,13 +51,13 @@ MobileRecoveryUnit::MobileRecoveryUnit(MobileSessionPool* sessionPool)
// Increment the global instance count and assign this instance an id.
_id = _nextID.addAndFetch(1);
- RECOVERY_UNIT_TRACE() << " Created.";
+ RECOVERY_UNIT_TRACE() << "Created.";
}
MobileRecoveryUnit::~MobileRecoveryUnit() {
invariant(!_inUnitOfWork);
_abort();
- RECOVERY_UNIT_TRACE() << " Destroyed.";
+ RECOVERY_UNIT_TRACE() << "Destroyed.";
}
void MobileRecoveryUnit::_commit() {
@@ -94,7 +94,7 @@ void MobileRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
invariant(!_areWriteUnitOfWorksBanned);
invariant(!_inUnitOfWork);
- RECOVERY_UNIT_TRACE() << " Unit of work Active.";
+ RECOVERY_UNIT_TRACE() << "Unit of work Active.";
if (_active) {
// Confirm a write transaction is not running
@@ -110,7 +110,7 @@ void MobileRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
void MobileRecoveryUnit::commitUnitOfWork() {
invariant(_inUnitOfWork);
- RECOVERY_UNIT_TRACE() << " Unit of work commited, marked inactive.";
+ RECOVERY_UNIT_TRACE() << "Unit of work commited, marked inactive.";
_inUnitOfWork = false;
_commit();
@@ -119,7 +119,7 @@ void MobileRecoveryUnit::commitUnitOfWork() {
void MobileRecoveryUnit::abortUnitOfWork() {
invariant(_inUnitOfWork);
- RECOVERY_UNIT_TRACE() << " Unit of work aborted, marked inactive.";
+ RECOVERY_UNIT_TRACE() << "Unit of work aborted, marked inactive.";
_inUnitOfWork = false;
_abort();
@@ -140,7 +140,7 @@ void MobileRecoveryUnit::registerChange(Change* change) {
}
MobileSession* MobileRecoveryUnit::getSession(OperationContext* opCtx, bool readOnly) {
- RECOVERY_UNIT_TRACE() << " getSession called with readOnly:" << (readOnly ? "TRUE" : "FALSE");
+ RECOVERY_UNIT_TRACE() << "getSession called with readOnly:" << (readOnly ? "TRUE" : "FALSE");
invariant(_inUnitOfWork || readOnly);
if (!_active) {
@@ -160,7 +160,7 @@ void MobileRecoveryUnit::assertInActiveTxn() const {
}
void MobileRecoveryUnit::_ensureSession(OperationContext* opCtx) {
- RECOVERY_UNIT_TRACE() << " Creating new session:" << (_session ? "NO" : "YES");
+ RECOVERY_UNIT_TRACE() << "Creating new session:" << (_session ? "NO" : "YES");
if (!_session) {
_session = _sessionPool->getSession(opCtx);
}
@@ -168,7 +168,7 @@ void MobileRecoveryUnit::_ensureSession(OperationContext* opCtx) {
void MobileRecoveryUnit::_txnOpen(OperationContext* opCtx, bool readOnly) {
invariant(!_active);
- RECOVERY_UNIT_TRACE() << " _txnOpen called with readOnly:" << (readOnly ? "TRUE" : "FALSE");
+ RECOVERY_UNIT_TRACE() << "_txnOpen called with readOnly:" << (readOnly ? "TRUE" : "FALSE");
_ensureSession(opCtx);
/*
@@ -206,7 +206,7 @@ void MobileRecoveryUnit::_txnOpen(OperationContext* opCtx, bool readOnly) {
void MobileRecoveryUnit::_txnClose(bool commit) {
invariant(_active);
- RECOVERY_UNIT_TRACE() << " _txnClose called with " << (commit ? "commit " : "rollback ");
+ RECOVERY_UNIT_TRACE() << "_txnClose called with " << (commit ? "commit " : "rollback ");
if (commit) {
SqliteStatement::execQuery(_session.get(), "COMMIT");
diff --git a/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp b/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp
index 31601a79207..e5ff02c8b97 100644
--- a/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp
+++ b/src/mongo/db/storage/mobile/mobile_sqlite_statement.cpp
@@ -49,15 +49,33 @@ AtomicInt64 SqliteStatement::_nextID(0);
SqliteStatement::SqliteStatement(const MobileSession& session, const std::string& sqlQuery) {
// Increment the global instance count and assign this instance an id.
_id = _nextID.addAndFetch(1);
- SQLITE_STMT_TRACE() << "Preparing: " << sqlQuery;
+ _sqlQuery = sqlQuery;
+
+ prepare(session);
+}
+
+void SqliteStatement::finalize() {
+ if (!_stmt) {
+ return;
+ }
+ SQLITE_STMT_TRACE() << "Finalize: " << _sqlQuery;
+
+ int status = sqlite3_finalize(_stmt);
+ fassert(37053, status == _exceptionStatus);
+ _stmt = NULL;
+}
+
+void SqliteStatement::prepare(const MobileSession& session) {
+ SQLITE_STMT_TRACE() << "Preparing: " << _sqlQuery;
+
int status = sqlite3_prepare_v2(
- session.getSession(), sqlQuery.c_str(), sqlQuery.length() + 1, &_stmt, NULL);
+ session.getSession(), _sqlQuery.c_str(), _sqlQuery.length() + 1, &_stmt, NULL);
if (status == SQLITE_BUSY) {
SQLITE_STMT_TRACE() << "Throwing writeConflictException, "
- << "SQLITE_BUSY while preparing: " << sqlQuery;
+ << "SQLITE_BUSY while preparing: " << _sqlQuery;
throw WriteConflictException();
} else if (status != SQLITE_OK) {
- SQLITE_STMT_TRACE() << "Error while preparing: " << sqlQuery;
+ SQLITE_STMT_TRACE() << "Error while preparing: " << _sqlQuery;
std::string errMsg = "sqlite3_prepare_v2 failed: ";
errMsg += sqlite3_errstr(status);
uasserted(ErrorCodes::UnknownError, errMsg);
@@ -65,8 +83,7 @@ SqliteStatement::SqliteStatement(const MobileSession& session, const std::string
}
SqliteStatement::~SqliteStatement() {
- int status = sqlite3_finalize(_stmt);
- fassert(37053, status == _exceptionStatus);
+ finalize();
}
void SqliteStatement::bindInt(int paramIndex, int64_t intValue) {
diff --git a/src/mongo/db/storage/mobile/mobile_sqlite_statement.h b/src/mongo/db/storage/mobile/mobile_sqlite_statement.h
index 095e4c94205..096d9978a12 100644
--- a/src/mongo/db/storage/mobile/mobile_sqlite_statement.h
+++ b/src/mongo/db/storage/mobile/mobile_sqlite_statement.h
@@ -116,11 +116,22 @@ public:
*/
static void execQuery(MobileSession* session, const std::string& query);
+ /**
+ * Finalizes a prepared statement.
+ */
+ void finalize();
+
+ /**
+ * Prepare a statement with the given mobile session.
+ */
+ void prepare(const MobileSession& session);
+
uint64_t _id;
private:
static AtomicInt64 _nextID;
sqlite3_stmt* _stmt;
+ std::string _sqlQuery;
// If the most recent call to sqlite3_step on this statement returned an error, the error is
// returned again when the statement is finalized. This is used to verify that the last error