diff options
author | Eric Milkie <milkie@10gen.com> | 2015-01-22 14:27:47 -0500 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2015-01-22 14:28:02 -0500 |
commit | 816defcefba70ffe815639fa6bb157b69ef034ad (patch) | |
tree | 561bc8b22af5c5a9d38c4cb269737d344710b730 | |
parent | 78c3e1803f96d045e91bb2efb9163eee9f43d237 (diff) | |
download | mongo-816defcefba70ffe815639fa6bb157b69ef034ad.tar.gz |
SERVER-16994 handle WriteConflictException when writing oplog on secondaries
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 96 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 7 |
5 files changed, 69 insertions, 84 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index b3e167e81b0..dfb724dbdaa 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/oplog.h" +#include <deque> #include <vector> #include "mongo/db/auth/action_set.h" @@ -45,6 +46,7 @@ #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/commands.h" #include "mongo/db/commands/dbhash.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/global_environment_experiment.h" @@ -410,58 +412,64 @@ namespace { } } - /** write an op to the oplog that is already built. - todo : make _logOpRS() call this so we don't repeat ourself? + OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) { + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + OpTime lastOptime = replCoord->getMyLastOptime(); + invariant(!ops.empty()); + + while (1) { + try { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), "local", MODE_X); + + if ( localOplogRSCollection == 0 ) { + Client::Context ctx(txn, rsoplog); + + localDB = ctx.db(); + verify( localDB ); + localOplogRSCollection = localDB->getCollection(rsoplog); + massert(13389, + "local.oplog.rs missing. did you drop it? if so restart server", + localOplogRSCollection); + } - NOTE(schwerin): This implementation requires that the lock for the oplog or one of its - parents be held in the exclusive (MODE_X) state, or to otherwise ensure that setMyLastOptime - is called with strictly increasing timestamp values. - */ - OpTime _logOpObjRS(OperationContext* txn, const BSONObj& op) { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); + Client::Context ctx(txn, rsoplog, localDB); + WriteUnitOfWork wunit(txn); - const OpTime ts = op["ts"]._opTime(); - long long hash = op["h"].numberLong(); + for (std::deque<BSONObj>::const_iterator it = ops.begin(); + it != ops.end(); + ++it) { + const BSONObj& op = *it; + const OpTime ts = op["ts"]._opTime(); - { - if ( localOplogRSCollection == 0 ) { - Client::Context ctx(txn, rsoplog); + checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false)); - localDB = ctx.db(); - verify( localDB ); - localOplogRSCollection = localDB->getCollection(rsoplog); - massert(13389, - "local.oplog.rs missing. did you drop it? if so restart server", - localOplogRSCollection); - } - Client::Context ctx(txn, rsoplog, localDB); - // TODO(geert): soon this needs to be part of an outer WUOW not its own. - // We can't do this yet due to locking limitations. - WriteUnitOfWork wunit(txn); - checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false)); - - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - OpTime myLastOptime = replCoord->getMyLastOptime(); - if (!(myLastOptime < ts)) { - severe() << "replication oplog stream went back in time. previous timestamp: " - << myLastOptime << " newest timestamp: " << ts << ". Op being applied: " - << op; - fassertFailedNoTrace(18905); - } - wunit.commit(); + if (!(lastOptime < ts)) { + severe() << "replication oplog stream went back in time. " + "previous timestamp: " << lastOptime << " newest timestamp: " << ts + << ". Op being applied: " << op; + fassertFailedNoTrace(18905); + } + lastOptime = ts; + } + wunit.commit(); - BackgroundSync* bgsync = BackgroundSync::get(); - // Keep this up-to-date, in case we step up to primary. - bgsync->setLastAppliedHash(hash); + BackgroundSync* bgsync = BackgroundSync::get(); + // Keep this up-to-date, in case we step up to primary. + long long hash = ops.back()["h"].numberLong(); + bgsync->setLastAppliedHash(hash); - ctx.getClient()->setLastOp( ts ); + ctx.getClient()->setLastOp(lastOptime); - replCoord->setMyLastOptime(ts); - } + replCoord->setMyLastOptime(lastOptime); + setNewOptime(lastOptime); - setNewOptime(ts); - return ts; + return lastOptime; + } + catch (const WriteConflictException& wce) { + log() << "WriteConflictException while writing oplog, retrying."; + } + } } void createOplog(OperationContext* txn) { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index be0e60061ce..07b0723417a 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -29,6 +29,7 @@ #pragma once #include <cstddef> +#include <deque> #include <string> namespace mongo { @@ -47,10 +48,11 @@ namespace repl { // If the collection already exists, set the 'last' OpTime if master/slave (side effect!) void createOplog(OperationContext* txn); - // This poorly-named function writes an op into the replica-set oplog; - // used internally by replication secondaries after they have applied an op - // Returns the newly-generated optime for the applied op. - OpTime _logOpObjRS(OperationContext* txn, const BSONObj& op); + // This function writes ops into the replica-set oplog; + // used internally by replication secondaries after they have applied ops. Updates the global + // optime. + // Returns the optime for the last op inserted. + OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops); const char rsoplog[] = "local.oplog.rs"; static const int OPLOG_VERSION = 2; diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index c1fa5efe18d..2c97825015d 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -302,7 +302,9 @@ namespace { // prime oplog try { _tryToApplyOpWithRetry(&txn, &init, lastOp); - _logOpObjRS(&txn, lastOp); + std::deque<BSONObj> ops; + ops.push_back(lastOp); + writeOpsToOplog(&txn, ops); return Status::OK(); } catch (DBException& e) { // Return if in shutdown @@ -332,7 +334,9 @@ namespace { // prime oplog _tryToApplyOpWithRetry(&txn, &init, lastOp); - _logOpObjRS(&txn, lastOp); + std::deque<BSONObj> ops; + ops.push_back(lastOp); + writeOpsToOplog(&txn, ops); std::string msg = "oplog sync 1 of 3"; log() << msg; diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index f84d1d08e7b..0a5506d5bfe 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -277,9 +277,12 @@ namespace repl { } applyOps(writerVectors); - return applyOpsToOplog(txn, &ops); - } + OpTime lastOpTime = writeOpsToOplog(txn, ops); + + BackgroundSync::get()->notify(txn); + return lastOpTime; + } void SyncTail::fillWriterVectors(const std::deque<BSONObj>& ops, std::vector< std::vector<BSONObj> >* writerVectors) { @@ -554,31 +557,6 @@ namespace { return false; } - OpTime SyncTail::applyOpsToOplog(OperationContext* txn, std::deque<BSONObj>* ops) { - OpTime lastOpTime; - { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); - WriteUnitOfWork wunit(txn); - - while (!ops->empty()) { - const BSONObj& op = ops->front(); - // this updates lastOpTimeApplied - lastOpTime = _logOpObjRS(txn, op); - ops->pop_front(); - } - wunit.commit(); - } - - // This call may result in us assuming PRIMARY role if we'd been waiting for our sync - // buffer to drain and it's now empty. This will acquire a global lock to drop all - // temp collections, so we must release the above lock on the local database before - // doing so. - BackgroundSync::get()->notify(txn); - - return lastOpTime; - } - void SyncTail::handleSlaveDelay(const BSONObj& lastOp) { ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); int slaveDelaySecs = replCoord->getSlaveDelaySecs().total_seconds(); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 98c0d836007..1802a05f41c 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -111,13 +111,6 @@ namespace repl { void _applyOplogUntil(OperationContext* txn, const OpTime& endOpTime); private: - // After ops have been written to db, call this - // to update local oplog.rs, as well as notify the primary - // that we have applied the ops. - // Ops are removed from the deque. - // Returns the optime of the last op applied. - OpTime applyOpsToOplog(OperationContext* txn, std::deque<BSONObj>* ops); - BackgroundSyncInterface* _networkQueue; // Function to use during applyOps |