summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2016-05-05 15:56:55 -0400
committerMathias Stearn <mathias@10gen.com>2016-05-09 17:52:12 -0400
commit955304d50df6c94bb10e757736e531ce91627d23 (patch)
tree0220a3476219d5dcdb20b15b5ca836a19c24b9eb /src/mongo
parent0257f34483b0dee3691b8bd4a6715ac33c250b67 (diff)
downloadmongo-955304d50df6c94bb10e757736e531ce91627d23.tar.gz
SERVER-24005 Allow getNextOpTime() to hand out batches of OpTimes
This allows a lot of expensive work that is done inside of a mutex to be done once per batch rather than once per document.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/global_timestamp.cpp47
-rw-r--r--src/mongo/db/global_timestamp.h3
-rw-r--r--src/mongo/db/repl/oplog.cpp117
3 files changed, 82 insertions, 85 deletions
diff --git a/src/mongo/db/global_timestamp.cpp b/src/mongo/db/global_timestamp.cpp
index d380e2a4f05..bd082cb30d1 100644
--- a/src/mongo/db/global_timestamp.cpp
+++ b/src/mongo/db/global_timestamp.cpp
@@ -37,49 +37,54 @@
namespace mongo {
namespace {
+// This is the value of the next timestamp to be handed out.
AtomicUInt64 globalTimestamp(0);
} // namespace
void setGlobalTimestamp(const Timestamp& newTime) {
- globalTimestamp.store(newTime.asULL());
+ globalTimestamp.store(newTime.asULL() + 1);
}
Timestamp getLastSetTimestamp() {
- return Timestamp(globalTimestamp.load());
+ return Timestamp(globalTimestamp.load() - 1);
}
-Timestamp getNextGlobalTimestamp() {
+Timestamp getNextGlobalTimestamp(unsigned count) {
const unsigned now = durationCount<Seconds>(
getGlobalServiceContext()->getFastClockSource()->now().toDurationSinceEpoch());
+ invariant(now != 0); // This is a sentinel value for null Timestamps.
+ invariant(count != 0);
// Optimistic approach: just increment the timestamp, assuming the seconds still match.
- auto next = globalTimestamp.addAndFetch(1);
- unsigned globalSecs = Timestamp(next).getSecs();
+ auto first = globalTimestamp.fetchAndAdd(count);
+ auto currentTimestamp = first + count; // What we just set it to.
+ unsigned globalSecs = Timestamp(currentTimestamp).getSecs();
// Fail if time is not moving forward for 2**31 calls to getNextGlobalTimestamp.
- if (globalSecs > now && Timestamp(next).getInc() >= 1U << 31) {
- mongo::warning() << "clock skew detected, prev: " << Timestamp(next).getSecs()
- << " now: " << now << std::endl;
+ if (MONGO_unlikely(globalSecs > now) && Timestamp(currentTimestamp).getInc() >= 1U << 31) {
+ mongo::severe() << "clock skew detected, prev: " << globalSecs << " now: " << now;
fassertFailed(17449);
}
- // While the seconds need to be updated, try to do it.
- while (globalSecs < now) {
- const auto expected = next;
- const auto desired = Timestamp(now, 1).asULL();
+ // If the seconds need to be updated, try to do it. This can happen at most once per second.
+ if (MONGO_unlikely(globalSecs < now)) {
+ // First fix the seconds portion.
+ while (globalSecs < now) {
+ const auto desired = Timestamp(now, 1).asULL();
- // If the compareAndSwap was not successful, assume someone else updated the seconds.
- auto actual = globalTimestamp.compareAndSwap(expected, desired);
- if (actual == expected) {
- next = desired;
- } else {
- next = globalTimestamp.addAndFetch(1);
+ auto actual = globalTimestamp.compareAndSwap(currentTimestamp, desired);
+ if (actual == currentTimestamp)
+ break; // We successfully set the secs, so we're done here.
+
+ // We raced with someone else. Try again, unless they fixed the secs field for us.
+ currentTimestamp = actual;
+ globalSecs = Timestamp(currentTimestamp).getSecs();
}
- // Either way, the seconds should no longer be less than now, but repeat if we raced.
- globalSecs = Timestamp(next).getSecs();
+ // Now reserve our timestamps with the new value of secs.
+ first = globalTimestamp.fetchAndAdd(count);
}
- return Timestamp(next);
+ return Timestamp(first);
}
} // namespace mongo
diff --git a/src/mongo/db/global_timestamp.h b/src/mongo/db/global_timestamp.h
index 64d1ce551e6..9c106147b6a 100644
--- a/src/mongo/db/global_timestamp.h
+++ b/src/mongo/db/global_timestamp.h
@@ -40,6 +40,7 @@ Timestamp getLastSetTimestamp();
/**
* Generates a new and unique Timestamp.
+ * If count > 1, that many unique Timestamps are reserved starting with the returned value.
*/
-Timestamp getNextGlobalTimestamp();
+Timestamp getNextGlobalTimestamp(unsigned count = 1);
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index ca0620ab203..2566562b1da 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -129,7 +129,7 @@ void checkOplogInsert(Status result) {
struct OplogSlot {
OpTime opTime;
- int64_t hash;
+ int64_t hash = 0;
};
/**
@@ -142,14 +142,13 @@ struct OplogSlot {
* function registers the new optime with the storage system and the replication coordinator,
* and provides no facility to revert those registrations on rollback.
*/
-OplogSlot getNextOpTime(OperationContext* txn,
- Collection* oplog,
- ReplicationCoordinator* replCoord,
- ReplicationCoordinator::Mode replicationMode) {
+void getNextOpTime(OperationContext* txn,
+ Collection* oplog,
+ ReplicationCoordinator* replCoord,
+ ReplicationCoordinator::Mode replicationMode,
+ unsigned count,
+ OplogSlot* slotsOut) {
synchronizeOnCappedInFlightResource(txn->lockState(), oplog->ns());
-
- OplogSlot slot;
- slot.hash = 0;
long long term = OpTime::kUninitializedTerm;
// Fetch term out of the newOpMutex.
@@ -160,18 +159,19 @@ OplogSlot getNextOpTime(OperationContext* txn,
}
stdx::lock_guard<stdx::mutex> lk(newOpMutex);
- Timestamp ts = getNextGlobalTimestamp();
+ Timestamp ts = getNextGlobalTimestamp(count);
newTimestampNotifier.notify_all();
fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(txn, ts));
// Set hash if we're in replset mode, otherwise it remains 0 in master/slave.
- if (replicationMode == ReplicationCoordinator::modeReplSet) {
- slot.hash = hashGenerator.nextInt64();
+ const bool needHash = (replicationMode == ReplicationCoordinator::modeReplSet);
+ for (unsigned i = 0; i < count; i++) {
+ slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term};
+ if (needHash) {
+ slotsOut[i].hash = hashGenerator.nextInt64();
+ }
}
-
- slot.opTime = OpTime(ts, term);
- return slot;
}
/**
@@ -180,9 +180,10 @@ OplogSlot getNextOpTime(OperationContext* txn,
* which can be very large
* TODO: can have this build the entire doc
*/
-class OplogDocWriter : public DocWriter {
+class OplogDocWriter final : public DocWriter {
public:
- OplogDocWriter(const BSONObj& frame, const BSONObj& oField) : _frame(frame), _oField(oField) {}
+ OplogDocWriter(BSONObj frame, BSONObj oField)
+ : _frame(std::move(frame)), _oField(std::move(oField)) {}
void writeDocument(char* start) const {
char* buf = start;
@@ -211,22 +212,6 @@ private:
BSONObj _oField;
};
-class UpdateReplOpTimeChange : public RecoveryUnit::Change {
-public:
- UpdateReplOpTimeChange(OpTime newOpTime, ReplicationCoordinator* replCoord)
- : _newOpTime(newOpTime), _replCoord(replCoord) {}
-
- virtual void commit() {
- _replCoord->setMyLastAppliedOpTimeForward(_newOpTime);
- }
-
- virtual void rollback() {}
-
-private:
- const OpTime _newOpTime;
- ReplicationCoordinator* _replCoord;
-};
-
} // namespace
void setOplogCollectionName() {
@@ -277,14 +262,14 @@ bool oplogDisabled(OperationContext* txn,
return false;
}
-unique_ptr<OplogDocWriter> _logOpWriter(OperationContext* txn,
- const char* opstr,
- const NamespaceString& nss,
- const BSONObj& obj,
- const BSONObj* o2,
- bool fromMigrate,
- OpTime optime,
- long long hashNew) {
+OplogDocWriter _logOpWriter(OperationContext* txn,
+ const char* opstr,
+ const NamespaceString& nss,
+ const BSONObj& obj,
+ const BSONObj* o2,
+ bool fromMigrate,
+ OpTime optime,
+ long long hashNew) {
BSONObjBuilder b(256);
b.append("ts", optime.getTimestamp());
@@ -299,7 +284,7 @@ unique_ptr<OplogDocWriter> _logOpWriter(OperationContext* txn,
if (o2)
b.append("o2", *o2);
- return stdx::make_unique<OplogDocWriter>(OplogDocWriter(b.obj(), obj));
+ return OplogDocWriter(OplogDocWriter(b.obj(), obj));
}
} // end anon namespace
@@ -373,10 +358,9 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
used for "justOne" or "upsert" flags on 'd', 'u'
*/
void _logOpsInner(OperationContext* txn,
- const char* opstr,
const NamespaceString& nss,
- const vector<unique_ptr<OplogDocWriter>>& writers,
- bool fromMigrate,
+ const DocWriter* const* writers,
+ size_t nWriters,
Collection* oplogCollection,
ReplicationCoordinator::Mode replicationMode,
bool updateReplOpTime,
@@ -391,12 +375,14 @@ void _logOpsInner(OperationContext* txn,
// we jump through a bunch of hoops here to avoid copying the obj buffer twice --
// instead we do a single copy to the destination in the record store.
- for (auto it = writers.begin(); it != writers.end(); it++)
- checkOplogInsert(oplogCollection->insertDocument(txn, it->get(), false));
+ for (size_t i = 0; i < nWriters; i++)
+ checkOplogInsert(oplogCollection->insertDocument(txn, writers[i], false));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
- if (updateReplOpTime)
- txn->recoveryUnit()->registerChange(new UpdateReplOpTimeChange(finalOpTime, replCoord));
+ if (updateReplOpTime) {
+ txn->recoveryUnit()->onCommit(
+ [replCoord, finalOpTime] { replCoord->setMyLastAppliedOpTimeForward(finalOpTime); });
+ }
ReplClientInfo::forClient(txn->getClient()).setLastOp(finalOpTime);
}
@@ -415,14 +401,14 @@ void _logOp(OperationContext* txn,
return;
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- vector<unique_ptr<OplogDocWriter>> writers;
Collection* oplog = getLocalOplogCollection(txn, oplogName);
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
Lock::CollectionLock lock(txn->lockState(), oplogName, MODE_IX);
- auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
+ OplogSlot slot;
+ getNextOpTime(txn, oplog, replCoord, replMode, 1, &slot);
auto writer = _logOpWriter(txn, opstr, nss, obj, o2, fromMigrate, slot.opTime, slot.hash);
- writers.emplace_back(std::move(writer));
- _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, updateOpTime, slot.opTime);
+ const DocWriter* basePtr = &writer;
+ _logOpsInner(txn, nss, &basePtr, 1, oplog, replMode, updateOpTime, slot.opTime);
}
void logOps(OperationContext* txn,
@@ -431,26 +417,31 @@ void logOps(OperationContext* txn,
std::vector<BSONObj>::const_iterator begin,
std::vector<BSONObj>::const_iterator end,
bool fromMigrate) {
- ReplicationCoordinator::Mode replMode = ReplicationCoordinator::get(txn)->getReplicationMode();
+ ReplicationCoordinator* replCoord = ReplicationCoordinator::get(txn);
+ ReplicationCoordinator::Mode replMode = replCoord->getReplicationMode();
invariant(begin != end);
if (oplogDisabled(txn, replMode, nss))
return;
- vector<unique_ptr<OplogDocWriter>> writers;
- writers.reserve(end - begin);
- OpTime finalOpTime;
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ const size_t count = end - begin;
+ std::vector<OplogDocWriter> writers;
+ writers.reserve(count);
Collection* oplog = getLocalOplogCollection(txn, _oplogCollectionName);
Lock::DBLock lk(txn->lockState(), "local", MODE_IX);
Lock::CollectionLock lock(txn->lockState(), _oplogCollectionName, MODE_IX);
- for (auto it = begin; it != end; it++) {
- auto slot = getNextOpTime(txn, oplog, replCoord, replMode);
- finalOpTime = slot.opTime;
- auto writer = _logOpWriter(txn, opstr, nss, *it, NULL, fromMigrate, slot.opTime, slot.hash);
- writers.emplace_back(std::move(writer));
+ std::unique_ptr<OplogSlot[]> slots(new OplogSlot[count]);
+ getNextOpTime(txn, oplog, replCoord, replMode, count, slots.get());
+ for (size_t i = 0; i < count; i++) {
+ writers.emplace_back(_logOpWriter(
+ txn, opstr, nss, begin[i], NULL, fromMigrate, slots[i].opTime, slots[i].hash));
+ }
+
+ std::unique_ptr<DocWriter const* []> basePtrs(new DocWriter const* [count]);
+ for (size_t i = 0; i < count; i++) {
+ basePtrs[i] = &writers[i];
}
- _logOpsInner(txn, opstr, nss, writers, fromMigrate, oplog, replMode, true, finalOpTime);
+ _logOpsInner(txn, nss, basePtrs.get(), count, oplog, replMode, true, slots[count - 1].opTime);
}