diff options
author | Igor Canadi <icanadi@fb.com> | 2014-11-24 14:28:10 -0800 |
---|---|---|
committer | Matt Kangas <matt.kangas@mongodb.com> | 2014-12-02 10:44:56 -0500 |
commit | efb33fd38bd0e8733d14f657e7fa634880ec8f1d (patch) | |
tree | 6b46b12e434cdca4babb2e3efafa100b39b1fb6c /src/mongo/db/storage/rocks/rocks_transaction.cpp | |
parent | 11b2d7b6051766674c2d66c86d9f23d0befdee09 (diff) | |
download | mongo-efb33fd38bd0e8733d14f657e7fa634880ec8f1d.tar.gz |
SERVER-16259 Rocks storage engine needs to support document locking
SERVER-16309 - Fix rocks_recovery_unit
SERVER-15744 Concurrency fix -- Fixed jstests/core/bench_test3.js
Signed-off-by: Matt Kangas <matt.kangas@mongodb.com>
Diffstat (limited to 'src/mongo/db/storage/rocks/rocks_transaction.cpp')
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_transaction.cpp | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/src/mongo/db/storage/rocks/rocks_transaction.cpp b/src/mongo/db/storage/rocks/rocks_transaction.cpp new file mode 100644 index 00000000000..3d27b5ef1ec --- /dev/null +++ b/src/mongo/db/storage/rocks/rocks_transaction.cpp @@ -0,0 +1,84 @@ +// rocks_transaction.cpp + +/* + * TODO(mongo) Add licence header + */ + +#include "mongo/db/storage/rocks/rocks_transaction.h" + +#include <atomic> +#include <map> +#include <memory> +#include <string> + +// for invariant() +#include "mongo/util/assert_util.h" + +namespace mongo { + RocksTransactionEngine::RocksTransactionEngine() : _latestSeqId(1) { + for (size_t i = 0; i < kNumSeqIdShards; ++i) { + _seqId[i] = 0; + _uncommittedTransactionId[i] = 0; + } + } + + void RocksTransaction::commit() { + if (_writeShards.empty()) { + return; + } + uint64_t newSeqId = 0; + { + boost::mutex::scoped_lock lk(_transactionEngine->_commitLock); + for (auto writeShard : _writeShards) { + invariant(_transactionEngine->_seqId[writeShard] <= _snapshotSeqId); + invariant(_transactionEngine->_uncommittedTransactionId[writeShard] == + _transactionId); + _transactionEngine->_uncommittedTransactionId[writeShard] = 0; + } + newSeqId = + _transactionEngine->_latestSeqId.load(std::memory_order::memory_order_relaxed) + 1; + for (auto writeShard : _writeShards) { + _transactionEngine->_seqId[writeShard] = newSeqId; + } + _transactionEngine->_latestSeqId.store(newSeqId); + } + // cleanup + _snapshotSeqId = newSeqId; + _writeShards.clear(); + } + + bool RocksTransaction::registerWrite(uint64_t hash) { + uint64_t shard = hash % RocksTransactionEngine::kNumSeqIdShards; + + boost::mutex::scoped_lock lk(_transactionEngine->_commitLock); + if (_transactionEngine->_seqId[shard] > _snapshotSeqId) { + // write-committed write conflict + return false; + } + if (_transactionEngine->_uncommittedTransactionId[shard] != 0 && + _transactionEngine->_uncommittedTransactionId[shard] != _transactionId) { + // write-uncommitted write conflict + return false; + } + _writeShards.insert(shard); + _transactionEngine->_uncommittedTransactionId[shard] = _transactionId; + return true; + } + + void RocksTransaction::abort() { + if (_writeShards.empty()) { + return; + } + { + boost::mutex::scoped_lock lk(_transactionEngine->_commitLock); + for (auto writeShard : _writeShards) { + _transactionEngine->_uncommittedTransactionId[writeShard] = 0; + } + } + _writeShards.clear(); + } + + void RocksTransaction::recordSnapshotId() { + _snapshotSeqId = _transactionEngine->getLatestSeqId(); + } +} |