author    Eric Milkie <milkie@10gen.com>  2015-01-22 14:27:47 -0500
committer Eric Milkie <milkie@10gen.com>  2015-01-22 14:28:02 -0500
commit    816defcefba70ffe815639fa6bb157b69ef034ad (patch)
tree      561bc8b22af5c5a9d38c4cb269737d344710b730
parent    78c3e1803f96d045e91bb2efb9163eee9f43d237 (diff)
download  mongo-816defcefba70ffe815639fa6bb157b69ef034ad.tar.gz
SERVER-16994 handle WriteConflictException when writing oplog on secondaries
-rw-r--r--  src/mongo/db/repl/oplog.cpp           96
-rw-r--r--  src/mongo/db/repl/oplog.h             10
-rw-r--r--  src/mongo/db/repl/rs_initialsync.cpp   8
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp       32
-rw-r--r--  src/mongo/db/repl/sync_tail.h          7
5 files changed, 69 insertions, 84 deletions
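
The patch below wraps the secondary's oplog write in a retry loop: if the storage engine throws WriteConflictException, the whole unit of work is abandoned and the batch is written again. A minimal, self-contained sketch of that retry pattern, with a simulated write standing in for the real batched insert (none of these names come from the patch):

#include <iostream>

struct WriteConflictException {};  // stand-in for mongo::WriteConflictException

// Simulated oplog write: "conflicts" on the first two attempts, then succeeds.
void doOplogWrite(int attempt) {
    if (attempt < 2)
        throw WriteConflictException();
}

int main() {
    for (int attempt = 0;; ++attempt) {
        try {
            doOplogWrite(attempt);  // attempt the whole batch as one unit of work
            std::cout << "oplog batch committed after " << attempt + 1 << " attempt(s)\n";
            break;
        }
        catch (const WriteConflictException&) {
            // Same behavior as the patch: log and retry from the top.
            std::cout << "WriteConflictException while writing oplog, retrying.\n";
        }
    }
    return 0;
}
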
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index b3e167e81b0..dfb724dbdaa 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/oplog.h"
+#include <deque>
#include <vector>
#include "mongo/db/auth/action_set.h"
@@ -45,6 +46,7 @@
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/dbhash.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/global_environment_experiment.h"
@@ -410,58 +412,64 @@ namespace {
}
}
- /** write an op to the oplog that is already built.
- todo : make _logOpRS() call this so we don't repeat ourself?
+ OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops) {
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ OpTime lastOptime = replCoord->getMyLastOptime();
+ invariant(!ops.empty());
+
+ while (1) {
+ try {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_X);
+
+ if ( localOplogRSCollection == 0 ) {
+ Client::Context ctx(txn, rsoplog);
+
+ localDB = ctx.db();
+ verify( localDB );
+ localOplogRSCollection = localDB->getCollection(rsoplog);
+ massert(13389,
+ "local.oplog.rs missing. did you drop it? if so restart server",
+ localOplogRSCollection);
+ }
- NOTE(schwerin): This implementation requires that the lock for the oplog or one of its
- parents be held in the exclusive (MODE_X) state, or to otherwise ensure that setMyLastOptime
- is called with strictly increasing timestamp values.
- */
- OpTime _logOpObjRS(OperationContext* txn, const BSONObj& op) {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), "local", MODE_X);
+ Client::Context ctx(txn, rsoplog, localDB);
+ WriteUnitOfWork wunit(txn);
- const OpTime ts = op["ts"]._opTime();
- long long hash = op["h"].numberLong();
+ for (std::deque<BSONObj>::const_iterator it = ops.begin();
+ it != ops.end();
+ ++it) {
+ const BSONObj& op = *it;
+ const OpTime ts = op["ts"]._opTime();
- {
- if ( localOplogRSCollection == 0 ) {
- Client::Context ctx(txn, rsoplog);
+ checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false));
- localDB = ctx.db();
- verify( localDB );
- localOplogRSCollection = localDB->getCollection(rsoplog);
- massert(13389,
- "local.oplog.rs missing. did you drop it? if so restart server",
- localOplogRSCollection);
- }
- Client::Context ctx(txn, rsoplog, localDB);
- // TODO(geert): soon this needs to be part of an outer WUOW not its own.
- // We can't do this yet due to locking limitations.
- WriteUnitOfWork wunit(txn);
- checkOplogInsert(localOplogRSCollection->insertDocument(txn, op, false));
-
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- OpTime myLastOptime = replCoord->getMyLastOptime();
- if (!(myLastOptime < ts)) {
- severe() << "replication oplog stream went back in time. previous timestamp: "
- << myLastOptime << " newest timestamp: " << ts << ". Op being applied: "
- << op;
- fassertFailedNoTrace(18905);
- }
- wunit.commit();
+ if (!(lastOptime < ts)) {
+ severe() << "replication oplog stream went back in time. "
+ "previous timestamp: " << lastOptime << " newest timestamp: " << ts
+ << ". Op being applied: " << op;
+ fassertFailedNoTrace(18905);
+ }
+ lastOptime = ts;
+ }
+ wunit.commit();
- BackgroundSync* bgsync = BackgroundSync::get();
- // Keep this up-to-date, in case we step up to primary.
- bgsync->setLastAppliedHash(hash);
+ BackgroundSync* bgsync = BackgroundSync::get();
+ // Keep this up-to-date, in case we step up to primary.
+ long long hash = ops.back()["h"].numberLong();
+ bgsync->setLastAppliedHash(hash);
- ctx.getClient()->setLastOp( ts );
+ ctx.getClient()->setLastOp(lastOptime);
- replCoord->setMyLastOptime(ts);
- }
+ replCoord->setMyLastOptime(lastOptime);
+ setNewOptime(lastOptime);
- setNewOptime(ts);
- return ts;
+ return lastOptime;
+ }
+ catch (const WriteConflictException& wce) {
+ log() << "WriteConflictException while writing oplog, retrying.";
+ }
+ }
}
void createOplog(OperationContext* txn) {
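
The new writeOpsToOplog() above also folds the old per-op timestamp check into the batch loop: each op's "ts" must be strictly greater than the previously recorded optime, otherwise the server fasserts (18905). A simplified sketch of that monotonicity check over a batch, using long long in place of OpTime (assumed, simplified types, not the real ones):

#include <cstdlib>
#include <deque>
#include <iostream>

// Returns the optime of the last op in the batch, aborting if the stream ever
// moves backwards (this sketch's analogue of fassertFailedNoTrace(18905)).
long long applyBatchTimestamps(const std::deque<long long>& batch, long long lastOptime) {
    for (long long ts : batch) {
        if (!(lastOptime < ts)) {
            std::cerr << "replication oplog stream went back in time. previous timestamp: "
                      << lastOptime << " newest timestamp: " << ts << "\n";
            std::abort();
        }
        lastOptime = ts;  // advance only after the check passes
    }
    return lastOptime;
}

int main() {
    std::deque<long long> batch{11, 12, 15};
    std::cout << "last optime: " << applyBatchTimestamps(batch, 10) << "\n";
    return 0;
}
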
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index be0e60061ce..07b0723417a 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -29,6 +29,7 @@
#pragma once
#include <cstddef>
+#include <deque>
#include <string>
namespace mongo {
@@ -47,10 +48,11 @@ namespace repl {
// If the collection already exists, set the 'last' OpTime if master/slave (side effect!)
void createOplog(OperationContext* txn);
- // This poorly-named function writes an op into the replica-set oplog;
- // used internally by replication secondaries after they have applied an op
- // Returns the newly-generated optime for the applied op.
- OpTime _logOpObjRS(OperationContext* txn, const BSONObj& op);
+ // This function writes ops into the replica-set oplog;
+ // used internally by replication secondaries after they have applied ops. Updates the global
+ // optime.
+ // Returns the optime for the last op inserted.
+ OpTime writeOpsToOplog(OperationContext* txn, const std::deque<BSONObj>& ops);
const char rsoplog[] = "local.oplog.rs";
static const int OPLOG_VERSION = 2;
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index c1fa5efe18d..2c97825015d 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -302,7 +302,9 @@ namespace {
// prime oplog
try {
_tryToApplyOpWithRetry(&txn, &init, lastOp);
- _logOpObjRS(&txn, lastOp);
+ std::deque<BSONObj> ops;
+ ops.push_back(lastOp);
+ writeOpsToOplog(&txn, ops);
return Status::OK();
} catch (DBException& e) {
// Return if in shutdown
@@ -332,7 +334,9 @@ namespace {
// prime oplog
_tryToApplyOpWithRetry(&txn, &init, lastOp);
- _logOpObjRS(&txn, lastOp);
+ std::deque<BSONObj> ops;
+ ops.push_back(lastOp);
+ writeOpsToOplog(&txn, ops);
std::string msg = "oplog sync 1 of 3";
log() << msg;
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index f84d1d08e7b..0a5506d5bfe 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -277,9 +277,12 @@ namespace repl {
}
applyOps(writerVectors);
- return applyOpsToOplog(txn, &ops);
- }
+ OpTime lastOpTime = writeOpsToOplog(txn, ops);
+
+ BackgroundSync::get()->notify(txn);
+ return lastOpTime;
+ }
void SyncTail::fillWriterVectors(const std::deque<BSONObj>& ops,
std::vector< std::vector<BSONObj> >* writerVectors) {
@@ -554,31 +557,6 @@ namespace {
return false;
}
- OpTime SyncTail::applyOpsToOplog(OperationContext* txn, std::deque<BSONObj>* ops) {
- OpTime lastOpTime;
- {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), "local", MODE_X);
- WriteUnitOfWork wunit(txn);
-
- while (!ops->empty()) {
- const BSONObj& op = ops->front();
- // this updates lastOpTimeApplied
- lastOpTime = _logOpObjRS(txn, op);
- ops->pop_front();
- }
- wunit.commit();
- }
-
- // This call may result in us assuming PRIMARY role if we'd been waiting for our sync
- // buffer to drain and it's now empty. This will acquire a global lock to drop all
- // temp collections, so we must release the above lock on the local database before
- // doing so.
- BackgroundSync::get()->notify(txn);
-
- return lastOpTime;
- }
-
void SyncTail::handleSlaveDelay(const BSONObj& lastOp) {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
int slaveDelaySecs = replCoord->getSlaveDelaySecs().total_seconds();
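
With applyOpsToOplog() removed, SyncTail::multiApply() now applies the batch, writes it to the oplog in one retried unit of work via writeOpsToOplog(), and only then notifies BackgroundSync, which (per the deleted comment) may trigger step-up work that must run after the local database lock is released. A rough, self-contained sketch of that control flow, with placeholder types and functions standing in for BSONObj, OpTime, and the real calls:

#include <deque>
#include <iostream>
#include <string>

using Op = std::string;  // placeholder for BSONObj

// Placeholder for the writer-pool application of the batch to the data files.
void applyOps(const std::deque<Op>& ops) {
    for (const Op& op : ops)
        std::cout << "apply " << op << "\n";
}

// Placeholder for the batched, retried write to local.oplog.rs; returns a
// fake "last optime" (here just the batch size).
std::size_t writeOpsToOplog(const std::deque<Op>& ops) {
    std::cout << "write " << ops.size() << " ops to local.oplog.rs\n";
    return ops.size();
}

// Placeholder for BackgroundSync::get()->notify(txn), called only after the
// oplog write has finished and its locks are released.
void notifyBackgroundSync() {
    std::cout << "notify BackgroundSync\n";
}

std::size_t multiApply(const std::deque<Op>& ops) {
    applyOps(ops);
    std::size_t lastOpTime = writeOpsToOplog(ops);
    notifyBackgroundSync();
    return lastOpTime;
}

int main() {
    std::deque<Op> ops{"op1", "op2", "op3"};
    std::cout << "last optime: " << multiApply(ops) << "\n";
    return 0;
}
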
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 98c0d836007..1802a05f41c 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -111,13 +111,6 @@ namespace repl {
void _applyOplogUntil(OperationContext* txn, const OpTime& endOpTime);
private:
- // After ops have been written to db, call this
- // to update local oplog.rs, as well as notify the primary
- // that we have applied the ops.
- // Ops are removed from the deque.
- // Returns the optime of the last op applied.
- OpTime applyOpsToOplog(OperationContext* txn, std::deque<BSONObj>* ops);
-
BackgroundSyncInterface* _networkQueue;
// Function to use during applyOps