summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2015-06-08 13:29:37 -0400
committerAndy Schwerin <schwerin@mongodb.com>2015-06-08 13:31:22 -0400
commit9e1657ebd156ed70f44a4585d2330510d75e2ae5 (patch)
tree4eba9bdb8e5824c8caee34c482207e6c6c5d4e7b /src/mongo
parent9706bfd8946fdcfdbd97b9f451522666355985f1 (diff)
downloadmongo-9e1657ebd156ed70f44a4585d2330510d75e2ae5.tar.gz
SERVER-15192 Roll up of patches required to make logOp rollback safe in v3.0.
This rolls up commits with the following summaries from the master development branch into the v3.0 branch. * Make AuthzManager logOp listener rollback-safe * Make MigrateFromStatus logOp listener rollback-safe * Make dbhash and storedFuncMod logOp listeners rollback-safe * Remove RollBackPreventer since all logOp listeners are now rollback-safe * Do not perform a query while committing. * Terminate server if RecoveryUnit::Change throws on commit or rollback.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/auth/authorization_manager.cpp3
-rw-r--r--src/mongo/db/auth/authorization_manager.h3
-rw-r--r--src/mongo/db/auth/authz_manager_external_state.h1
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_local.cpp98
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_local.h6
-rw-r--r--src/mongo/db/auth/authz_manager_external_state_mock.cpp9
-rw-r--r--src/mongo/db/commands/apply_ops.cpp4
-rw-r--r--src/mongo/db/commands/dbhash.cpp33
-rw-r--r--src/mongo/db/commands/dbhash.h11
-rw-r--r--src/mongo/db/repl/oplog.cpp46
-rw-r--r--src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp26
-rw-r--r--src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp20
-rw-r--r--src/mongo/db/storage/recovery_unit_noop.h9
-rw-r--r--src/mongo/db/storage/rocks/rocks_recovery_unit.cpp24
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp41
-rw-r--r--src/mongo/s/d_migrate.cpp118
-rw-r--r--src/mongo/scripting/engine.cpp20
-rw-r--r--src/mongo/scripting/engine.h14
18 files changed, 340 insertions, 146 deletions
diff --git a/src/mongo/db/auth/authorization_manager.cpp b/src/mongo/db/auth/authorization_manager.cpp
index 5cb58574e8f..e54389c26a7 100644
--- a/src/mongo/db/auth/authorization_manager.cpp
+++ b/src/mongo/db/auth/authorization_manager.cpp
@@ -1013,13 +1013,14 @@ namespace {
}
void AuthorizationManager::logOp(
+ OperationContext* txn,
const char* op,
const char* ns,
const BSONObj& o,
BSONObj* o2,
bool* b) {
- _externalState->logOp(op, ns, o, o2, b);
+ _externalState->logOp(txn, op, ns, o, o2, b);
if (appliesToAuthzData(op, ns, o)) {
_invalidateRelevantCacheData(op, ns, o, o2);
}
diff --git a/src/mongo/db/auth/authorization_manager.h b/src/mongo/db/auth/authorization_manager.h
index 994e0776bbc..65426c4ae76 100644
--- a/src/mongo/db/auth/authorization_manager.h
+++ b/src/mongo/db/auth/authorization_manager.h
@@ -434,7 +434,8 @@ namespace mongo {
* Hook called by replication code to let the AuthorizationManager observe changes
* to relevant collections.
*/
- void logOp(const char* opstr,
+ void logOp(OperationContext* txn,
+ const char* opstr,
const char* ns,
const BSONObj& obj,
BSONObj* patt,
diff --git a/src/mongo/db/auth/authz_manager_external_state.h b/src/mongo/db/auth/authz_manager_external_state.h
index 5b88d190682..d9a81c2e8e1 100644
--- a/src/mongo/db/auth/authz_manager_external_state.h
+++ b/src/mongo/db/auth/authz_manager_external_state.h
@@ -229,6 +229,7 @@ namespace mongo {
virtual void releaseAuthzUpdateLock() = 0;
virtual void logOp(
+ OperationContext* txn,
const char* op,
const char* ns,
const BSONObj& o,
diff --git a/src/mongo/db/auth/authz_manager_external_state_local.cpp b/src/mongo/db/auth/authz_manager_external_state_local.cpp
index 507db983a53..ea843a2d4b0 100644
--- a/src/mongo/db/auth/authz_manager_external_state_local.cpp
+++ b/src/mongo/db/auth/authz_manager_external_state_local.cpp
@@ -36,6 +36,7 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/user_document_parser.h"
+#include "mongo/db/operation_context.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -365,47 +366,100 @@ namespace {
return status;
}
- void AuthzManagerExternalStateLocal::logOp(
- const char* op,
- const char* ns,
- const BSONObj& o,
- BSONObj* o2,
- bool* b) {
+ class AuthzManagerExternalStateLocal::AuthzManagerLogOpHandler : public RecoveryUnit::Change {
+ public:
- if (ns == AuthorizationManager::rolesCollectionNamespace.ns() ||
- ns == AuthorizationManager::adminCommandNamespace.ns()) {
+ // None of the parameters below (except externalState) need to live longer than
+ // the instantiations of this class
+ AuthzManagerLogOpHandler(AuthzManagerExternalStateLocal* externalState,
+ const char* op,
+ const char* ns,
+ const BSONObj& o,
+ const BSONObj* o2,
+ const bool* b):
+ _externalState(externalState),
+ _op(op),
+ _ns(ns),
+ _o(o.getOwned()),
- boost::lock_guard<boost::mutex> lk(_roleGraphMutex);
- Status status = _roleGraph.handleLogOp(op, NamespaceString(ns), o, o2);
+ _isO2Set(o2 ? true : false),
+ _o2(_isO2Set ? o2->getOwned() : BSONObj()),
+
+ _isBSet(b ? true : false),
+ _b(_isBSet ? *b : false) {
+
+ }
+
+ virtual void commit() {
+ boost::lock_guard<boost::mutex> lk(_externalState->_roleGraphMutex);
+ Status status = _externalState->_roleGraph.handleLogOp(_op.c_str(),
+ NamespaceString(_ns.c_str()),
+ _o,
+ _isO2Set ? &_o2 : NULL);
if (status == ErrorCodes::OplogOperationUnsupported) {
- _roleGraph = RoleGraph();
- _roleGraphState = roleGraphStateInitial;
+ _externalState->_roleGraph = RoleGraph();
+ _externalState->_roleGraphState = _externalState->roleGraphStateInitial;
BSONObjBuilder oplogEntryBuilder;
- oplogEntryBuilder << "op" << op << "ns" << ns << "o" << o;
- if (o2)
- oplogEntryBuilder << "o2" << *o2;
- if (b)
- oplogEntryBuilder << "b" << *b;
+ oplogEntryBuilder << "op" << _op << "ns" << _ns << "o" << _o;
+ if (_isO2Set)
+ oplogEntryBuilder << "o2" << _o2;
+ if (_isBSet)
+ oplogEntryBuilder << "b" << _b;
error() << "Unsupported modification to roles collection in oplog; "
"restart this process to reenable user-defined roles; " << status.reason() <<
"; Oplog entry: " << oplogEntryBuilder.done();
}
else if (!status.isOK()) {
warning() << "Skipping bad update to roles collection in oplog. " << status <<
- " Oplog entry: " << op;
+ " Oplog entry: " << _op;
}
- status = _roleGraph.recomputePrivilegeData();
+ status = _externalState->_roleGraph.recomputePrivilegeData();
if (status == ErrorCodes::GraphContainsCycle) {
- _roleGraphState = roleGraphStateHasCycle;
+ _externalState->_roleGraphState = _externalState->roleGraphStateHasCycle;
error() << "Inconsistent role graph during authorization manager initialization. "
"Only direct privileges available. " << status.reason() <<
- " after applying oplog entry " << op;
+ " after applying oplog entry " << _op;
}
else {
fassert(17183, status);
- _roleGraphState = roleGraphStateConsistent;
+ _externalState->_roleGraphState = _externalState->roleGraphStateConsistent;
}
+
+ }
+
+ virtual void rollback() { }
+
+ private:
+ AuthzManagerExternalStateLocal* _externalState;
+ const std::string _op;
+ const std::string _ns;
+ const BSONObj _o;
+
+ const bool _isO2Set;
+ const BSONObj _o2;
+
+ const bool _isBSet;
+ const bool _b;
+ };
+
+ void AuthzManagerExternalStateLocal::logOp(
+ OperationContext* txn,
+ const char* op,
+ const char* ns,
+ const BSONObj& o,
+ BSONObj* o2,
+ bool* b) {
+
+ if (ns == AuthorizationManager::rolesCollectionNamespace.ns() ||
+ ns == AuthorizationManager::adminCommandNamespace.ns()) {
+
+ txn->recoveryUnit()->registerChange(new AuthzManagerLogOpHandler(this,
+ op,
+ ns,
+ o,
+ o2,
+ b));
}
}
diff --git a/src/mongo/db/auth/authz_manager_external_state_local.h b/src/mongo/db/auth/authz_manager_external_state_local.h
index 1fd95a6cadc..87648b911c0 100644
--- a/src/mongo/db/auth/authz_manager_external_state_local.h
+++ b/src/mongo/db/auth/authz_manager_external_state_local.h
@@ -65,6 +65,7 @@ namespace mongo {
std::vector<BSONObj>* result);
virtual void logOp(
+ OperationContext* txn,
const char* op,
const char* ns,
const BSONObj& o,
@@ -82,6 +83,11 @@ namespace mongo {
};
/**
+ * RecoveryUnit::Change subclass used to commit work for AuthzManager logOp listener.
+ */
+ class AuthzManagerLogOpHandler;
+
+ /**
* Initializes the role graph from the contents of the admin.system.roles collection.
*/
Status _initializeRoleGraph(OperationContext* txn);
diff --git a/src/mongo/db/auth/authz_manager_external_state_mock.cpp b/src/mongo/db/auth/authz_manager_external_state_mock.cpp
index 71ee59d1797..e09a8179dab 100644
--- a/src/mongo/db/auth/authz_manager_external_state_mock.cpp
+++ b/src/mongo/db/auth/authz_manager_external_state_mock.cpp
@@ -152,14 +152,17 @@ namespace {
toInsert = document.copy();
}
_documents[collectionName].push_back(toInsert);
+
if (_authzManager) {
_authzManager->logOp(
+ txn,
"i",
collectionName.ns().c_str(),
toInsert,
NULL,
NULL);
}
+
return Status::OK();
}
@@ -190,14 +193,17 @@ namespace {
BSONObj newObj = document.getObject().copy();
*iter = newObj;
BSONObj idQuery = driver.makeOplogEntryQuery(newObj, false);
+
if (_authzManager) {
_authzManager->logOp(
+ txn,
"u",
collectionName.ns().c_str(),
logObj,
&idQuery,
NULL);
}
+
return Status::OK();
}
else if (status == ErrorCodes::NoMatchingDocument && upsert) {
@@ -243,14 +249,17 @@ namespace {
BSONObj idQuery = (*iter)["_id"].wrap();
_documents[collectionName].erase(iter);
++n;
+
if (_authzManager) {
_authzManager->logOp(
+ txn,
"d",
collectionName.ns().c_str(),
idQuery,
NULL,
NULL);
}
+
}
*numRemoved = n;
return Status::OK();
diff --git a/src/mongo/db/commands/apply_ops.cpp b/src/mongo/db/commands/apply_ops.cpp
index 1506b59bc14..0765e427e40 100644
--- a/src/mongo/db/commands/apply_ops.cpp
+++ b/src/mongo/db/commands/apply_ops.cpp
@@ -188,7 +188,9 @@ namespace mongo {
num++;
- logOpForDbHash(ns.c_str());
+ WriteUnitOfWork wuow(txn);
+ logOpForDbHash(txn, ns.c_str());
+ wuow.commit();
}
result.append( "applied" , num );
diff --git a/src/mongo/db/commands/dbhash.cpp b/src/mongo/db/commands/dbhash.cpp
index 0066d700874..8472faa2915 100644
--- a/src/mongo/db/commands/dbhash.cpp
+++ b/src/mongo/db/commands/dbhash.cpp
@@ -56,8 +56,8 @@ namespace mongo {
DBHashCmd dbhashCmd;
- void logOpForDbHash(const char* ns) {
- dbhashCmd.wipeCacheForCollection( ns );
+ void logOpForDbHash(OperationContext* txn, const char* ns) {
+ dbhashCmd.wipeCacheForCollection(txn, ns);
}
// ----
@@ -216,15 +216,34 @@ namespace mongo {
return 1;
}
- void DBHashCmd::wipeCacheForCollection( const StringData& ns ) {
+ class DBHashCmd::DBHashLogOpHandler : public RecoveryUnit::Change {
+ public:
+ DBHashLogOpHandler(DBHashCmd* dCmd,
+ StringData ns):
+ _dCmd(dCmd),
+ _ns(ns.toString()) {
+
+ }
+ void commit() {
+ scoped_lock lk( _dCmd->_cachedHashedMutex );
+ _dCmd->_cachedHashed.erase(_ns);
+ }
+ void rollback() { }
+
+ private:
+ DBHashCmd *_dCmd;
+ const std::string _ns;
+ };
+
+ void DBHashCmd::wipeCacheForCollection(OperationContext* txn,
+ StringData ns) {
if ( !isCachable( ns ) )
return;
- scoped_lock lk( _cachedHashedMutex );
- _cachedHashed.erase( ns.toString() );
+ txn->recoveryUnit()->registerChange(new DBHashLogOpHandler(this, ns));
}
- bool DBHashCmd::isCachable( const StringData& ns ) const {
- return ns.startsWith( "config." );
+ bool DBHashCmd::isCachable(StringData ns) const {
+ return ns.startsWith("config.");
}
}
diff --git a/src/mongo/db/commands/dbhash.h b/src/mongo/db/commands/dbhash.h
index 383c7fb9d80..2063cfeca22 100644
--- a/src/mongo/db/commands/dbhash.h
+++ b/src/mongo/db/commands/dbhash.h
@@ -34,7 +34,7 @@
namespace mongo {
- void logOpForDbHash( const char* ns );
+ void logOpForDbHash( OperationContext* txn, const char* ns );
class DBHashCmd : public Command {
public:
@@ -48,11 +48,16 @@ namespace mongo {
virtual bool run(OperationContext* txn, const std::string& dbname , BSONObj& cmdObj, int, std::string& errmsg, BSONObjBuilder& result, bool);
- void wipeCacheForCollection( const StringData& ns );
+ void wipeCacheForCollection(OperationContext* txn, StringData ns);
private:
- bool isCachable( const StringData& ns ) const;
+ /**
+ * RecoveryUnit::Change subclass used to commit work for dbhash logOp listener
+ */
+ class DBHashLogOpHandler;
+
+ bool isCachable( StringData ns ) const;
std::string hashCollection( OperationContext* opCtx, Database* db, const std::string& fullCollectionName, bool* fromCache );
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 84883cc5286..04e1b10adc0 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -363,41 +363,20 @@ namespace {
BSONObj* patt,
bool* b,
bool fromMigrate) {
+
if ( getGlobalReplicationCoordinator()->isReplEnabled() ) {
_logOp(txn, opstr, ns, 0, obj, patt, b, fromMigrate);
}
-
ensureShardVersionOKOrThrow(ns);
- try {
- // TODO SERVER-15192 remove this once all listeners are rollback-safe.
- class RollbackPreventer : public RecoveryUnit::Change {
- virtual void commit() {}
- virtual void rollback() {
- severe() << "Rollback of logOp not currently allowed (SERVER-15192)";
- fassertFailed(18805);
- }
- };
- txn->recoveryUnit()->registerChange(new RollbackPreventer());
- logOpForSharding(txn, opstr, ns, obj, patt, fromMigrate);
- logOpForDbHash(ns);
- getGlobalAuthorizationManager()->logOp(opstr, ns, obj, patt, b);
-
- if ( strstr( ns, ".system.js" ) ) {
- Scope::storedFuncMod(); // this is terrible
- }
- }
- catch (const DBException& ex) {
- severe() << "Fatal DBException in logOp(): " << ex.toString();
- std::terminate();
- }
- catch (const std::exception& ex) {
- severe() << "Fatal std::exception in logOp(): " << ex.what();
- std::terminate();
- }
- catch (...) {
- severe() << "Fatal error in logOp()";
- std::terminate();
+ //
+ // rollback-safe logOp listeners
+ //
+ getGlobalAuthorizationManager()->logOp(txn, opstr, ns, obj, patt, b);
+ logOpForSharding(txn, opstr, ns, obj, patt, fromMigrate);
+ logOpForDbHash(txn, ns);
+ if ( strstr( ns, ".system.js" ) ) {
+ Scope::storedFuncMod(txn);
}
}
@@ -774,12 +753,19 @@ namespace {
else {
throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) );
}
+
+ // AuthorizationManager's logOp method registers a RecoveryUnit::Change
+ // and to do so we need to have begun a UnitOfWork
+ WriteUnitOfWork wuow(txn);
getGlobalAuthorizationManager()->logOp(
+ txn,
opType,
ns,
o,
fieldO2.isABSONObj() ? &o2 : NULL,
!fieldB.eoo() ? &valueB : NULL );
+ wuow.commit();
+
return failedUpdate;
}
diff --git a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp
index 5ade0603a96..35b079b2d8e 100644
--- a/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp
+++ b/src/mongo/db/storage/in_memory/in_memory_recovery_unit.cpp
@@ -45,10 +45,15 @@ namespace mongo {
if ( _depth > 1 )
return;
- for (Changes::iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
- (*it)->commit();
+ try {
+ for (Changes::iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
+ (*it)->commit();
+ }
+ _changes.clear();
+ }
+ catch (...) {
+ std::terminate();
}
- _changes.clear();
}
void InMemoryRecoveryUnit::endUnitOfWork() {
@@ -56,10 +61,15 @@ namespace mongo {
if (_depth > 0 )
return;
- for (Changes::reverse_iterator it = _changes.rbegin(), end = _changes.rend();
- it != end; ++it) {
- (*it)->rollback();
- }
- _changes.clear();
+ try {
+ for (Changes::reverse_iterator it = _changes.rbegin(), end = _changes.rend();
+ it != end; ++it) {
+ (*it)->rollback();
+ }
+ _changes.clear();
+ }
+ catch (...) {
+ std::terminate();
+ }
}
}
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
index af4d93ba039..a806bae91be 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.cpp
@@ -107,9 +107,14 @@ namespace mongo {
if (getDur().isDurable())
markWritesForJournaling();
- for (Changes::const_iterator it = _changes.begin(), end = _changes.end();
+ try {
+ for (Changes::const_iterator it = _changes.begin(), end = _changes.end();
it != end; ++it) {
- (*it)->commit();
+ (*it)->commit();
+ }
+ }
+ catch (...) {
+ std::terminate();
}
resetChanges();
@@ -223,9 +228,14 @@ namespace mongo {
LOG(2) << " ***** ROLLING BACK " << (_changes.size()) << " custom changes";
- for (int i = _changes.size() - 1; i >= 0; i--) {
- LOG(2) << "CUSTOM ROLLBACK " << demangleName(typeid(*_changes[i]));
- _changes[i]->rollback();
+ try {
+ for (int i = _changes.size() - 1; i >= 0; i--) {
+ LOG(2) << "CUSTOM ROLLBACK " << demangleName(typeid(*_changes[i]));
+ _changes[i]->rollback();
+ }
+ }
+ catch (...) {
+ std::terminate();
}
resetChanges();
diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h
index 7008f68298c..72556fc33aa 100644
--- a/src/mongo/db/storage/recovery_unit_noop.h
+++ b/src/mongo/db/storage/recovery_unit_noop.h
@@ -48,8 +48,13 @@ namespace mongo {
}
virtual void registerChange(Change* change) {
- change->commit();
- delete change;
+ try {
+ change->commit();
+ delete change;
+ }
+ catch (...) {
+ std::terminate();
+ }
}
virtual void* writingPtr(void* data, size_t len) {
diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
index da1ab9798a7..bb458cc8e07 100644
--- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
+++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
@@ -134,10 +134,15 @@ namespace mongo {
_commit();
}
- for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
- (*it)->commit();
+ try {
+ for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
+ (*it)->commit();
+ }
+ _changes.clear();
+ }
+ catch (...) {
+ std::terminate();
}
- _changes.clear();
_releaseSnapshot();
}
@@ -216,11 +221,16 @@ namespace mongo {
}
void RocksRecoveryUnit::_abort() {
- for (Changes::const_reverse_iterator it = _changes.rbegin(), end = _changes.rend();
- it != end; ++it) {
- (*it)->rollback();
+ try {
+ for (Changes::const_reverse_iterator it = _changes.rbegin(), end = _changes.rend();
+ it != end; ++it) {
+ (*it)->rollback();
+ }
+ _changes.clear();
+ }
+ catch (...) {
+ std::terminate();
}
- _changes.clear();
_deltaCounters.clear();
_writeBatch.reset();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 5d38e25635f..7361f151abb 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -111,26 +111,41 @@ namespace mongo {
}
void WiredTigerRecoveryUnit::_commit() {
- if ( _session && _active ) {
- _txnClose( true );
- }
+ try {
+ if ( _session && _active ) {
+ _txnClose( true );
+ }
- for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) {
- (*it)->commit();
+ for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end;
+ ++it) {
+ (*it)->commit();
+ }
+ _changes.clear();
+
+ invariant(!_active);
+ }
+ catch (...) {
+ std::terminate();
}
- _changes.clear();
}
void WiredTigerRecoveryUnit::_abort() {
- if ( _session && _active ) {
- _txnClose( false );
- }
+ try {
+ if ( _session && _active ) {
+ _txnClose( false );
+ }
- for (Changes::const_reverse_iterator it = _changes.rbegin(), end = _changes.rend();
- it != end; ++it) {
- (*it)->rollback();
+ for (Changes::const_reverse_iterator it = _changes.rbegin(), end = _changes.rend();
+ it != end; ++it) {
+ (*it)->rollback();
+ }
+ _changes.clear();
+
+ invariant(!_active);
+ }
+ catch (...) {
+ std::terminate();
}
- _changes.clear();
}
void WiredTigerRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index bca2660e63e..db8e6b28bc1 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -326,6 +326,16 @@ namespace mongo {
const BSONObj& obj,
BSONObj* patt,
bool notInActiveChunk) {
+ const char op = opstr[0];
+
+ if (notInActiveChunk) {
+ // Ignore writes that came from the migration process like cleanup so they
+ // won't be transferred to the recipient shard. Also ignore ops from
+ // _migrateClone and _transferMods since it is impossible to move a chunk
+ // to self.
+ return;
+ }
+
dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
if (!_active)
@@ -335,65 +345,47 @@ namespace mongo {
return;
// no need to log if this is not an insertion, an update, or an actual deletion
- // note: opstr 'db' isn't a deletion but a mention that a database exists (for replication
- // machinery mostly)
- char op = opstr[0];
- if ( op == 'n' || op =='c' || ( op == 'd' && opstr[1] == 'b' ) )
+ // note: opstr 'db' isn't a deletion but a mention that a database exists
+ // (for replication machinery mostly).
+ if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b'))
return;
BSONElement ide;
- if ( patt )
- ide = patt->getField( "_id" );
+ if (patt)
+ ide = patt->getField("_id");
else
ide = obj["_id"];
- if ( ide.eoo() ) {
- warning() << "logOpForSharding got mod with no _id, ignoring obj: " << obj << migrateLog;
+ if (ide.eoo()) {
+ warning() << "logOpForSharding got mod with no _id, ignoring obj: "
+ << obj << migrateLog;
return;
}
- BSONObj it;
-
- switch ( opstr[0] ) {
-
- case 'd': {
-
- if (notInActiveChunk) {
- // we don't want to xfer things we're cleaning
- // as then they'll be deleted on TO
- // which is bad
- return;
- }
-
- scoped_lock sl(_mutex);
- // can't filter deletes :(
- _deleted.push_back( ide.wrap() );
- _memoryUsed += ide.size() + 5;
+ if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) {
return;
}
- case 'i':
- it = obj;
- break;
+ BSONObj idObj(ide.wrap());
- case 'u':
+ if (op == 'u') {
+ BSONObj fullDoc;
Client::Context ctx(txn, _ns, false);
- if (!Helpers::findById(txn, ctx.db(), _ns.c_str(), ide.wrap(), it)) {
- warning() << "logOpForSharding couldn't find: " << ide
+ if (!Helpers::findById(txn, ctx.db(), _ns.c_str(), idObj, fullDoc)) {
+ warning() << "logOpForSharding couldn't find: " << idObj
<< " even though should have" << migrateLog;
+ dassert(false); // TODO: Abort the migration.
return;
}
- break;
+ if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) {
+ return;
+ }
}
- if (!isInRange(it, _min, _max, _shardKeyPattern)) {
- return;
- }
+ // Note: can't check if delete is in active chunk since the document is gone!
- scoped_lock sl(_mutex);
- _reload.push_back(ide.wrap());
- _memoryUsed += ide.size() + 5;
+ txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op));
}
/**
@@ -735,6 +727,56 @@ namespace mongo {
void _setActive( bool b ) { scoped_lock lk(_mutex); _active = b; }
/**
+ * Used to commit work for LogOpForSharding. Used to keep track of changes in documents
+ * that are part of a chunk being migrated.
+ */
+ class LogOpForShardingHandler : public RecoveryUnit::Change {
+ public:
+ /**
+ * Invariant: idObj should belong to a document that is part of the active chunk
+ * being migrated.
+ */
+ LogOpForShardingHandler(MigrateFromStatus* migrateFromStatus,
+ const BSONObj& idObj,
+ const char op):
+ _migrateFromStatus(migrateFromStatus),
+ _idObj(idObj.getOwned()),
+ _op(op) {
+ }
+
+ virtual void commit() {
+ switch (_op) {
+ case 'd': {
+ scoped_lock sl(_migrateFromStatus->_mutex);
+ _migrateFromStatus->_deleted.push_back(_idObj);
+ _migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5;
+ break;
+ }
+
+ case 'i':
+ case 'u':
+ {
+ scoped_lock sl(_migrateFromStatus->_mutex);
+ _migrateFromStatus->_reload.push_back(_idObj);
+ _migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5;
+ break;
+ }
+
+ default:
+ invariant(false);
+
+ }
+ }
+
+ virtual void rollback() { }
+
+ private:
+ MigrateFromStatus* _migrateFromStatus;
+ const BSONObj _idObj;
+ const char _op;
+ };
+
+ /**
* Used to receive invalidation notifications.
*
* XXX: move to the exec/ directory.
diff --git a/src/mongo/scripting/engine.cpp b/src/mongo/scripting/engine.cpp
index 5c8f9491ffd..1118e6fbf5d 100644
--- a/src/mongo/scripting/engine.cpp
+++ b/src/mongo/scripting/engine.cpp
@@ -42,6 +42,7 @@
#include "mongo/client/dbclientcursor.h"
#include "mongo/client/dbclientinterface.h"
#include "mongo/db/global_environment_experiment.h"
+#include "mongo/db/operation_context.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/util/file.h"
#include "mongo/util/log.h"
@@ -56,7 +57,7 @@ namespace mongo {
using std::set;
using std::string;
- long long Scope::_lastVersion = 1;
+ AtomicInt64 Scope::_lastVersion(1);
namespace {
// 2 GB is the largest support Javascript file size.
@@ -187,8 +188,16 @@ namespace {
return exec(code, filename, printResult, reportError, timeoutMs);
}
- void Scope::storedFuncMod() {
- _lastVersion++;
+ class Scope::StoredFuncModLogOpHandler : public RecoveryUnit::Change {
+ public:
+ void commit() {
+ _lastVersion.fetchAndAdd(1);
+ }
+ void rollback() { }
+ };
+
+ void Scope::storedFuncMod(OperationContext* txn) {
+ txn->recoveryUnit()->registerChange(new StoredFuncModLogOpHandler());
}
void Scope::validateObjectIdString(const string& str) {
@@ -204,10 +213,11 @@ namespace {
uassert(10208, "need to have locallyConnected already", _localDBName.size());
}
- if (_loadedVersion == _lastVersion)
+ int64_t lastVersion = _lastVersion.load();
+ if (_loadedVersion == lastVersion)
return;
- _loadedVersion = _lastVersion;
+ _loadedVersion = lastVersion;
string coll = _localDBName + ".system.js";
scoped_ptr<DBClientBase> directDBClient(createDirectClient(txn));
diff --git a/src/mongo/scripting/engine.h b/src/mongo/scripting/engine.h
index 1e97ecdff1e..535ed317b02 100644
--- a/src/mongo/scripting/engine.h
+++ b/src/mongo/scripting/engine.h
@@ -31,6 +31,7 @@
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/jsobj.h"
+#include "mongo/platform/atomic_word.h"
namespace mongo {
typedef unsigned long long ScriptingFunction;
@@ -148,7 +149,7 @@ namespace mongo {
* if any changes are made to .system.js, call this
* right now its just global - slightly inefficient, but a lot simpler
*/
- static void storedFuncMod();
+ static void storedFuncMod(OperationContext *txn);
static void validateObjectIdString(const std::string& str);
@@ -177,14 +178,21 @@ namespace mongo {
protected:
friend class PooledScope;
+
+ /**
+ * RecoveryUnit::Change subclass used to commit work for
+ * Scope::storedFuncMod logOp listener.
+ */
+ class StoredFuncModLogOpHandler;
+
virtual FunctionCacheMap& getFunctionCache() { return _cachedFunctions; }
virtual ScriptingFunction _createFunction(const char* code,
ScriptingFunction functionNumber = 0) = 0;
std::string _localDBName;
- long long _loadedVersion;
+ int64_t _loadedVersion;
std::set<std::string> _storedNames;
- static long long _lastVersion;
+ static AtomicInt64 _lastVersion;
FunctionCacheMap _cachedFunctions;
int _numTimesUsed;
bool _lastRetIsNativeCode; // v8 only: set to true if eval'd script returns a native func