summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2014-11-04 13:13:38 -0500
committerMathias Stearn <mathias@10gen.com>2014-11-04 14:13:14 -0500
commitd5f6eb21a94a39bd39c3c7a7b0ac107aca33a7e4 (patch)
treed34aff684828b74bbccd4f83aa4310f35e8391d2
parentf0bafc7c171217b7541d337723c6390a793be359 (diff)
downloadmongo-d5f6eb21a94a39bd39c3c7a7b0ac107aca33a7e4.tar.gz
SERVER-15948 Fast OplogStart impl for RSs supporting arbitrary DiskLocs
Storage engines must opt-in to this functionality. For now, heap1 is the only one to do so.
-rw-r--r--jstests/core/query_oplogreplay.js92
-rw-r--r--jstests/repl/master1.js6
-rw-r--r--src/mongo/db/commands/rename_collection.cpp6
-rw-r--r--src/mongo/db/diskloc.h13
-rw-r--r--src/mongo/db/namespace_string-inl.h2
-rw-r--r--src/mongo/db/query/new_find.cpp65
-rw-r--r--src/mongo/db/storage/SConscript10
-rw-r--r--src/mongo/db/storage/heap1/SConscript1
-rw-r--r--src/mongo/db/storage/heap1/heap1_engine.cpp4
-rw-r--r--src/mongo/db/storage/heap1/record_store_heap.cpp59
-rw-r--r--src/mongo/db/storage/heap1/record_store_heap.h8
-rw-r--r--src/mongo/db/storage/mmap_v1/btree/btree_ondisk.cpp6
-rw-r--r--src/mongo/db/storage/mmap_v1/btree/btree_ondisk.h4
-rw-r--r--src/mongo/db/storage/record_store.h13
14 files changed, 208 insertions, 81 deletions
diff --git a/jstests/core/query_oplogreplay.js b/jstests/core/query_oplogreplay.js
index ab291af2be1..1ae9be9071f 100644
--- a/jstests/core/query_oplogreplay.js
+++ b/jstests/core/query_oplogreplay.js
@@ -1,45 +1,57 @@
// Test queries that set the OplogReplay flag.
-var t = db.jstests_query_oplogreplay;
-t.drop();
+function test(t) {
+ t.drop();
+ assert.commandWorked(t.getDB().createCollection(t.getName(), {capped: true, size: 16*1024}));
-for (var i = 0; i < 100; i++) {
- t.save({_id: i, ts: i});
+ function makeTS(i) {
+ return Timestamp(1000, i)
+ }
+
+ for (var i = 0; i < 100; i++) {
+ t.save({_id: i, ts: makeTS(i)});
+ }
+
+ // Missing 'ts' field.
+ assert.throws(function() {
+ t.find().addOption(DBQuery.Option.oplogReplay).next();
+ });
+ assert.throws(function() {
+ t.find({_id: 3}).addOption(DBQuery.Option.oplogReplay).next();
+ });
+
+ // 'ts' field is not top-level.
+ assert.throws(function() {
+ t.find({$or: [{ts: {$gt: makeTS(3)}}, {foo: 3}]})
+ .addOption(DBQuery.Option.oplogReplay).next();
+ });
+ assert.throws(function() {
+ t.find({$nor: [{ts: {$gt: makeTS(4)}}, {foo: 4}]})
+ .addOption(DBQuery.Option.oplogReplay).next();
+ });
+
+ // Predicate over 'ts' is not $gt or $gte.
+ assert.throws(function() {
+ t.find({ts: {$lt: makeTS(4)}}).addOption(DBQuery.Option.oplogReplay).next();
+ });
+ assert.throws(function() {
+ t.find({ts: {$lt: makeTS(4)}, _id: 3}).addOption(DBQuery.Option.oplogReplay).next();
+ });
+
+ // Query on just the 'ts' field.
+ var cursor = t.find({ts: {$gt: makeTS(20)}}).addOption(DBQuery.Option.oplogReplay);
+ assert.eq(21, cursor.next()["_id"]);
+ assert.eq(22, cursor.next()["_id"]);
+
+ // Query over both 'ts' and '_id' should only pay attention to the 'ts'
+ // field for finding the oplog start (SERVER-13566).
+ cursor = t.find({ts: {$gte: makeTS(20)}, _id: 25}).addOption(DBQuery.Option.oplogReplay);
+ assert.eq(25, cursor.next()["_id"]);
+ assert(!cursor.hasNext());
}
-// Missing 'ts' field.
-assert.throws(function() {
- t.find().addOption(DBQuery.Option.oplogReplay).next();
-});
-assert.throws(function() {
- t.find({_id: 3}).addOption(DBQuery.Option.oplogReplay).next();
-});
-
-// 'ts' field is not top-level.
-assert.throws(function() {
- t.find({$or: [{ts: {$gt: 3}}, {foo: 3}]})
- .addOption(DBQuery.Option.oplogReplay).next();
-});
-assert.throws(function() {
- t.find({$nor: [{ts: {$gt: 4}}, {foo: 4}]})
- .addOption(DBQuery.Option.oplogReplay).next();
-});
-
-// Predicate over 'ts' is not $gt or $gte.
-assert.throws(function() {
- t.find({ts: {$lt: 4}}).addOption(DBQuery.Option.oplogReplay).next();
-});
-assert.throws(function() {
- t.find({ts: {$lt: 4}, _id: 3}).addOption(DBQuery.Option.oplogReplay).next();
-});
-
-// Query on just the 'ts' field.
-var cursor = t.find({ts: {$gt: 20}}).addOption(DBQuery.Option.oplogReplay);
-assert.eq(21, cursor.next()["_id"]);
-assert.eq(22, cursor.next()["_id"]);
-
-// Query over both 'ts' and '_id' should only pay attention to the 'ts'
-// field for finding the oplog start (SERVER-13566).
-cursor = t.find({ts: {$gte: 20}, _id: 25}).addOption(DBQuery.Option.oplogReplay);
-assert.eq(25, cursor.next()["_id"]);
-assert(!cursor.hasNext());
+// test on non-oplog
+test(db.jstests_query_oplogreplay);
+
+// test on real oplog
+test(db.getSiblingDB('local').oplog.jstests_query_oplogreplay);
diff --git a/jstests/repl/master1.js b/jstests/repl/master1.js
index 49b3416d202..565c2ca98e4 100644
--- a/jstests/repl/master1.js
+++ b/jstests/repl/master1.js
@@ -24,7 +24,7 @@ assert.eq( "i", lastop().op );
op = lastop();
printjson( op );
op.ts.t = op.ts.t + 600000 // 10 minutes
-m.getDB( "local" ).runCommand( {godinsert:"oplog.$main", obj:op} );
+assert.commandWorked(m.getDB( "local" ).runCommand( {godinsert:"oplog.$main", obj:op} ));
rt.stop( true );
m = rt.start( true, null, true );
@@ -36,9 +36,9 @@ assert.eq( op.ts.i + 1, lastop().ts.i );
op = lastop();
printjson( op );
-op.ts.i = Math.pow(2,31);
+op.ts.i = Math.pow(2,31)-1;
printjson( op );
-m.getDB( "local" ).runCommand( {godinsert:"oplog.$main", obj:op} );
+assert.commandWorked(m.getDB( "local" ).runCommand( {godinsert:"oplog.$main", obj:op} ));
rt.stop( true );
m = rt.start( true, null, true );
diff --git a/src/mongo/db/commands/rename_collection.cpp b/src/mongo/db/commands/rename_collection.cpp
index 0d159cac2b6..6eafd2e9430 100644
--- a/src/mongo/db/commands/rename_collection.cpp
+++ b/src/mongo/db/commands/rename_collection.cpp
@@ -127,6 +127,12 @@ namespace mongo {
return false;
}
+ if (NamespaceString::oplog(source) != NamespaceString::oplog(target)) {
+ errmsg =
+ "If either the source or target of a rename is an oplog name, both must be";
+ return false;
+ }
+
if (!fromRepl) { // If it got through on the master, need to allow it here too
Status sourceStatus = userAllowedWriteNS(source);
if (!sourceStatus.isOK()) {
diff --git a/src/mongo/db/diskloc.h b/src/mongo/db/diskloc.h
index d043dc40b00..081fdf563b8 100644
--- a/src/mongo/db/diskloc.h
+++ b/src/mongo/db/diskloc.h
@@ -138,9 +138,6 @@ namespace mongo {
return x;
return ofs - b.ofs;
}
- bool operator<(const DiskLoc& b) const {
- return compare(b) < 0;
- }
/**
* Hash value for this disk location. The hash implementation may be modified, and its
@@ -161,6 +158,11 @@ namespace mongo {
};
#pragma pack()
+ // Relational operators for DiskLoc. Each one is defined purely in terms of
+ // DiskLoc::compare(), with 'lhs' as the left operand and 'rhs' as the right operand.
+ inline bool operator< (const DiskLoc& lhs, const DiskLoc& rhs) { return lhs.compare(rhs) < 0; }
+ inline bool operator<=(const DiskLoc& lhs, const DiskLoc& rhs) { return lhs.compare(rhs) <= 0; }
+ inline bool operator> (const DiskLoc& lhs, const DiskLoc& rhs) { return lhs.compare(rhs) > 0; }
+ inline bool operator>=(const DiskLoc& lhs, const DiskLoc& rhs) { return lhs.compare(rhs) >= 0; }
+
inline size_t DiskLoc::Hasher::operator()( DiskLoc loc ) const {
size_t hash = 0;
boost::hash_combine(hash, loc.a());
@@ -176,10 +178,9 @@ namespace mongo {
// headers must precede Records in a file.
const DiskLoc minDiskLoc(0, 0);
- // Maximum allowed DiskLoc. Note that only three bytes are used to represent the file number
- // for consistency with the v1 index DiskLoc storage format, which uses only 7 bytes total.
+ // Maximum allowed DiskLoc.
// No Record may begin at this location because the minimum size of a Record is larger than one
// byte. Also, the last bit is not able to be used because mmapv1 uses that for "used".
- const DiskLoc maxDiskLoc(0x00ffffff, 0x7ffffffe);
+ const DiskLoc maxDiskLoc(0x7fffffff, 0x7ffffffe);
} // namespace mongo
diff --git a/src/mongo/db/namespace_string-inl.h b/src/mongo/db/namespace_string-inl.h
index 597d9a93159..e5f9906b875 100644
--- a/src/mongo/db/namespace_string-inl.h
+++ b/src/mongo/db/namespace_string-inl.h
@@ -50,7 +50,7 @@ namespace mongo {
}
inline bool NamespaceString::oplog(const StringData& ns) {
- return ns == "local.oplog.rs" || ns == "local.oplog.$main";
+ return ns.startsWith("local.oplog.");
}
inline bool NamespaceString::special(const StringData& ns) {
diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp
index f546dc4db19..8474f796daf 100644
--- a/src/mongo/db/query/new_find.cpp
+++ b/src/mongo/db/query/new_find.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
+#include "mongo/db/storage/oplog_hack.h"
#include "mongo/db/storage_options.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/s/chunk_version.h"
@@ -105,6 +106,11 @@ namespace {
return mongoutils::str::equals(me->path().rawData(), "ts");
}
+ // Returns the data element held by the 'ts' predicate 'me'.
+ // 'me' must satisfy isOplogTsPred(); that is enforced with an invariant. The downcast
+ // below presumably relies on that check having established that 'me' is a comparison
+ // expression (the full body of isOplogTsPred() is not visible in this hunk).
+ mongo::BSONElement extractOplogTsOptime(const mongo::MatchExpression* me) {
+     invariant(isOplogTsPred(me));
+     const mongo::ComparisonMatchExpression* const tsComparison =
+         static_cast<const mongo::ComparisonMatchExpression*>(me);
+     return tsComparison->getData();
+ }
+
} // namespace
namespace mongo {
@@ -451,31 +457,48 @@ namespace mongo {
"$gt or $gte over the 'ts' field.");
}
- // Make an oplog start finding stage.
- WorkingSet* oplogws = new WorkingSet();
- OplogStart* stage = new OplogStart(txn, collection, tsExpr, oplogws);
-
- PlanExecutor* rawExec;
- // Takes ownership of ws and stage.
- Status execStatus = PlanExecutor::make(txn, oplogws, stage, collection,
- PlanExecutor::YIELD_AUTO, &rawExec);
- invariant(execStatus.isOK());
- scoped_ptr<PlanExecutor> exec(rawExec);
+ DiskLoc startLoc = DiskLoc().setInvalid();
- // The stage returns a DiskLoc of where to start.
- DiskLoc startLoc;
- PlanExecutor::ExecState state = exec->getNext(NULL, &startLoc);
+ // See if the RecordStore supports the oplogStartHack
+ const BSONElement tsElem = extractOplogTsOptime(tsExpr);
+ if (tsElem.type() == Timestamp) {
+ StatusWith<DiskLoc> goal = oploghack::keyForOptime(tsElem._opTime());
+ if (goal.isOK()) {
+ startLoc = collection->getRecordStore()->oplogStartHack(txn, goal.getValue());
+ }
+ }
- // This is normal. The start of the oplog is the beginning of the collection.
- if (PlanExecutor::IS_EOF == state) {
- return getExecutor(txn, collection, autoCq.release(), PlanExecutor::YIELD_AUTO,
- execOut);
+ if (startLoc.isValid()) {
+ LOG(3) << "Using direct oplog seek";
}
+ else {
+ LOG(3) << "Using OplogStart stage";
+
+ // Fallback to trying the OplogStart stage.
+ WorkingSet* oplogws = new WorkingSet();
+ OplogStart* stage = new OplogStart(txn, collection, tsExpr, oplogws);
+ PlanExecutor* rawExec;
+
+ // Takes ownership of oplogws and stage.
+ Status execStatus = PlanExecutor::make(txn, oplogws, stage, collection,
+ PlanExecutor::YIELD_AUTO, &rawExec);
+ invariant(execStatus.isOK());
+ scoped_ptr<PlanExecutor> exec(rawExec);
+
+ // The stage returns a DiskLoc of where to start.
+ PlanExecutor::ExecState state = exec->getNext(NULL, &startLoc);
+
+ // This is normal. The start of the oplog is the beginning of the collection.
+ if (PlanExecutor::IS_EOF == state) {
+ return getExecutor(txn, collection, autoCq.release(), PlanExecutor::YIELD_AUTO,
+ execOut);
+ }
- // This is not normal. An error was encountered.
- if (PlanExecutor::ADVANCED != state) {
- return Status(ErrorCodes::InternalError,
- "quick oplog start location had error...?");
+ // This is not normal. An error was encountered.
+ if (PlanExecutor::ADVANCED != state) {
+ return Status(ErrorCodes::InternalError,
+ "quick oplog start location had error...?");
+ }
}
// cout << "diskloc is " << startLoc.toString() << endl;
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 3bfcb27e074..bcadf435c27 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -17,6 +17,16 @@ env.Library(
)
env.Library(
+ target='oplog_hack',
+ source=[
+ 'oplog_hack.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/bson',
+ ]
+ )
+
+env.Library(
target='sorted_data_interface_test_harness',
source=[
'sorted_data_interface_test_bulkbuilder.cpp',
diff --git a/src/mongo/db/storage/heap1/SConscript b/src/mongo/db/storage/heap1/SConscript
index f0b193122b8..cf6f0f6a965 100644
--- a/src/mongo/db/storage/heap1/SConscript
+++ b/src/mongo/db/storage/heap1/SConscript
@@ -7,6 +7,7 @@ env.Library(
],
LIBDEPS= [
'$BUILD_DIR/mongo/bson',
+ '$BUILD_DIR/mongo/db/storage/oplog_hack',
'$BUILD_DIR/mongo/foundation',
]
)
diff --git a/src/mongo/db/storage/heap1/heap1_engine.cpp b/src/mongo/db/storage/heap1/heap1_engine.cpp
index 05e15b2f16a..c4f5b36a77d 100644
--- a/src/mongo/db/storage/heap1/heap1_engine.cpp
+++ b/src/mongo/db/storage/heap1/heap1_engine.cpp
@@ -55,14 +55,14 @@ namespace mongo {
const CollectionOptions& options) {
boost::mutex::scoped_lock lk(_mutex);
if (options.capped) {
- return new HeapRecordStore(ident,
+ return new HeapRecordStore(ns,
&_dataMap[ident],
true,
options.cappedSize ? options.cappedSize : 4096,
options.cappedMaxDocs ? options.cappedMaxDocs : -1);
}
else {
- return new HeapRecordStore(ident, &_dataMap[ident]);
+ return new HeapRecordStore(ns, &_dataMap[ident]);
}
}
diff --git a/src/mongo/db/storage/heap1/record_store_heap.cpp b/src/mongo/db/storage/heap1/record_store_heap.cpp
index 488466ceb47..0eb2172b786 100644
--- a/src/mongo/db/storage/heap1/record_store_heap.cpp
+++ b/src/mongo/db/storage/heap1/record_store_heap.cpp
@@ -32,8 +32,10 @@
#include "mongo/db/storage/heap1/record_store_heap.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/oplog_hack.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/util/log.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
@@ -93,8 +95,8 @@ namespace mongo {
_cappedMaxSize(cappedMaxSize),
_cappedMaxDocs(cappedMaxDocs),
_cappedDeleteCallback(cappedDeleteCallback),
- _data(*dataInOut ? static_cast<Data*>(dataInOut->get()) : new Data()) {
-
+ _data(*dataInOut ? static_cast<Data*>(dataInOut->get())
+ : new Data(NamespaceString::oplog(ns))) {
if (!*dataInOut) {
dataInOut->reset(_data); // takes ownership
}
@@ -176,6 +178,18 @@ namespace mongo {
}
}
+ // Derives the DiskLoc key for an oplog record from its raw bytes via
+ // oploghack::extractKey(), then rejects the key with BadValue unless it is strictly
+ // greater than the last key in _data->records. An extraction failure is returned as-is.
+ StatusWith<DiskLoc> HeapRecordStore::extractAndCheckLocForOplog(const char* data,
+                                                                 int len) const {
+     StatusWith<DiskLoc> extracted = oploghack::extractKey(data, len);
+
+     if (extracted.isOK()) {
+         // rbegin() is taken to be the highest existing key (records looks like an
+         // ordered map keyed by DiskLoc).
+         const bool notAboveHighest =
+             !_data->records.empty() &&
+             extracted.getValue() <= _data->records.rbegin()->first;
+         if (notAboveHighest)
+             return StatusWith<DiskLoc>(ErrorCodes::BadValue, "ts not higher than highest");
+     }
+
+     // Either the extraction error or the validated key.
+     return extracted;
+ }
+
StatusWith<DiskLoc> HeapRecordStore::insertRecord(OperationContext* txn,
const char* data,
int len,
@@ -190,7 +204,17 @@ namespace mongo {
HeapRecord rec(len);
memcpy(rec.data.get(), data, len);
- const DiskLoc loc = allocateLoc();
+ DiskLoc loc;
+ if (_data->isOplog) {
+ StatusWith<DiskLoc> status = extractAndCheckLocForOplog(data, len);
+ if (!status.isOK())
+ return status;
+ loc = status.getValue();
+ }
+ else {
+ loc = allocateLoc();
+ }
+
txn->recoveryUnit()->registerChange(new InsertChange(_data, loc));
_data->dataSize += len;
_data->records[loc] = rec;
@@ -214,7 +238,17 @@ namespace mongo {
HeapRecord rec(len);
doc->writeDocument(rec.data.get());
- const DiskLoc loc = allocateLoc();
+ DiskLoc loc;
+ if (_data->isOplog) {
+ StatusWith<DiskLoc> status = extractAndCheckLocForOplog(rec.data.get(), len);
+ if (!status.isOK())
+ return status;
+ loc = status.getValue();
+ }
+ else {
+ loc = allocateLoc();
+ }
+
txn->recoveryUnit()->registerChange(new InsertChange(_data, loc));
_data->dataSize += len;
_data->records[loc] = rec;
@@ -413,6 +447,23 @@ namespace mongo {
return DiskLoc(int(id >> 30), int((id << 1) & ~(1<<31)));
}
+ /**
+  * Finds the DiskLoc of the oplog entry closest to 'startingPosition' without being higher.
+  *
+  * Returns an invalid DiskLoc when this store is not an oplog, and a default-constructed
+  * DiskLoc() when the store is empty or every entry is above 'startingPosition'.
+  * 'txn' is unused.
+  */
+ DiskLoc HeapRecordStore::oplogStartHack(OperationContext* txn,
+                                         const DiskLoc& startingPosition) const {
+     if (!_data->isOplog)
+         return DiskLoc().setInvalid();
+
+     const Records& records = _data->records;
+
+     // upper_bound() yields the first entry strictly above startingPosition (or end()), so the
+     // entry just before it, if any, is the highest one that is <= startingPosition.
+     Records::const_iterator it = records.upper_bound(startingPosition);
+
+     // begin() here means the store is empty or all entries are above startingPosition.
+     // Stepping back from begin() would be undefined behavior, so report "no entry" instead.
+     if (it == records.begin())
+         return DiskLoc();
+
+     --it;
+     return it->first;
+ }
+
//
// Forward Iterator
//
diff --git a/src/mongo/db/storage/heap1/record_store_heap.h b/src/mongo/db/storage/heap1/record_store_heap.h
index 57f0c0abb5c..4a01dbacd7f 100644
--- a/src/mongo/db/storage/heap1/record_store_heap.h
+++ b/src/mongo/db/storage/heap1/record_store_heap.h
@@ -129,6 +129,9 @@ namespace mongo {
virtual long long numRecords( OperationContext* txn ) const { return _data->records.size(); }
+ virtual DiskLoc oplogStartHack(OperationContext* txn,
+ const DiskLoc& startingPosition) const;
+
protected:
struct HeapRecord {
HeapRecord() :size(0) {}
@@ -159,6 +162,8 @@ namespace mongo {
class InsertChange;
class RemoveChange;
+ StatusWith<DiskLoc> extractAndCheckLocForOplog(const char* data, int len) const;
+
DiskLoc allocateLoc();
bool cappedAndNeedDelete(OperationContext* txn) const;
void cappedDeleteAsNeeded(OperationContext* txn);
@@ -171,11 +176,12 @@ namespace mongo {
// This is the "persistent" data.
struct Data {
- Data() :dataSize(0), nextId(1) {}
+ Data(bool isOplog) :dataSize(0), nextId(1), isOplog(isOplog) {}
int64_t dataSize;
Records records;
int64_t nextId;
+ const bool isOplog;
};
Data* const _data;
diff --git a/src/mongo/db/storage/mmap_v1/btree/btree_ondisk.cpp b/src/mongo/db/storage/mmap_v1/btree/btree_ondisk.cpp
index b9389a6e3c1..717e5bcb37b 100644
--- a/src/mongo/db/storage/mmap_v1/btree/btree_ondisk.cpp
+++ b/src/mongo/db/storage/mmap_v1/btree/btree_ondisk.cpp
@@ -40,7 +40,11 @@ namespace mongo {
void DiskLoc56Bit::operator=(const DiskLoc& loc) {
ofs = loc.getOfs();
int la = loc.a();
- invariant( la <= 0xffffff ); // must fit in 3 bytes
+ if (la == maxDiskLoc.a()) {
+ invariant(ofs == maxDiskLoc.getOfs());
+ la = OurMaxA;
+ }
+ invariant( la <= OurMaxA ); // must fit in 3 bytes
if( la < 0 ) {
if ( la != -1 ) {
log() << "btree diskloc isn't negative 1: " << la << std::endl;
diff --git a/src/mongo/db/storage/mmap_v1/btree/btree_ondisk.h b/src/mongo/db/storage/mmap_v1/btree/btree_ondisk.h
index dcb0697a2b0..bbb720a4040 100644
--- a/src/mongo/db/storage/mmap_v1/btree/btree_ondisk.h
+++ b/src/mongo/db/storage/mmap_v1/btree/btree_ondisk.h
@@ -258,8 +258,8 @@ namespace mongo {
//
enum {
- // first bit of offsets used in _KeyNode we don't use -1 here.
- OurNullOfs = -2
+ OurNullOfs = -2, // first bit of offsets used in _KeyNode we don't use -1 here
+ OurMaxA = 0xffffff, // highest 3-byte value
};
void Null() {
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index 0a4152ebb2b..b25ba1978c7 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -281,6 +281,19 @@ namespace mongo {
virtual Status setCustomOption( OperationContext* txn,
const BSONElement& option,
BSONObjBuilder* info = NULL ) = 0;
+
+ /**
+ * Return the DiskLoc of an oplog entry as close to startingPosition as possible without
+ * being higher. If there are no entries <= startingPosition, return DiskLoc().
+ *
+ * If you don't implement the oplogStartHack, just use the default implementation which
+ * returns an Invalid DiskLoc.
+ *
+ * The default implementation ignores both arguments. The oplog-replay path in
+ * new_find.cpp checks isValid() on the result and falls back to the OplogStart stage
+ * when it is invalid.
+ */
+ virtual DiskLoc oplogStartHack(OperationContext* txn,
+ const DiskLoc& startingPosition) const {
+ return DiskLoc().setInvalid();
+ }
+
protected:
std::string _ns;
};