summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/rocks/rocks_transaction.cpp
diff options
context:
space:
mode:
authorIgor Canadi <icanadi@fb.com>2014-11-24 14:28:10 -0800
committerMatt Kangas <matt.kangas@mongodb.com>2014-12-02 10:44:56 -0500
commitefb33fd38bd0e8733d14f657e7fa634880ec8f1d (patch)
tree6b46b12e434cdca4babb2e3efafa100b39b1fb6c /src/mongo/db/storage/rocks/rocks_transaction.cpp
parent11b2d7b6051766674c2d66c86d9f23d0befdee09 (diff)
downloadmongo-efb33fd38bd0e8733d14f657e7fa634880ec8f1d.tar.gz
SERVER-16259 Rocks storage engine needs to support document locking
SERVER-16309 - Fix rocks_recovery_unit SERVER-15744 Concurrency fix -- Fixed jstests/core/bench_test3.js Signed-off-by: Matt Kangas <matt.kangas@mongodb.com>
Diffstat (limited to 'src/mongo/db/storage/rocks/rocks_transaction.cpp')
-rw-r--r--src/mongo/db/storage/rocks/rocks_transaction.cpp84
1 files changed, 84 insertions, 0 deletions
diff --git a/src/mongo/db/storage/rocks/rocks_transaction.cpp b/src/mongo/db/storage/rocks/rocks_transaction.cpp
new file mode 100644
index 00000000000..3d27b5ef1ec
--- /dev/null
+++ b/src/mongo/db/storage/rocks/rocks_transaction.cpp
@@ -0,0 +1,84 @@
+// rocks_transaction.cpp
+
+/*
+ * TODO(mongo) Add licence header
+ */
+
+#include "mongo/db/storage/rocks/rocks_transaction.h"
+
+#include <atomic>
+#include <map>
+#include <memory>
+#include <string>
+
+// for invariant()
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+ RocksTransactionEngine::RocksTransactionEngine() : _latestSeqId(1) {
+ for (size_t i = 0; i < kNumSeqIdShards; ++i) {
+ _seqId[i] = 0;
+ _uncommittedTransactionId[i] = 0;
+ }
+ }
+
+ void RocksTransaction::commit() {
+ if (_writeShards.empty()) {
+ return;
+ }
+ uint64_t newSeqId = 0;
+ {
+ boost::mutex::scoped_lock lk(_transactionEngine->_commitLock);
+ for (auto writeShard : _writeShards) {
+ invariant(_transactionEngine->_seqId[writeShard] <= _snapshotSeqId);
+ invariant(_transactionEngine->_uncommittedTransactionId[writeShard] ==
+ _transactionId);
+ _transactionEngine->_uncommittedTransactionId[writeShard] = 0;
+ }
+ newSeqId =
+ _transactionEngine->_latestSeqId.load(std::memory_order::memory_order_relaxed) + 1;
+ for (auto writeShard : _writeShards) {
+ _transactionEngine->_seqId[writeShard] = newSeqId;
+ }
+ _transactionEngine->_latestSeqId.store(newSeqId);
+ }
+ // cleanup
+ _snapshotSeqId = newSeqId;
+ _writeShards.clear();
+ }
+
+ bool RocksTransaction::registerWrite(uint64_t hash) {
+ uint64_t shard = hash % RocksTransactionEngine::kNumSeqIdShards;
+
+ boost::mutex::scoped_lock lk(_transactionEngine->_commitLock);
+ if (_transactionEngine->_seqId[shard] > _snapshotSeqId) {
+ // write-committed write conflict
+ return false;
+ }
+ if (_transactionEngine->_uncommittedTransactionId[shard] != 0 &&
+ _transactionEngine->_uncommittedTransactionId[shard] != _transactionId) {
+ // write-uncommitted write conflict
+ return false;
+ }
+ _writeShards.insert(shard);
+ _transactionEngine->_uncommittedTransactionId[shard] = _transactionId;
+ return true;
+ }
+
+ void RocksTransaction::abort() {
+ if (_writeShards.empty()) {
+ return;
+ }
+ {
+ boost::mutex::scoped_lock lk(_transactionEngine->_commitLock);
+ for (auto writeShard : _writeShards) {
+ _transactionEngine->_uncommittedTransactionId[writeShard] = 0;
+ }
+ }
+ _writeShards.clear();
+ }
+
+ void RocksTransaction::recordSnapshotId() {
+ _snapshotSeqId = _transactionEngine->getLatestSeqId();
+ }
+}