summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Gottlieb <daniel.gottlieb@mongodb.com>2017-07-05 09:36:09 -0400
committerDaniel Gottlieb <daniel.gottlieb@mongodb.com>2017-07-05 09:36:09 -0400
commitb188121303e27761b34bfb67be0732c346999168 (patch)
tree0a4d819e59cd8b6d1292bd64b899516f52bb6513
parentb62fbc2a3fe1ed9f1df55bf579e36128e849331b (diff)
downloadmongo-b188121303e27761b34bfb67be0732c346999168.tar.gz
SERVER-27831 SERVER-28737: Push threadsafety responsibility inside of KVCatalog's RecordStore
RecordStores that don't implement document level locking are typically protected from concurrent reads and writes. However one exception is the RecordStore passed into KVCatalog. Previously, if StorageEngine::supportsDocLocking was false, the KVCatalog would use an additional lock that participated in two phase locking to ensure reader-writer protection to the underlying record store (and more specifically, delay releasing until any potential rollbacks were processed). However, access to the catalog can happen anywhere and this lock did not have a formally assigned acquisition time relative to other locks resulting in potential deadlocks. This patch forces the thread-safety requirement into the RecordStore. Specifically, EphemeralForTest was changed to acquire a finer grained mutex (i.e: does not participate in two-phase locking) to protect its internal state. Now that the lock exists inside EphemeralForTest, it is also able to grab the mutex when needed for the onCommit/onRollback callbacks. (cherry picked from commit 71a149b45c8bb019cbc8179f4a411be66bda2062) (cherry picked from commit 99d1ad9e6ef9bd270f9f668966a71596a71f7f72)
-rw-r--r--src/mongo/db/concurrency/locker_noop.h4
-rw-r--r--src/mongo/db/storage/README.md2
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/SConscript1
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp79
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h11
-rw-r--r--src/mongo/db/storage/kv/kv_catalog.cpp78
-rw-r--r--src/mongo/db/storage/kv/kv_catalog.h8
-rw-r--r--src/mongo/db/storage/kv/kv_catalog_feature_tracker_test.cpp2
-rw-r--r--src/mongo/db/storage/kv/kv_engine_test_harness.cpp12
-rw-r--r--src/mongo/db/storage/kv/kv_storage_engine.cpp6
-rw-r--r--src/mongo/db/storage/record_store.h4
11 files changed, 88 insertions, 119 deletions
diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h
index 2de30224d83..518fbcf308b 100644
--- a/src/mongo/db/concurrency/locker_noop.h
+++ b/src/mongo/db/concurrency/locker_noop.h
@@ -93,7 +93,7 @@ public:
LockMode mode,
Milliseconds timeout,
bool checkDeadlock) {
- invariant(false);
+ return LockResult::LOCK_OK;
}
virtual void downgrade(ResourceId resId, LockMode newMode) {
@@ -101,7 +101,7 @@ public:
}
virtual bool unlock(ResourceId resId) {
- invariant(false);
+ return true;
}
virtual LockMode getLockMode(ResourceId resId) const {
diff --git a/src/mongo/db/storage/README.md b/src/mongo/db/storage/README.md
index 755bd173915..8326f47c8ae 100644
--- a/src/mongo/db/storage/README.md
+++ b/src/mongo/db/storage/README.md
@@ -107,7 +107,7 @@ details.
* [KVEngine](kv/kv_engine.h)
* [RecordStore](record_store.h)
-* [RecoveryUnit](ecovery_unit.h)
+* [RecoveryUnit](recovery_unit.h)
* [SeekableRecordCursor](record_store.h)
* [SortedDataInterface](sorted_data_interface.h)
* [ServerStatusSection](../commands/server_status.h)
diff --git a/src/mongo/db/storage/ephemeral_for_test/SConscript b/src/mongo/db/storage/ephemeral_for_test/SConscript
index c3ec899e74a..8938edf7e05 100644
--- a/src/mongo/db/storage/ephemeral_for_test/SConscript
+++ b/src/mongo/db/storage/ephemeral_for_test/SConscript
@@ -8,6 +8,7 @@ env.Library(
],
LIBDEPS= [
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/storage/oplog_hack',
]
)
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
index 3d51b105240..a537a9c63c6 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h"
-
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
@@ -50,9 +49,12 @@ using std::shared_ptr;
class EphemeralForTestRecordStore::InsertChange : public RecoveryUnit::Change {
public:
- InsertChange(Data* data, RecordId loc) : _data(data), _loc(loc) {}
+ InsertChange(OperationContext* opCtx, Data* data, RecordId loc)
+ : _opCtx(opCtx), _data(data), _loc(loc) {}
virtual void commit() {}
virtual void rollback() {
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
+
Records::iterator it = _data->records.find(_loc);
if (it != _data->records.end()) {
_data->dataSize -= it->second.size;
@@ -61,6 +63,7 @@ public:
}
private:
+ OperationContext* _opCtx;
Data* const _data;
const RecordId _loc;
};
@@ -68,11 +71,16 @@ private:
// Works for both removes and updates
class EphemeralForTestRecordStore::RemoveChange : public RecoveryUnit::Change {
public:
- RemoveChange(Data* data, RecordId loc, const EphemeralForTestRecord& rec)
- : _data(data), _loc(loc), _rec(rec) {}
+ RemoveChange(OperationContext* opCtx,
+ Data* data,
+ RecordId loc,
+ const EphemeralForTestRecord& rec)
+ : _opCtx(opCtx), _data(data), _loc(loc), _rec(rec) {}
virtual void commit() {}
virtual void rollback() {
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
+
Records::iterator it = _data->records.find(_loc);
if (it != _data->records.end()) {
_data->dataSize -= it->second.size;
@@ -83,6 +91,7 @@ public:
}
private:
+ OperationContext* _opCtx;
Data* const _data;
const RecordId _loc;
const EphemeralForTestRecord _rec;
@@ -90,8 +99,10 @@ private:
class EphemeralForTestRecordStore::TruncateChange : public RecoveryUnit::Change {
public:
- TruncateChange(Data* data) : _data(data), _dataSize(0) {
+ TruncateChange(OperationContext* opCtx, Data* data) : _opCtx(opCtx), _data(data), _dataSize(0) {
using std::swap;
+
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
swap(_dataSize, _data->dataSize);
swap(_records, _data->records);
}
@@ -99,11 +110,14 @@ public:
virtual void commit() {}
virtual void rollback() {
using std::swap;
+
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
swap(_dataSize, _data->dataSize);
swap(_records, _data->records);
}
private:
+ OperationContext* _opCtx;
Data* const _data;
int64_t _dataSize;
Records _records;
@@ -264,7 +278,7 @@ EphemeralForTestRecordStore::EphemeralForTestRecordStore(StringData ns,
_cappedMaxDocs(cappedMaxDocs),
_cappedCallback(cappedCallback),
_data(*dataInOut ? static_cast<Data*>(dataInOut->get())
- : new Data(NamespaceString::oplog(ns))) {
+ : new Data(ns, NamespaceString::oplog(ns))) {
if (!*dataInOut) {
dataInOut->reset(_data); // takes ownership
}
@@ -283,6 +297,7 @@ const char* EphemeralForTestRecordStore::name() const {
}
RecordData EphemeralForTestRecordStore::dataFor(OperationContext* txn, const RecordId& loc) const {
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
return recordFor(loc)->toRecordData();
}
@@ -311,6 +326,8 @@ EphemeralForTestRecordStore::EphemeralForTestRecord* EphemeralForTestRecordStore
bool EphemeralForTestRecordStore::findRecord(OperationContext* txn,
const RecordId& loc,
RecordData* rd) const {
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
+
Records::const_iterator it = _data->records.find(loc);
if (it == _data->records.end()) {
return false;
@@ -320,13 +337,19 @@ bool EphemeralForTestRecordStore::findRecord(OperationContext* txn,
}
void EphemeralForTestRecordStore::deleteRecord(OperationContext* txn, const RecordId& loc) {
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
+
+ deleteRecord_inlock(txn, loc);
+}
+
+void EphemeralForTestRecordStore::deleteRecord_inlock(OperationContext* txn, const RecordId& loc) {
EphemeralForTestRecord* rec = recordFor(loc);
- txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *rec));
+ txn->recoveryUnit()->registerChange(new RemoveChange(txn, _data, loc, *rec));
_data->dataSize -= rec->size;
invariant(_data->records.erase(loc) == 1);
}
-bool EphemeralForTestRecordStore::cappedAndNeedDelete(OperationContext* txn) const {
+bool EphemeralForTestRecordStore::cappedAndNeedDelete_inlock(OperationContext* txn) const {
if (!_isCapped)
return false;
@@ -339,8 +362,8 @@ bool EphemeralForTestRecordStore::cappedAndNeedDelete(OperationContext* txn) con
return false;
}
-void EphemeralForTestRecordStore::cappedDeleteAsNeeded(OperationContext* txn) {
- while (cappedAndNeedDelete(txn)) {
+void EphemeralForTestRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn) {
+ while (cappedAndNeedDelete_inlock(txn)) {
invariant(!_data->records.empty());
Records::iterator oldest = _data->records.begin();
@@ -350,7 +373,7 @@ void EphemeralForTestRecordStore::cappedDeleteAsNeeded(OperationContext* txn) {
if (_cappedCallback)
uassertStatusOK(_cappedCallback->aboutToDeleteCapped(txn, id, data));
- deleteRecord(txn, id);
+ deleteRecord_inlock(txn, id);
}
}
@@ -376,6 +399,7 @@ StatusWith<RecordId> EphemeralForTestRecordStore::insertRecord(OperationContext*
return StatusWith<RecordId>(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
}
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
EphemeralForTestRecord rec(len);
memcpy(rec.data.get(), data, len);
@@ -389,11 +413,11 @@ StatusWith<RecordId> EphemeralForTestRecordStore::insertRecord(OperationContext*
loc = allocateLoc();
}
- txn->recoveryUnit()->registerChange(new InsertChange(_data, loc));
+ txn->recoveryUnit()->registerChange(new InsertChange(txn, _data, loc));
_data->dataSize += len;
_data->records[loc] = rec;
- cappedDeleteAsNeeded(txn);
+ cappedDeleteAsNeeded_inlock(txn);
return StatusWith<RecordId>(loc);
}
@@ -402,6 +426,8 @@ Status EphemeralForTestRecordStore::insertRecordsWithDocWriter(OperationContext*
const DocWriter* const* docs,
size_t nDocs,
RecordId* idsOut) {
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
+
for (size_t i = 0; i < nDocs; i++) {
const int len = docs[i]->documentSize();
if (_isCapped && len > _cappedMaxSize) {
@@ -423,11 +449,11 @@ Status EphemeralForTestRecordStore::insertRecordsWithDocWriter(OperationContext*
loc = allocateLoc();
}
- txn->recoveryUnit()->registerChange(new InsertChange(_data, loc));
+ txn->recoveryUnit()->registerChange(new InsertChange(txn, _data, loc));
_data->dataSize += len;
_data->records[loc] = rec;
- cappedDeleteAsNeeded(txn);
+ cappedDeleteAsNeeded_inlock(txn);
if (idsOut)
idsOut[i] = loc;
@@ -442,6 +468,7 @@ Status EphemeralForTestRecordStore::updateRecord(OperationContext* txn,
int len,
bool enforceQuota,
UpdateNotifier* notifier) {
+ stdx::unique_lock<stdx::mutex> lock(_data->recordsMutex);
EphemeralForTestRecord* oldRecord = recordFor(loc);
int oldLen = oldRecord->size;
@@ -451,21 +478,22 @@ Status EphemeralForTestRecordStore::updateRecord(OperationContext* txn,
if (notifier) {
// The in-memory KV engine uses the invalidation framework (does not support
// doc-locking), and therefore must notify that it is updating a document.
+ lock.unlock();
Status callbackStatus = notifier->recordStoreGoingToUpdateInPlace(txn, loc);
if (!callbackStatus.isOK()) {
return callbackStatus;
}
+ lock.lock();
}
EphemeralForTestRecord newRecord(len);
memcpy(newRecord.data.get(), data, len);
- txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord));
+ txn->recoveryUnit()->registerChange(new RemoveChange(txn, _data, loc, *oldRecord));
_data->dataSize += len - oldLen;
*oldRecord = newRecord;
- cappedDeleteAsNeeded(txn);
-
+ cappedDeleteAsNeeded_inlock(txn);
return Status::OK();
}
@@ -479,16 +507,19 @@ StatusWith<RecordData> EphemeralForTestRecordStore::updateWithDamages(
const RecordData& oldRec,
const char* damageSource,
const mutablebson::DamageVector& damages) {
+
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
+
EphemeralForTestRecord* oldRecord = recordFor(loc);
const int len = oldRecord->size;
EphemeralForTestRecord newRecord(len);
memcpy(newRecord.data.get(), oldRecord->data.get(), len);
- txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord));
+ txn->recoveryUnit()->registerChange(new RemoveChange(txn, _data, loc, *oldRecord));
*oldRecord = newRecord;
- cappedDeleteAsNeeded(txn);
+ cappedDeleteAsNeeded_inlock(txn);
char* root = newRecord.data.get();
mutablebson::DamageVector::const_iterator where = damages.begin();
@@ -514,17 +545,18 @@ std::unique_ptr<SeekableRecordCursor> EphemeralForTestRecordStore::getCursor(Ope
Status EphemeralForTestRecordStore::truncate(OperationContext* txn) {
// Unlike other changes, TruncateChange mutates _data on construction to perform the
// truncate
- txn->recoveryUnit()->registerChange(new TruncateChange(_data));
+ txn->recoveryUnit()->registerChange(new TruncateChange(txn, _data));
return Status::OK();
}
void EphemeralForTestRecordStore::temp_cappedTruncateAfter(OperationContext* txn,
RecordId end,
bool inclusive) {
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
Records::iterator it =
inclusive ? _data->records.lower_bound(end) : _data->records.upper_bound(end);
while (it != _data->records.end()) {
- txn->recoveryUnit()->registerChange(new RemoveChange(_data, it->first, it->second));
+ txn->recoveryUnit()->registerChange(new RemoveChange(txn, _data, it->first, it->second));
_data->dataSize -= it->second.size;
_data->records.erase(it++);
}
@@ -535,6 +567,8 @@ Status EphemeralForTestRecordStore::validate(OperationContext* txn,
ValidateAdaptor* adaptor,
ValidateResults* results,
BSONObjBuilder* output) {
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
+
results->valid = true;
if (level == kValidateFull) {
for (Records::const_iterator it = _data->records.begin(); it != _data->records.end();
@@ -602,6 +636,7 @@ boost::optional<RecordId> EphemeralForTestRecordStore::oplogStartHack(
if (!_data->isOplog)
return boost::none;
+ stdx::lock_guard<stdx::mutex> lock(_data->recordsMutex);
const Records& records = _data->records;
if (records.empty())
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
index 58dd3d564e3..4cc9eb377bb 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
@@ -33,8 +33,10 @@
#include <boost/shared_array.hpp>
#include <map>
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/storage/capped_callback.h"
#include "mongo/db/storage/record_store.h"
+#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -179,8 +181,9 @@ private:
StatusWith<RecordId> extractAndCheckLocForOplog(const char* data, int len) const;
RecordId allocateLoc();
- bool cappedAndNeedDelete(OperationContext* txn) const;
- void cappedDeleteAsNeeded(OperationContext* txn);
+ bool cappedAndNeedDelete_inlock(OperationContext* txn) const;
+ void cappedDeleteAsNeeded_inlock(OperationContext* txn);
+ void deleteRecord_inlock(OperationContext* txn, const RecordId& dl);
// TODO figure out a proper solution to metadata
const bool _isCapped;
@@ -190,9 +193,11 @@ private:
// This is the "persistent" data.
struct Data {
- Data(bool isOplog) : dataSize(0), nextId(1), isOplog(isOplog) {}
+ Data(StringData ns, bool isOplog)
+ : dataSize(0), recordsMutex(), nextId(1), isOplog(isOplog) {}
int64_t dataSize;
+ stdx::mutex recordsMutex;
Records records;
int64_t nextId;
const bool isOplog;
diff --git a/src/mongo/db/storage/kv/kv_catalog.cpp b/src/mongo/db/storage/kv/kv_catalog.cpp
index a4b3925f078..588e52e24ad 100644
--- a/src/mongo/db/storage/kv/kv_catalog.cpp
+++ b/src/mongo/db/storage/kv/kv_catalog.cpp
@@ -53,9 +53,6 @@ namespace {
// This is a global resource, which protects accesses to the catalog metadata (instance-wide).
// It is never used with KVEngines that support doc-level locking so this should never conflict
// with anything else.
-//
-// NOTE: Must be locked *before* _identLock.
-const ResourceId resourceIdCatalogMetadata(RESOURCE_METADATA, 1ULL);
const char kIsFeatureDocumentFieldName[] = "isFeatureDoc";
const char kNamespaceFieldName[] = "ns";
@@ -123,12 +120,6 @@ bool KVCatalog::FeatureTracker::isFeatureDocument(BSONObj obj) {
}
Status KVCatalog::FeatureTracker::isCompatibleWithCurrentCode(OperationContext* opCtx) const {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_catalog->_isRsThreadSafe && opCtx->lockState()) {
- rLk = stdx::make_unique<Lock::ResourceLock>(
- opCtx->lockState(), resourceIdCatalogMetadata, MODE_S);
- }
-
FeatureBits versionInfo = getInfo(opCtx);
uint64_t unrecognizedNonRepairableFeatures =
@@ -173,22 +164,12 @@ std::unique_ptr<KVCatalog::FeatureTracker> KVCatalog::FeatureTracker::create(
bool KVCatalog::FeatureTracker::isNonRepairableFeatureInUse(OperationContext* opCtx,
NonRepairableFeature feature) const {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_catalog->_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_S));
- }
-
FeatureBits versionInfo = getInfo(opCtx);
return versionInfo.nonRepairableFeatures & static_cast<NonRepairableFeatureMask>(feature);
}
void KVCatalog::FeatureTracker::markNonRepairableFeatureAsInUse(OperationContext* opCtx,
NonRepairableFeature feature) {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_catalog->_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_X));
- }
-
FeatureBits versionInfo = getInfo(opCtx);
versionInfo.nonRepairableFeatures |= static_cast<NonRepairableFeatureMask>(feature);
putInfo(opCtx, versionInfo);
@@ -196,11 +177,6 @@ void KVCatalog::FeatureTracker::markNonRepairableFeatureAsInUse(OperationContext
void KVCatalog::FeatureTracker::markNonRepairableFeatureAsNotInUse(OperationContext* opCtx,
NonRepairableFeature feature) {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_catalog->_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_X));
- }
-
FeatureBits versionInfo = getInfo(opCtx);
versionInfo.nonRepairableFeatures &= ~static_cast<NonRepairableFeatureMask>(feature);
putInfo(opCtx, versionInfo);
@@ -208,22 +184,12 @@ void KVCatalog::FeatureTracker::markNonRepairableFeatureAsNotInUse(OperationCont
bool KVCatalog::FeatureTracker::isRepairableFeatureInUse(OperationContext* opCtx,
RepairableFeature feature) const {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_catalog->_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_S));
- }
-
FeatureBits versionInfo = getInfo(opCtx);
return versionInfo.repairableFeatures & static_cast<RepairableFeatureMask>(feature);
}
void KVCatalog::FeatureTracker::markRepairableFeatureAsInUse(OperationContext* opCtx,
RepairableFeature feature) {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_catalog->_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_X));
- }
-
FeatureBits versionInfo = getInfo(opCtx);
versionInfo.repairableFeatures |= static_cast<RepairableFeatureMask>(feature);
putInfo(opCtx, versionInfo);
@@ -231,11 +197,6 @@ void KVCatalog::FeatureTracker::markRepairableFeatureAsInUse(OperationContext* o
void KVCatalog::FeatureTracker::markRepairableFeatureAsNotInUse(OperationContext* opCtx,
RepairableFeature feature) {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_catalog->_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_X));
- }
-
FeatureBits versionInfo = getInfo(opCtx);
versionInfo.repairableFeatures &= ~static_cast<RepairableFeatureMask>(feature);
putInfo(opCtx, versionInfo);
@@ -243,10 +204,6 @@ void KVCatalog::FeatureTracker::markRepairableFeatureAsNotInUse(OperationContext
KVCatalog::FeatureTracker::FeatureBits KVCatalog::FeatureTracker::getInfo(
OperationContext* opCtx) const {
- if (!_catalog->_isRsThreadSafe && opCtx->lockState()) {
- invariant(opCtx->lockState()->isLockHeldForMode(resourceIdCatalogMetadata, MODE_S));
- }
-
if (_rid.isNull()) {
return {};
}
@@ -274,10 +231,6 @@ KVCatalog::FeatureTracker::FeatureBits KVCatalog::FeatureTracker::getInfo(
}
void KVCatalog::FeatureTracker::putInfo(OperationContext* opCtx, const FeatureBits& versionInfo) {
- if (!_catalog->_isRsThreadSafe && opCtx->lockState()) {
- invariant(opCtx->lockState()->isLockHeldForMode(resourceIdCatalogMetadata, MODE_X));
- }
-
BSONObjBuilder bob;
bob.appendBool(kIsFeatureDocumentFieldName, true);
// We intentionally include the "ns" field with a null value in the feature document to prevent
@@ -306,12 +259,8 @@ void KVCatalog::FeatureTracker::putInfo(OperationContext* opCtx, const FeatureBi
}
}
-KVCatalog::KVCatalog(RecordStore* rs,
- bool isRsThreadSafe,
- bool directoryPerDb,
- bool directoryForIndexes)
+KVCatalog::KVCatalog(RecordStore* rs, bool directoryPerDb, bool directoryForIndexes)
: _rs(rs),
- _isRsThreadSafe(isRsThreadSafe),
_directoryPerDb(directoryPerDb),
_directoryForIndexes(directoryForIndexes),
_rand(_newRand()) {}
@@ -392,11 +341,6 @@ Status KVCatalog::newCollection(OperationContext* opCtx,
invariant(opCtx->lockState() == NULL ||
opCtx->lockState()->isDbLockedForMode(nsToDatabaseSubstring(ns), MODE_X));
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_X));
- }
-
const string ident = _newUniqueIdent(ns, "collection");
stdx::lock_guard<stdx::mutex> lk(_identsLock);
@@ -444,11 +388,6 @@ std::string KVCatalog::getIndexIdent(OperationContext* opCtx,
}
BSONObj KVCatalog::_findEntry(OperationContext* opCtx, StringData ns, RecordId* out) const {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_S));
- }
-
RecordId dl;
{
stdx::lock_guard<stdx::mutex> lk(_identsLock);
@@ -488,11 +427,6 @@ const BSONCollectionCatalogEntry::MetaData KVCatalog::getMetaData(OperationConte
void KVCatalog::putMetaData(OperationContext* opCtx,
StringData ns,
BSONCollectionCatalogEntry::MetaData& md) {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_X));
- }
-
RecordId loc;
BSONObj obj = _findEntry(opCtx, ns, &loc);
@@ -533,11 +467,6 @@ Status KVCatalog::renameCollection(OperationContext* opCtx,
StringData fromNS,
StringData toNS,
bool stayTemp) {
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_X));
- }
-
RecordId loc;
BSONObj old = _findEntry(opCtx, fromNS, &loc).getOwned();
{
@@ -575,11 +504,6 @@ Status KVCatalog::renameCollection(OperationContext* opCtx,
Status KVCatalog::dropCollection(OperationContext* opCtx, StringData ns) {
invariant(opCtx->lockState() == NULL ||
opCtx->lockState()->isDbLockedForMode(nsToDatabaseSubstring(ns), MODE_X));
- std::unique_ptr<Lock::ResourceLock> rLk;
- if (!_isRsThreadSafe && opCtx->lockState()) {
- rLk.reset(new Lock::ResourceLock(opCtx->lockState(), resourceIdCatalogMetadata, MODE_X));
- }
-
stdx::lock_guard<stdx::mutex> lk(_identsLock);
const NSToIdentMap::iterator it = _idents.find(ns.toString());
if (it == _idents.end()) {
diff --git a/src/mongo/db/storage/kv/kv_catalog.h b/src/mongo/db/storage/kv/kv_catalog.h
index d84edfdde3b..8f05b52564d 100644
--- a/src/mongo/db/storage/kv/kv_catalog.h
+++ b/src/mongo/db/storage/kv/kv_catalog.h
@@ -50,9 +50,12 @@ public:
class FeatureTracker;
/**
- * @param rs - does NOT take ownership
+ * @param rs - does NOT take ownership. The RecordStore must be thread-safe, in particular
+ * with concurrent calls to RecordStore::find, updateRecord, insertRecord, deleteRecord and
+ * dataFor. The KVCatalog does not utilize Cursors and those methods may omit further
+ * protection.
*/
- KVCatalog(RecordStore* rs, bool isRsThreadSafe, bool directoryPerDb, bool directoryForIndexes);
+ KVCatalog(RecordStore* rs, bool directoryPerDb, bool directoryForIndexes);
~KVCatalog();
void init(OperationContext* opCtx);
@@ -108,7 +111,6 @@ private:
bool _hasEntryCollidingWithRand() const;
RecordStore* _rs; // not owned
- const bool _isRsThreadSafe;
const bool _directoryPerDb;
const bool _directoryForIndexes;
diff --git a/src/mongo/db/storage/kv/kv_catalog_feature_tracker_test.cpp b/src/mongo/db/storage/kv/kv_catalog_feature_tracker_test.cpp
index 03e56361cd8..a2c5cae27ae 100644
--- a/src/mongo/db/storage/kv/kv_catalog_feature_tracker_test.cpp
+++ b/src/mongo/db/storage/kv/kv_catalog_feature_tracker_test.cpp
@@ -79,7 +79,7 @@ public:
wuow.commit();
}
- _catalog = stdx::make_unique<KVCatalog>(_rs.get(), true, false, false);
+ _catalog = stdx::make_unique<KVCatalog>(_rs.get(), false, false);
_catalog->init(opCtx.get());
{
diff --git a/src/mongo/db/storage/kv/kv_engine_test_harness.cpp b/src/mongo/db/storage/kv/kv_engine_test_harness.cpp
index a7e169a4610..52fd05587cc 100644
--- a/src/mongo/db/storage/kv/kv_engine_test_harness.cpp
+++ b/src/mongo/db/storage/kv/kv_engine_test_harness.cpp
@@ -174,7 +174,7 @@ TEST(KVCatalogTest, Coll1) {
WriteUnitOfWork uow(&opCtx);
ASSERT_OK(engine->createRecordStore(&opCtx, "catalog", "catalog", CollectionOptions()));
rs = engine->getRecordStore(&opCtx, "catalog", "catalog", CollectionOptions());
- catalog.reset(new KVCatalog(rs.get(), true, false, false));
+ catalog.reset(new KVCatalog(rs.get(), false, false));
uow.commit();
}
@@ -190,7 +190,7 @@ TEST(KVCatalogTest, Coll1) {
{
MyOperationContext opCtx(engine);
WriteUnitOfWork uow(&opCtx);
- catalog.reset(new KVCatalog(rs.get(), true, false, false));
+ catalog.reset(new KVCatalog(rs.get(), false, false));
catalog->init(&opCtx);
uow.commit();
}
@@ -218,7 +218,7 @@ TEST(KVCatalogTest, Idx1) {
WriteUnitOfWork uow(&opCtx);
ASSERT_OK(engine->createRecordStore(&opCtx, "catalog", "catalog", CollectionOptions()));
rs = engine->getRecordStore(&opCtx, "catalog", "catalog", CollectionOptions());
- catalog.reset(new KVCatalog(rs.get(), true, false, false));
+ catalog.reset(new KVCatalog(rs.get(), false, false));
uow.commit();
}
@@ -291,7 +291,7 @@ TEST(KVCatalogTest, DirectoryPerDb1) {
WriteUnitOfWork uow(&opCtx);
ASSERT_OK(engine->createRecordStore(&opCtx, "catalog", "catalog", CollectionOptions()));
rs = engine->getRecordStore(&opCtx, "catalog", "catalog", CollectionOptions());
- catalog.reset(new KVCatalog(rs.get(), true, true, false));
+ catalog.reset(new KVCatalog(rs.get(), true, false));
uow.commit();
}
@@ -333,7 +333,7 @@ TEST(KVCatalogTest, Split1) {
WriteUnitOfWork uow(&opCtx);
ASSERT_OK(engine->createRecordStore(&opCtx, "catalog", "catalog", CollectionOptions()));
rs = engine->getRecordStore(&opCtx, "catalog", "catalog", CollectionOptions());
- catalog.reset(new KVCatalog(rs.get(), true, false, true));
+ catalog.reset(new KVCatalog(rs.get(), false, true));
uow.commit();
}
@@ -375,7 +375,7 @@ TEST(KVCatalogTest, DirectoryPerAndSplit1) {
WriteUnitOfWork uow(&opCtx);
ASSERT_OK(engine->createRecordStore(&opCtx, "catalog", "catalog", CollectionOptions()));
rs = engine->getRecordStore(&opCtx, "catalog", "catalog", CollectionOptions());
- catalog.reset(new KVCatalog(rs.get(), true, true, true));
+ catalog.reset(new KVCatalog(rs.get(), true, true));
uow.commit();
}
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp
index 14b42dce18c..775abfdfb03 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp
@@ -99,10 +99,8 @@ KVStorageEngine::KVStorageEngine(KVEngine* engine, const KVStorageEngineOptions&
_catalogRecordStore =
_engine->getRecordStore(&opCtx, catalogInfo, catalogInfo, CollectionOptions());
- _catalog.reset(new KVCatalog(_catalogRecordStore.get(),
- _supportsDocLocking,
- _options.directoryPerDB,
- _options.directoryForIndexes));
+ _catalog.reset(new KVCatalog(
+ _catalogRecordStore.get(), _options.directoryPerDB, _options.directoryForIndexes));
_catalog->init(&opCtx);
std::vector<std::string> collections;
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index ac04c5ca5e2..64ccf71c39f 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -277,6 +277,10 @@ public:
* Many methods take an OperationContext parameter. This contains the RecoveryUnit, with
* all RecordStore specific transaction information, as well as the LockState. Methods that take
* an OperationContext may throw a WriteConflictException.
+ *
+ * This class must be thread-safe for document-level locking storage engines. In addition, for
+ * storage engines implementing the KVEngine some methods must be thread safe, see KVCatalog. Only
+ * for MMAPv1 is this class not thread-safe.
*/
class RecordStore {
MONGO_DISALLOW_COPYING(RecordStore);