author    | matt dannenberg <matt.dannenberg@10gen.com>           | 2013-07-15 14:30:17 -0400
----------|--------------------------------------------------------|--------------------------
committer | matt dannenberg <matt.dannenberg@10gen.com>           | 2013-07-22 10:43:40 -0400
commit    | 27c4e7fbd2ef6eeb04dccd1bcdecdb21b00522d1 (patch)       |
tree      | f129314545bfcc0a5c64fb2701baa2fc4f1628a2               |
parent    | 9bf70757db09b3a7166440c508bf798d50bd212a (diff)        |
download  | mongo-27c4e7fbd2ef6eeb04dccd1bcdecdb21b00522d1.tar.gz  |
Revert "Revert "SERVER-6071 use command on local.slaves instead of cursor""
This reverts commit 6486b4035c5ac52679eb3e1a034c925ccdd20deb.
21 files changed, 1828 insertions, 50 deletions
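In brief, the re-landed change (SERVER-6071) stops the sync source from learning its downstream members' replication progress via a tailing cursor (the OplogReader-based "ghost sync" path) and instead has each secondary push that progress upstream with a new replSetUpdatePosition command, driven by the new SyncSourceFeedback background job. The sketch below shows, in mongo-shell JavaScript, the two document shapes the diff constructs for that command; the ObjectIds, optimes, hosts, and member ids are illustrative placeholders, not values taken from the commit.

```javascript
// Shapes reconstructed from CmdReplSetUpdatePosition and SyncSourceFeedback in this diff.
// All concrete values below are placeholders for illustration.

// 1) Handshake: sent once per downstream member so the sync source can map a server's
//    rid (its local.me _id) to a replica-set member id before accepting position updates.
var handshakeCmd = {
    replSetUpdatePosition: 1,
    handshake: {
        handshake: ObjectId("51e42f5c9f0c8e0b1a000001"),  // reporting node's local.me _id
        member: 4,                                        // its member id in the replset config
        config: { _id: 4, host: "host:31004" }            // its member config document
    }
};

// 2) Position update: one entry per node chained through this secondary, itself included.
var updateCmd = {
    replSetUpdatePosition: 1,
    optimes: [
        { _id: ObjectId("51e42f5c9f0c8e0b1a000001"),
          optime: Timestamp(1374000000, 1),
          config: { _id: 4, host: "host:31004" } }
    ]
};

// Both are run against the admin database on the sync source:
// db.getSiblingDB("admin").runCommand(handshakeCmd);
// db.getSiblingDB("admin").runCommand(updateCmd);
```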
diff --git a/jstests/multiVersion/replset_primary_updater1.js b/jstests/multiVersion/replset_primary_updater1.js new file mode 100644 index 00000000000..426c8739ae1 --- /dev/null +++ b/jstests/multiVersion/replset_primary_updater1.js @@ -0,0 +1,201 @@ +/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of + * sync_source_feedback's updatePosition command and the OplogReader-based method of updating the + * primary's knowledge of the secondaries' sync progress. This is done through a modified version of + * the tags.js replicaset js test because tags.js was the test that helped me discover and resolve + * the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms + * running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the + * six replset_primary_updater tests, we run tags.js with each possible permutation of new and old + * nodes along this chain. + */ + +if (!_isWindows()) { +function myprint( x ) { + print( "tags output: " + x ); +} + + +load( './jstests/multiVersion/libs/multi_rs.js' ) +load( './jstests/libs/test_background_ops.js' ) + +var oldVersion = "2.4" +var newVersion = "latest" + +var nodes = { n1 : { binVersion : oldVersion }, + n2 : { binVersion : oldVersion }, + n3 : { binVersion : oldVersion }, + n4 : { binVersion : oldVersion }, + n5 : { binVersion : newVersion } } + +// Wait for a primary node... + +var num = 5; +var host = getHostName(); +var name = "dannentest"; + +var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} ); +var nodes = replTest.startSet(); +var port = replTest.ports; +replTest.initiate({_id : name, members : + [ + {_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}}, + {_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}}, + {_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}}, + {_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}}, + {_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}}, + ], + settings : { + getLastErrorModes : { + "2 dc and 3 server" : {"dc" : 2, "server" : 3}, + "1 and 2" : {"server" : 1} + } + }}); + +var master = replTest.getMaster(); +// make everyone catch up before reconfig +replTest.awaitReplication(); + +var config = master.getDB("local").system.replset.findOne(); + +printjson(config); +var modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); + +config.version++; +config.members[1].priority = 1.5; +config.members[2].priority = 2; +modes["3 or 4"] = {"sf" : 1}; +modes["3 and 4"] = {"sf" : 2}; +modes["1 and 2"]["2"] = 1; +modes["2"] = {"2" : 1} + +try { + master.getDB("admin").runCommand({replSetReconfig : config}); +} +catch(e) { + myprint(e); +} + +replTest.awaitReplication(); + +myprint("primary should now be 2"); +master = replTest.getMaster(); +config = master.getDB("local").system.replset.findOne(); +printjson(config); + +modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); +assert.eq(modes["3 or 4"]["sf"], 
1); +assert.eq(modes["3 and 4"]["sf"], 2); + +myprint("bridging"); +replTest.bridge(); +myprint("bridge 1"); +replTest.partition(0, 3); +myprint("bridge 2"); +replTest.partition(0, 4); +myprint("bridge 3"); +replTest.partition(1, 3); +myprint("bridge 4"); +replTest.partition(1, 4); +myprint("bridge 5"); +replTest.partition(2, 3); +myprint("bridge 6"); +replTest.partition(2, 4); +myprint("bridge 7"); +replTest.partition(3, 4); +myprint("done bridging"); + +myprint("paritions: [0-1-2-0] [3] [4]") +myprint("test1"); +myprint("2 should be primary"); +master = replTest.getMaster(); + +printjson(master.getDB("admin").runCommand({replSetGetStatus:1})); + +var timeout = 20000; + +master.getDB("foo").bar.insert({x:1}); +var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(1,4); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test3"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(3,4); + +myprint("partitions: [0-4-3] [0-1-2-0]"); +myprint("31004 should sync from 31001 (31026)"); +myprint("31003 should sync from 31004 (31024)"); +myprint("test4"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("non-existent w"); +result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout}); +printjson(result); +assert.eq(result.code, 14830); +assert.eq(result.ok, 0); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test two on the primary"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test5"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +replTest.unPartition(1,3); + +replTest.partition(2, 0); +replTest.partition(2, 1); +replTest.stop(2); + +myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads"); +master = replTest.getMaster(); + +myprint("test6"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.stopSet(); +myprint("\n\ntags.js SUCCESS\n\n"); + +} diff --git a/jstests/multiVersion/replset_primary_updater2.js b/jstests/multiVersion/replset_primary_updater2.js new file mode 100644 index 00000000000..bb4f240e2c7 --- /dev/null +++ b/jstests/multiVersion/replset_primary_updater2.js @@ -0,0 +1,201 @@ +/* SERVER-6071 This test (and the other replset_primary_updater 
tests) check cross-compatibility of + * sync_source_feedback's updatePosition command and the OplogReader-based method of updating the + * primary's knowledge of the secondaries' sync progress. This is done through a modified version of + * the tags.js replicaset js test because tags.js was the test that helped me discover and resolve + * the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms + * running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the + * six replset_primary_updater tests, we run tags.js with each possible permutation of new and old + * nodes along this chain. + */ + +if (!_isWindows()) { +function myprint( x ) { + print( "tags output: " + x ); +} + + +load( './jstests/multiVersion/libs/multi_rs.js' ) +load( './jstests/libs/test_background_ops.js' ) + +var oldVersion = "2.4" +var newVersion = "latest" + +var nodes = { n1 : { binVersion : oldVersion }, + n2 : { binVersion : newVersion }, + n3 : { binVersion : oldVersion }, + n4 : { binVersion : oldVersion }, + n5 : { binVersion : oldVersion } } + +// Wait for a primary node... + +var num = 5; +var host = getHostName(); +var name = "dannentest"; + +var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} ); +var nodes = replTest.startSet(); +var port = replTest.ports; +replTest.initiate({_id : name, members : + [ + {_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}}, + {_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}}, + {_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}}, + {_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}}, + {_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}}, + ], + settings : { + getLastErrorModes : { + "2 dc and 3 server" : {"dc" : 2, "server" : 3}, + "1 and 2" : {"server" : 1} + } + }}); + +var master = replTest.getMaster(); +// make everyone catch up before reconfig +replTest.awaitReplication(); + +var config = master.getDB("local").system.replset.findOne(); + +printjson(config); +var modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); + +config.version++; +config.members[1].priority = 1.5; +config.members[2].priority = 2; +modes["3 or 4"] = {"sf" : 1}; +modes["3 and 4"] = {"sf" : 2}; +modes["1 and 2"]["2"] = 1; +modes["2"] = {"2" : 1} + +try { + master.getDB("admin").runCommand({replSetReconfig : config}); +} +catch(e) { + myprint(e); +} + +replTest.awaitReplication(); + +myprint("primary should now be 2"); +master = replTest.getMaster(); +config = master.getDB("local").system.replset.findOne(); +printjson(config); + +modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); +assert.eq(modes["3 or 4"]["sf"], 1); +assert.eq(modes["3 and 4"]["sf"], 2); + +myprint("bridging"); +replTest.bridge(); +myprint("bridge 1"); +replTest.partition(0, 3); +myprint("bridge 2"); +replTest.partition(0, 4); +myprint("bridge 3"); +replTest.partition(1, 3); +myprint("bridge 4"); +replTest.partition(1, 4); +myprint("bridge 5"); 
+replTest.partition(2, 3); +myprint("bridge 6"); +replTest.partition(2, 4); +myprint("bridge 7"); +replTest.partition(3, 4); +myprint("done bridging"); + +myprint("paritions: [0-1-2-0] [3] [4]") +myprint("test1"); +myprint("2 should be primary"); +master = replTest.getMaster(); + +printjson(master.getDB("admin").runCommand({replSetGetStatus:1})); + +var timeout = 20000; + +master.getDB("foo").bar.insert({x:1}); +var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(1,4); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test3"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(3,4); + +myprint("partitions: [0-4-3] [0-1-2-0]"); +myprint("31004 should sync from 31001 (31026)"); +myprint("31003 should sync from 31004 (31024)"); +myprint("test4"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("non-existent w"); +result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout}); +printjson(result); +assert.eq(result.code, 14830); +assert.eq(result.ok, 0); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test two on the primary"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test5"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +replTest.unPartition(1,3); + +replTest.partition(2, 0); +replTest.partition(2, 1); +replTest.stop(2); + +myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads"); +master = replTest.getMaster(); + +myprint("test6"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.stopSet(); +myprint("\n\ntags.js SUCCESS\n\n"); + +} diff --git a/jstests/multiVersion/replset_primary_updater3.js b/jstests/multiVersion/replset_primary_updater3.js new file mode 100644 index 00000000000..1022cd43886 --- /dev/null +++ b/jstests/multiVersion/replset_primary_updater3.js @@ -0,0 +1,201 @@ +/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of + * sync_source_feedback's updatePosition command and the OplogReader-based method of updating the + * primary's knowledge of the secondaries' sync progress. 
This is done through a modified version of + * the tags.js replicaset js test because tags.js was the test that helped me discover and resolve + * the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms + * running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the + * six replset_primary_updater tests, we run tags.js with each possible permutation of new and old + * nodes along this chain. + */ + +if (!_isWindows()) { +function myprint( x ) { + print( "tags output: " + x ); +} + + +load( './jstests/multiVersion/libs/multi_rs.js' ) +load( './jstests/libs/test_background_ops.js' ) + +var oldVersion = "2.4" +var newVersion = "latest" + +var nodes = { n1 : { binVersion : oldVersion }, + n2 : { binVersion : oldVersion }, + n3 : { binVersion : newVersion }, + n4 : { binVersion : oldVersion }, + n5 : { binVersion : oldVersion } } + +// Wait for a primary node... + +var num = 5; +var host = getHostName(); +var name = "dannentest"; + +var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} ); +var nodes = replTest.startSet(); +var port = replTest.ports; +replTest.initiate({_id : name, members : + [ + {_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}}, + {_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}}, + {_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}}, + {_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}}, + {_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}}, + ], + settings : { + getLastErrorModes : { + "2 dc and 3 server" : {"dc" : 2, "server" : 3}, + "1 and 2" : {"server" : 1} + } + }}); + +var master = replTest.getMaster(); +// make everyone catch up before reconfig +replTest.awaitReplication(); + +var config = master.getDB("local").system.replset.findOne(); + +printjson(config); +var modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); + +config.version++; +config.members[1].priority = 1.5; +config.members[2].priority = 2; +modes["3 or 4"] = {"sf" : 1}; +modes["3 and 4"] = {"sf" : 2}; +modes["1 and 2"]["2"] = 1; +modes["2"] = {"2" : 1} + +try { + master.getDB("admin").runCommand({replSetReconfig : config}); +} +catch(e) { + myprint(e); +} + +replTest.awaitReplication(); + +myprint("primary should now be 2"); +master = replTest.getMaster(); +config = master.getDB("local").system.replset.findOne(); +printjson(config); + +modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); +assert.eq(modes["3 or 4"]["sf"], 1); +assert.eq(modes["3 and 4"]["sf"], 2); + +myprint("bridging"); +replTest.bridge(); +myprint("bridge 1"); +replTest.partition(0, 3); +myprint("bridge 2"); +replTest.partition(0, 4); +myprint("bridge 3"); +replTest.partition(1, 3); +myprint("bridge 4"); +replTest.partition(1, 4); +myprint("bridge 5"); +replTest.partition(2, 3); +myprint("bridge 6"); +replTest.partition(2, 4); +myprint("bridge 7"); +replTest.partition(3, 4); +myprint("done bridging"); + +myprint("paritions: [0-1-2-0] [3] [4]") 
+myprint("test1"); +myprint("2 should be primary"); +master = replTest.getMaster(); + +printjson(master.getDB("admin").runCommand({replSetGetStatus:1})); + +var timeout = 20000; + +master.getDB("foo").bar.insert({x:1}); +var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(1,4); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test3"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(3,4); + +myprint("partitions: [0-4-3] [0-1-2-0]"); +myprint("31004 should sync from 31001 (31026)"); +myprint("31003 should sync from 31004 (31024)"); +myprint("test4"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("non-existent w"); +result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout}); +printjson(result); +assert.eq(result.code, 14830); +assert.eq(result.ok, 0); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test two on the primary"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test5"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +replTest.unPartition(1,3); + +replTest.partition(2, 0); +replTest.partition(2, 1); +replTest.stop(2); + +myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads"); +master = replTest.getMaster(); + +myprint("test6"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.stopSet(); +myprint("\n\ntags.js SUCCESS\n\n"); + +} diff --git a/jstests/multiVersion/replset_primary_updater4.js b/jstests/multiVersion/replset_primary_updater4.js new file mode 100644 index 00000000000..b2d6c16ea42 --- /dev/null +++ b/jstests/multiVersion/replset_primary_updater4.js @@ -0,0 +1,201 @@ +/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of + * sync_source_feedback's updatePosition command and the OplogReader-based method of updating the + * primary's knowledge of the secondaries' sync progress. This is done through a modified version of + * the tags.js replicaset js test because tags.js was the test that helped me discover and resolve + * the largest number of bugs when creating the updatePosition command. 
In tags.js, a chain forms + * running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the + * six replset_primary_updater tests, we run tags.js with each possible permutation of new and old + * nodes along this chain. + */ + +if (!_isWindows()) { +function myprint( x ) { + print( "tags output: " + x ); +} + + +load( './jstests/multiVersion/libs/multi_rs.js' ) +load( './jstests/libs/test_background_ops.js' ) + +var oldVersion = "2.4" +var newVersion = "latest" + +var nodes = { n1 : { binVersion : oldVersion }, + n2 : { binVersion : newVersion }, + n3 : { binVersion : oldVersion }, + n4 : { binVersion : oldVersion }, + n5 : { binVersion : newVersion } } + +// Wait for a primary node... + +var num = 5; +var host = getHostName(); +var name = "dannentest"; + +var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} ); +var nodes = replTest.startSet(); +var port = replTest.ports; +replTest.initiate({_id : name, members : + [ + {_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}}, + {_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}}, + {_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}}, + {_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}}, + {_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}}, + ], + settings : { + getLastErrorModes : { + "2 dc and 3 server" : {"dc" : 2, "server" : 3}, + "1 and 2" : {"server" : 1} + } + }}); + +var master = replTest.getMaster(); +// make everyone catch up before reconfig +replTest.awaitReplication(); + +var config = master.getDB("local").system.replset.findOne(); + +printjson(config); +var modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); + +config.version++; +config.members[1].priority = 1.5; +config.members[2].priority = 2; +modes["3 or 4"] = {"sf" : 1}; +modes["3 and 4"] = {"sf" : 2}; +modes["1 and 2"]["2"] = 1; +modes["2"] = {"2" : 1} + +try { + master.getDB("admin").runCommand({replSetReconfig : config}); +} +catch(e) { + myprint(e); +} + +replTest.awaitReplication(); + +myprint("primary should now be 2"); +master = replTest.getMaster(); +config = master.getDB("local").system.replset.findOne(); +printjson(config); + +modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); +assert.eq(modes["3 or 4"]["sf"], 1); +assert.eq(modes["3 and 4"]["sf"], 2); + +myprint("bridging"); +replTest.bridge(); +myprint("bridge 1"); +replTest.partition(0, 3); +myprint("bridge 2"); +replTest.partition(0, 4); +myprint("bridge 3"); +replTest.partition(1, 3); +myprint("bridge 4"); +replTest.partition(1, 4); +myprint("bridge 5"); +replTest.partition(2, 3); +myprint("bridge 6"); +replTest.partition(2, 4); +myprint("bridge 7"); +replTest.partition(3, 4); +myprint("done bridging"); + +myprint("paritions: [0-1-2-0] [3] [4]") +myprint("test1"); +myprint("2 should be primary"); +master = replTest.getMaster(); + +printjson(master.getDB("admin").runCommand({replSetGetStatus:1})); + +var timeout = 20000; + +master.getDB("foo").bar.insert({x:1}); +var 
result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(1,4); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test3"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(3,4); + +myprint("partitions: [0-4-3] [0-1-2-0]"); +myprint("31004 should sync from 31001 (31026)"); +myprint("31003 should sync from 31004 (31024)"); +myprint("test4"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("non-existent w"); +result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout}); +printjson(result); +assert.eq(result.code, 14830); +assert.eq(result.ok, 0); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test two on the primary"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test5"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +replTest.unPartition(1,3); + +replTest.partition(2, 0); +replTest.partition(2, 1); +replTest.stop(2); + +myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads"); +master = replTest.getMaster(); + +myprint("test6"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.stopSet(); +myprint("\n\ntags.js SUCCESS\n\n"); + +} diff --git a/jstests/multiVersion/replset_primary_updater5.js b/jstests/multiVersion/replset_primary_updater5.js new file mode 100644 index 00000000000..55e0379b704 --- /dev/null +++ b/jstests/multiVersion/replset_primary_updater5.js @@ -0,0 +1,201 @@ +/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of + * sync_source_feedback's updatePosition command and the OplogReader-based method of updating the + * primary's knowledge of the secondaries' sync progress. This is done through a modified version of + * the tags.js replicaset js test because tags.js was the test that helped me discover and resolve + * the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms + * running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the + * six replset_primary_updater tests, we run tags.js with each possible permutation of new and old + * nodes along this chain. 
+ */ + +if (!_isWindows()) { +function myprint( x ) { + print( "tags output: " + x ); +} + + +load( './jstests/multiVersion/libs/multi_rs.js' ) +load( './jstests/libs/test_background_ops.js' ) + +var oldVersion = "2.4" +var newVersion = "latest" + +var nodes = { n1 : { binVersion : oldVersion }, + n2 : { binVersion : oldVersion }, + n3 : { binVersion : newVersion }, + n4 : { binVersion : oldVersion }, + n5 : { binVersion : newVersion } } + +// Wait for a primary node... + +var num = 5; +var host = getHostName(); +var name = "dannentest"; + +var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} ); +var nodes = replTest.startSet(); +var port = replTest.ports; +replTest.initiate({_id : name, members : + [ + {_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}}, + {_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}}, + {_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}}, + {_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}}, + {_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}}, + ], + settings : { + getLastErrorModes : { + "2 dc and 3 server" : {"dc" : 2, "server" : 3}, + "1 and 2" : {"server" : 1} + } + }}); + +var master = replTest.getMaster(); +// make everyone catch up before reconfig +replTest.awaitReplication(); + +var config = master.getDB("local").system.replset.findOne(); + +printjson(config); +var modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); + +config.version++; +config.members[1].priority = 1.5; +config.members[2].priority = 2; +modes["3 or 4"] = {"sf" : 1}; +modes["3 and 4"] = {"sf" : 2}; +modes["1 and 2"]["2"] = 1; +modes["2"] = {"2" : 1} + +try { + master.getDB("admin").runCommand({replSetReconfig : config}); +} +catch(e) { + myprint(e); +} + +replTest.awaitReplication(); + +myprint("primary should now be 2"); +master = replTest.getMaster(); +config = master.getDB("local").system.replset.findOne(); +printjson(config); + +modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); +assert.eq(modes["3 or 4"]["sf"], 1); +assert.eq(modes["3 and 4"]["sf"], 2); + +myprint("bridging"); +replTest.bridge(); +myprint("bridge 1"); +replTest.partition(0, 3); +myprint("bridge 2"); +replTest.partition(0, 4); +myprint("bridge 3"); +replTest.partition(1, 3); +myprint("bridge 4"); +replTest.partition(1, 4); +myprint("bridge 5"); +replTest.partition(2, 3); +myprint("bridge 6"); +replTest.partition(2, 4); +myprint("bridge 7"); +replTest.partition(3, 4); +myprint("done bridging"); + +myprint("paritions: [0-1-2-0] [3] [4]") +myprint("test1"); +myprint("2 should be primary"); +master = replTest.getMaster(); + +printjson(master.getDB("admin").runCommand({replSetGetStatus:1})); + +var timeout = 20000; + +master.getDB("foo").bar.insert({x:1}); +var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(1,4); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test2"); 
+master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test3"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(3,4); + +myprint("partitions: [0-4-3] [0-1-2-0]"); +myprint("31004 should sync from 31001 (31026)"); +myprint("31003 should sync from 31004 (31024)"); +myprint("test4"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("non-existent w"); +result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout}); +printjson(result); +assert.eq(result.code, 14830); +assert.eq(result.ok, 0); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test two on the primary"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test5"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +replTest.unPartition(1,3); + +replTest.partition(2, 0); +replTest.partition(2, 1); +replTest.stop(2); + +myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads"); +master = replTest.getMaster(); + +myprint("test6"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.stopSet(); +myprint("\n\ntags.js SUCCESS\n\n"); + +} diff --git a/jstests/multiVersion/replset_primary_updater6.js b/jstests/multiVersion/replset_primary_updater6.js new file mode 100644 index 00000000000..54a8062e274 --- /dev/null +++ b/jstests/multiVersion/replset_primary_updater6.js @@ -0,0 +1,201 @@ +/* SERVER-6071 This test (and the other replset_primary_updater tests) check cross-compatibility of + * sync_source_feedback's updatePosition command and the OplogReader-based method of updating the + * primary's knowledge of the secondaries' sync progress. This is done through a modified version of + * the tags.js replicaset js test because tags.js was the test that helped me discover and resolve + * the largest number of bugs when creating the updatePosition command. In tags.js, a chain forms + * running from member 4 to member 1 to member 2 (nodes n5, n2, and n3, respectively). Between the + * six replset_primary_updater tests, we run tags.js with each possible permutation of new and old + * nodes along this chain. 
+ */ + +if (!_isWindows()) { +function myprint( x ) { + print( "tags output: " + x ); +} + + +load( './jstests/multiVersion/libs/multi_rs.js' ) +load( './jstests/libs/test_background_ops.js' ) + +var oldVersion = "2.4" +var newVersion = "latest" + +var nodes = { n1 : { binVersion : oldVersion }, + n2 : { binVersion : newVersion }, + n3 : { binVersion : newVersion }, + n4 : { binVersion : oldVersion }, + n5 : { binVersion : oldVersion } } + +// Wait for a primary node... + +var num = 5; +var host = getHostName(); +var name = "dannentest"; + +var replTest = new ReplSetTest( {name: name, nodes: nodes, startPort:31000} ); +var nodes = replTest.startSet(); +var port = replTest.ports; +replTest.initiate({_id : name, members : + [ + {_id:0, host : host+":"+port[0], tags : {"server" : "0", "dc" : "ny", "ny" : "1", "rack" : "ny.rk1"}}, + {_id:1, host : host+":"+port[1], tags : {"server" : "1", "dc" : "ny", "ny" : "2", "rack" : "ny.rk1"}}, + {_id:2, host : host+":"+port[2], tags : {"server" : "2", "dc" : "ny", "ny" : "3", "rack" : "ny.rk2", "2" : "this"}}, + {_id:3, host : host+":"+port[3], tags : {"server" : "3", "dc" : "sf", "sf" : "1", "rack" : "sf.rk1"}}, + {_id:4, host : host+":"+port[4], tags : {"server" : "4", "dc" : "sf", "sf" : "2", "rack" : "sf.rk2"}}, + ], + settings : { + getLastErrorModes : { + "2 dc and 3 server" : {"dc" : 2, "server" : 3}, + "1 and 2" : {"server" : 1} + } + }}); + +var master = replTest.getMaster(); +// make everyone catch up before reconfig +replTest.awaitReplication(); + +var config = master.getDB("local").system.replset.findOne(); + +printjson(config); +var modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); + +config.version++; +config.members[1].priority = 1.5; +config.members[2].priority = 2; +modes["3 or 4"] = {"sf" : 1}; +modes["3 and 4"] = {"sf" : 2}; +modes["1 and 2"]["2"] = 1; +modes["2"] = {"2" : 1} + +try { + master.getDB("admin").runCommand({replSetReconfig : config}); +} +catch(e) { + myprint(e); +} + +replTest.awaitReplication(); + +myprint("primary should now be 2"); +master = replTest.getMaster(); +config = master.getDB("local").system.replset.findOne(); +printjson(config); + +modes = config.settings.getLastErrorModes; +assert.eq(typeof modes, "object"); +assert.eq(modes["2 dc and 3 server"].dc, 2); +assert.eq(modes["2 dc and 3 server"].server, 3); +assert.eq(modes["1 and 2"]["server"], 1); +assert.eq(modes["3 or 4"]["sf"], 1); +assert.eq(modes["3 and 4"]["sf"], 2); + +myprint("bridging"); +replTest.bridge(); +myprint("bridge 1"); +replTest.partition(0, 3); +myprint("bridge 2"); +replTest.partition(0, 4); +myprint("bridge 3"); +replTest.partition(1, 3); +myprint("bridge 4"); +replTest.partition(1, 4); +myprint("bridge 5"); +replTest.partition(2, 3); +myprint("bridge 6"); +replTest.partition(2, 4); +myprint("bridge 7"); +replTest.partition(3, 4); +myprint("done bridging"); + +myprint("paritions: [0-1-2-0] [3] [4]") +myprint("test1"); +myprint("2 should be primary"); +master = replTest.getMaster(); + +printjson(master.getDB("admin").runCommand({replSetGetStatus:1})); + +var timeout = 20000; + +master.getDB("foo").bar.insert({x:1}); +var result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(1,4); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test2"); 
+master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 or 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("partitions: [1-4] [0-1-2-0] [3]"); +myprint("test3"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.unPartition(3,4); + +myprint("partitions: [0-4-3] [0-1-2-0]"); +myprint("31004 should sync from 31001 (31026)"); +myprint("31003 should sync from 31004 (31024)"); +myprint("test4"); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("non-existent w"); +result = master.getDB("foo").runCommand({getLastError:1,w:"blahblah",wtimeout:timeout}); +printjson(result); +assert.eq(result.code, 14830); +assert.eq(result.ok, 0); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test two on the primary"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"1 and 2",wtimeout:0}); +printjson(result); +assert.eq(result.err, null); + +myprint("test5"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2 dc and 3 server",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +replTest.unPartition(1,3); + +replTest.partition(2, 0); +replTest.partition(2, 1); +replTest.stop(2); + +myprint("1 must become primary here because otherwise the other members will take too long timing out their old sync threads"); +master = replTest.getMaster(); + +myprint("test6"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"3 and 4",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, null); + +myprint("test mode 2"); +master.getDB("foo").bar.insert({x:1}); +result = master.getDB("foo").runCommand({getLastError:1,w:"2",wtimeout:timeout}); +printjson(result); +assert.eq(result.err, "timeout"); + +replTest.stopSet(); +myprint("\n\ntags.js SUCCESS\n\n"); + +} diff --git a/jstests/replsets/sync2.js b/jstests/replsets/sync2.js index 57577c6c46b..6890d85dc98 100644 --- a/jstests/replsets/sync2.js +++ b/jstests/replsets/sync2.js @@ -35,9 +35,8 @@ replTest.partition(4,3); jsTestLog("Checking that ops still replicate correctly"); master.getDB("foo").bar.insert({x:1}); -replTest.awaitReplication(); -var result = master.getDB("admin").runCommand({getLastError:1,w:5,wtimeout:1000}); +var result = master.getDB("admin").runCommand({getLastError:1,w:5,wtimeout:30000}); assert.eq(null, result.err, tojson(result)); // 4 is connected to 3 @@ -45,9 +44,8 @@ replTest.partition(4,2); replTest.unPartition(4,3); master.getDB("foo").bar.insert({x:1}); -replTest.awaitReplication(); -result = master.getDB("admin").runCommand({getLastError:1,w:5,wtimeout:1000}); +result = master.getDB("admin").runCommand({getLastError:1,w:5,wtimeout:30000}); assert.eq(null, result.err, tojson(result)); -replTest.stopSet();
\ No newline at end of file +replTest.stopSet(); diff --git a/src/mongo/SConscript b/src/mongo/SConscript index c8fc0509532..fba3bed588c 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -458,6 +458,7 @@ serverOnlyFiles = [ "db/curop.cpp", "db/repl/master_slave.cpp", "db/repl/finding_start_cursor.cpp", "db/repl/sync.cpp", + "db/repl/sync_source_feedback.cpp", "db/repl/oplogreader.cpp", "db/repl/replication_server_status.cpp", "db/repl/repl_reads_ok.cpp", diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt index 0cf6c33223f..532f1a41564 100644 --- a/src/mongo/db/auth/action_types.txt +++ b/src/mongo/db/auth/action_types.txt @@ -72,6 +72,7 @@ "replSetReconfig", "replSetStepDown", "replSetSyncFrom", +"replSetUpdatePosition", "resync", "serverStatus", "setParameter", diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 6e9e7538f5a..f453664262d 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -22,6 +22,7 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/rs.h" #include "mongo/util/fail_point_service.h" #include "mongo/base/counter.h" #include "mongo/db/stats/timer_stats.h" @@ -79,7 +80,6 @@ namespace replset { _assumingPrimary(false), _currentSyncTarget(NULL), _oplogMarkerTarget(NULL), - _oplogMarker(true /* doHandshake */), _consumedOpTime(0, 0) { } @@ -122,6 +122,7 @@ namespace replset { void BackgroundSync::notifierThread() { Client::initThread("rsSyncNotifier"); replLocalAuth(); + theReplSet->syncSourceFeedback.go(); while (!inShutdown()) { bool clearTarget = false; @@ -168,37 +169,44 @@ namespace replset { } void BackgroundSync::markOplog() { - LOG(3) << "replset markOplog: " << _consumedOpTime << " " << theReplSet->lastOpTimeWritten << rsLog; + LOG(3) << "replset markOplog: " << _consumedOpTime << " " + << theReplSet->lastOpTimeWritten << rsLog; - if (!hasCursor()) { - sleepsecs(1); - return; + if (theReplSet->syncSourceFeedback.supportsUpdater()) { + theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten); + _consumedOpTime = theReplSet->lastOpTimeWritten; } + else { + if (!hasCursor()) { + return; + } - if (!_oplogMarker.moreInCurrentBatch()) { - _oplogMarker.more(); - } + if (!theReplSet->syncSourceFeedback.moreInCurrentBatch()) { + theReplSet->syncSourceFeedback.more(); + } - if (!_oplogMarker.more()) { - _oplogMarker.tailCheck(); - sleepsecs(1); - return; - } + if (!theReplSet->syncSourceFeedback.more()) { + theReplSet->syncSourceFeedback.tailCheck(); + return; + } - // if this member has written the op at optime T, we want to nextSafe up to and including T - while (_consumedOpTime < theReplSet->lastOpTimeWritten && _oplogMarker.more()) { - BSONObj temp = _oplogMarker.nextSafe(); - _consumedOpTime = temp["ts"]._opTime(); - } + // if this member has written the op at optime T + // we want to nextSafe up to and including T + while (_consumedOpTime < theReplSet->lastOpTimeWritten + && theReplSet->syncSourceFeedback.more()) { + BSONObj temp = theReplSet->syncSourceFeedback.nextSafe(); + _consumedOpTime = temp["ts"]._opTime(); + } - // call more() to signal the sync target that we've synced T - _oplogMarker.more(); + // call more() to signal the sync target that we've synced T + theReplSet->syncSourceFeedback.more(); + } } bool BackgroundSync::hasCursor() { { // prevent writers from blocking readers during fsync - SimpleMutex::scoped_lock fsynclk(filesLockedFsync); + 
SimpleMutex::scoped_lock fsynclk(filesLockedFsync); // we don't need the local write lock yet, but it's needed by OplogReader::connect // so we take it preemptively to avoid deadlocking. Lock::DBWrite lk("local"); @@ -210,25 +218,23 @@ namespace replset { return false; } - log() << "replset setting oplog notifier to " << _currentSyncTarget->fullName() << rsLog; + log() << "replset setting oplog notifier to " + << _currentSyncTarget->fullName() << rsLog; _oplogMarkerTarget = _currentSyncTarget; - _oplogMarker.resetConnection(); - - if (!_oplogMarker.connect(_oplogMarkerTarget->fullName())) { - LOG(1) << "replset could not connect to " << _oplogMarkerTarget->fullName() << rsLog; + if (!theReplSet->syncSourceFeedback.connect(_oplogMarkerTarget)) { _oplogMarkerTarget = NULL; return false; } } } - - if (!_oplogMarker.haveCursor()) { + if (!theReplSet->syncSourceFeedback.haveCursor()) { BSONObj fields = BSON("ts" << 1); - _oplogMarker.tailingQueryGTE(rsoplog, theReplSet->lastOpTimeWritten, &fields); + theReplSet->syncSourceFeedback.tailingQueryGTE(rsoplog, + theReplSet->lastOpTimeWritten, &fields); } - return _oplogMarker.haveCursor(); + return theReplSet->syncSourceFeedback.haveCursor(); } void BackgroundSync::producerThread() { @@ -525,6 +531,8 @@ namespace replset { _currentSyncTarget = target; } + theReplSet->syncSourceFeedback.connect(target); + return; } diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 1f4cb3b1530..02d359cfee5 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -86,7 +86,6 @@ namespace replset { boost::mutex _lastOpMutex; const Member* _oplogMarkerTarget; - OplogReader _oplogMarker; // not locked, only used by notifier thread OpTime _consumedOpTime; // not locked, only used by notifier thread BackgroundSync(); diff --git a/src/mongo/db/repl/health.cpp b/src/mongo/db/repl/health.cpp index 7dd9f528d27..4dc089de971 100644 --- a/src/mongo/db/repl/health.cpp +++ b/src/mongo/db/repl/health.cpp @@ -348,6 +348,15 @@ namespace mongo { return 0; } + Member* ReplSetImpl::getMutableMember(unsigned id) { + if( _self && id == _self->id() ) return _self; + + for( Member *m = head(); m; m = m->next() ) + if( m->id() == id ) + return m; + return 0; + } + Member* ReplSetImpl::findByName(const std::string& hostname) const { if (_self && hostname == _self->fullName()) { return _self; diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index 5ee5a4628b4..ea55ada5c11 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -140,7 +140,7 @@ namespace mongo { if( conn() == 0 ) { _conn = shared_ptr<DBClientConnection>(new DBClientConnection(false, 0, - 30 /* tcp timeout */)); + tcp_timeout)); string errmsg; if ( !_conn->connect(hostName.c_str(), errmsg) || (AuthorizationManager::isAuthEnabled() && !replAuthenticate(_conn.get(), true)) ) { diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h index 801b9a4854e..345dc2be9ee 100644 --- a/src/mongo/db/repl/oplogreader.h +++ b/src/mongo/db/repl/oplogreader.h @@ -50,6 +50,9 @@ namespace mongo { return findOne(ns, Query().sort(reverseNaturalObj)); } + /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */ + static const int tcp_timeout = 30; + /* ok to call if already connected */ bool connect(const std::string& hostname); diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index 1f9abce2b39..8e765f7b5a2 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ 
b/src/mongo/db/repl/replset_commands.cpp @@ -24,6 +24,7 @@ #include "mongo/db/cmdline.h" #include "mongo/db/commands.h" #include "mongo/db/repl/health.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_server_status.h" // replSettings #include "mongo/db/repl/rs.h" #include "mongo/db/repl/rs_config.h" @@ -396,4 +397,42 @@ namespace mongo { } } cmdReplSetSyncFrom; + class CmdReplSetUpdatePosition: public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "internal"; + } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::replSetUpdatePosition); + out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions)); + } + CmdReplSetUpdatePosition() : ReplSetCommand("replSetUpdatePosition") { } + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, + BSONObjBuilder& result, bool fromRepl) { + if (!check(errmsg, result)) + return false; + + if (cmdObj.hasField("handshake")) { + // we have received a handshake, not an update message + // handshakes are done here to ensure the receiving end supports the update command + cc().gotHandshake(cmdObj["handshake"].embeddedObject()); + // if we aren't primary, pass the handshake along + if (!theReplSet->isPrimary() && theReplSet->syncSourceFeedback.supportsUpdater()) { + theReplSet->syncSourceFeedback.forwardSlaveHandshake(); + } + return true; + } + + uassert(16888, "optimes field should be an array with an object for each secondary", + cmdObj["optimes"].type() == Array); + BSONArray newTimes = BSONArray(cmdObj["optimes"].Obj()); + updateSlaveLocations(newTimes); + + return true; + } + } cmdReplSetUpdatePosition; + } diff --git a/src/mongo/db/repl/rs.cpp b/src/mongo/db/repl/rs.cpp index 85d4c7a5b6d..4d7a635d621 100644 --- a/src/mongo/db/repl/rs.cpp +++ b/src/mongo/db/repl/rs.cpp @@ -928,8 +928,11 @@ namespace mongo { void ReplSetImpl::registerSlave(const BSONObj& rid, const int memberId) { // To prevent race conditions with clearing the cache at reconfig time, // we lock the replset mutex here. 
- lock lk(this); - ghost->associateSlave(rid, memberId); + { + lock lk(this); + ghost->associateSlave(rid, memberId); + } + syncSourceFeedback.associateMember(rid, memberId); } class ReplIndexPrefetch : public ServerParameter { diff --git a/src/mongo/db/repl/rs.h b/src/mongo/db/repl/rs.h index e1414b6603d..46ee578dc22 100644 --- a/src/mongo/db/repl/rs.h +++ b/src/mongo/db/repl/rs.h @@ -26,6 +26,7 @@ #include "mongo/db/repl/rs_exception.h" #include "mongo/db/repl/rs_member.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/sync_source_feedback.h" #include "mongo/util/concurrency/list.h" #include "mongo/util/concurrency/msg.h" #include "mongo/util/concurrency/thread_pool.h" @@ -341,6 +342,8 @@ namespace mongo { StateBox box; + SyncSourceFeedback syncSourceFeedback; + OpTime lastOpTimeWritten; long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork" bool forceSyncFrom(const string& host, string& errmsg, BSONObjBuilder& result); @@ -505,6 +508,7 @@ namespace mongo { Member* head() const { return _members.head(); } public: const Member* findById(unsigned id) const; + Member* getMutableMember(unsigned id); Member* findByName(const std::string& hostname) const; private: void _getTargets(list<Target>&, int &configVersion); diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp new file mode 100644 index 00000000000..21a7bffd3c4 --- /dev/null +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -0,0 +1,310 @@ +/** +* Copyright (C) 2013 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. 
+*/ + +#include "mongo/db/repl/sync_source_feedback.h" + +#include "mongo/client/constants.h" +#include "mongo/client/dbclientcursor.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/auth/security_key.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/repl/bgsync.h" +#include "mongo/db/repl/rs.h" // theReplSet + +namespace mongo { + + // used in replAuthenticate + static const BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}"); + + void SyncSourceFeedback::associateMember(const BSONObj& id, const int memberId) { + const OID rid = id["_id"].OID(); + boost::unique_lock<boost::mutex> lock(_mtx); + _handshakeNeeded = true; + _members[rid] = theReplSet->getMutableMember(memberId); + _cond.notify_all(); + } + + bool SyncSourceFeedback::replAuthenticate(bool skipAuthCheck) { + + if (!AuthorizationManager::isAuthEnabled()) { + return true; + } + if (!skipAuthCheck && !cc().getAuthorizationSession()->hasInternalAuthorization()) { + log() << "replauthenticate: requires internal authorization, failing" << endl; + return false; + } + + if (isInternalAuthSet()) { + return authenticateInternalUser(_connection.get()); + } + + BSONObj user; + { + Client::ReadContext ctxt("local."); + if(!Helpers::findOne("local.system.users", userReplQuery, user) || + // try the first user in local + !Helpers::getSingleton("local.system.users", user)) { + log() << "replauthenticate: no user in local.system.users to use" + << "for authentication" << endl; + return false; + } + } + std::string u = user.getStringField("user"); + std::string p = user.getStringField("pwd"); + massert(16889, "bad user object? [1]", !u.empty()); + massert(16887, "bad user object? [2]", !p.empty()); + + std::string err; + + if( !_connection->auth("local", u.c_str(), p.c_str(), err, false) ) { + log() << "replauthenticate: can't authenticate to master server, user:" << u << endl; + return false; + } + + return true; + } + + void SyncSourceFeedback::ensureMe() { + string myname = getHostName(); + { + Client::WriteContext ctx("local"); + // local.me is an identifier for a server for getLastError w:2+ + if (!Helpers::getSingleton("local.me", _me) || + !_me.hasField("host") || + _me["host"].String() != myname) { + + // clean out local.me + Helpers::emptyCollection("local.me"); + + // repopulate + BSONObjBuilder b; + b.appendOID("_id", 0, true); + b.append("host", myname); + _me = b.obj(); + Helpers::putSingleton("local.me", _me); + } + } + } + + bool SyncSourceFeedback::replHandshake() { + ensureMe(); + + // handshake for us + BSONObjBuilder cmd; + cmd.append("replSetUpdatePosition", 1); + BSONObjBuilder sub (cmd.subobjStart("handshake")); + sub.appendAs(_me["_id"], "handshake"); + sub.append("member", theReplSet->selfId()); + sub.append("config", theReplSet->myConfig().asBson()); + sub.doneFast(); + + BSONObj res; + try { + if (!_connection->runCommand("admin", cmd.obj(), res)) { + if (res["errmsg"].str().find("no such cmd") != std::string::npos) { + _supportsUpdater = false; + } + resetConnection(); + return false; + } + else { + _supportsUpdater = true; + } + } + catch (const DBException& e) { + log() << "SyncSourceFeedback error sending handshake: " << e.what() << endl; + resetConnection(); + return false; + } + + // handshakes for those connected to us + { + for (OIDMemberMap::iterator itr = _members.begin(); + itr != _members.end(); ++itr) { + BSONObjBuilder slaveCmd; + slaveCmd.append("replSetUpdatePosition", 1); + // outer handshake indicates this is a handshake command + // inner is needed as part of the 
+    bool SyncSourceFeedback::replHandshake() {
+        ensureMe();
+
+        // handshake for us
+        BSONObjBuilder cmd;
+        cmd.append("replSetUpdatePosition", 1);
+        BSONObjBuilder sub (cmd.subobjStart("handshake"));
+        sub.appendAs(_me["_id"], "handshake");
+        sub.append("member", theReplSet->selfId());
+        sub.append("config", theReplSet->myConfig().asBson());
+        sub.doneFast();
+
+        BSONObj res;
+        try {
+            if (!_connection->runCommand("admin", cmd.obj(), res)) {
+                if (res["errmsg"].str().find("no such cmd") != std::string::npos) {
+                    _supportsUpdater = false;
+                }
+                resetConnection();
+                return false;
+            }
+            else {
+                _supportsUpdater = true;
+            }
+        }
+        catch (const DBException& e) {
+            log() << "SyncSourceFeedback error sending handshake: " << e.what() << endl;
+            resetConnection();
+            return false;
+        }
+
+        // handshakes for those connected to us
+        {
+            for (OIDMemberMap::iterator itr = _members.begin();
+                 itr != _members.end(); ++itr) {
+                BSONObjBuilder slaveCmd;
+                slaveCmd.append("replSetUpdatePosition", 1);
+                // outer handshake indicates this is a handshake command
+                // inner is needed as part of the structure to be passed to gotHandshake
+                BSONObjBuilder slaveSub (slaveCmd.subobjStart("handshake"));
+                slaveSub.append("handshake", itr->first);
+                slaveSub.append("member", itr->second->id());
+                slaveSub.append("config", itr->second->config().asBson());
+                slaveSub.doneFast();
+                BSONObj slaveRes;
+                try {
+                    if (!_connection->runCommand("admin", slaveCmd.obj(), slaveRes)) {
+                        resetConnection();
+                        return false;
+                    }
+                }
+                catch (const DBException& e) {
+                    log() << "SyncSourceFeedback error sending chained handshakes: "
+                          << e.what() << endl;
+                    resetConnection();
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    bool SyncSourceFeedback::_connect(const std::string& hostName) {
+        if (hasConnection()) {
+            return true;
+        }
+        _connection.reset(new DBClientConnection(false, 0, OplogReader::tcp_timeout));
+        string errmsg;
+        if (!_connection->connect(hostName.c_str(), errmsg) ||
+            (AuthorizationManager::isAuthEnabled() && !replAuthenticate(true))) {
+            resetConnection();
+            log() << "repl: " << errmsg << endl;
+            return false;
+        }
+
+        if (!replHandshake()) {
+            if (!supportsUpdater()) {
+                return connectOplogReader(hostName);
+            }
+            return false;
+        }
+        return true;
+    }
+
+    bool SyncSourceFeedback::connect(const Member* target) {
+        boost::unique_lock<boost::mutex> lock(_connmtx);
+        resetConnection();
+        resetOplogReaderConnection();
+        _syncTarget = target;
+        if (_connect(target->fullName())) {
+            if (!supportsUpdater()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    void SyncSourceFeedback::forwardSlaveHandshake() {
+        boost::unique_lock<boost::mutex> lock(_mtx);
+        _handshakeNeeded = true;
+    }
+
+    void SyncSourceFeedback::updateMap(const mongo::OID& rid, const OpTime& ot) {
+        boost::unique_lock<boost::mutex> lock(_mtx);
+        LOG(1) << "replSet last: " << _slaveMap[rid].toString() << " to " << ot.toString() << endl;
+        // only update if ot is newer than what we have already
+        if (ot > _slaveMap[rid]) {
+            _slaveMap[rid] = ot;
+            _positionChanged = true;
+            LOG(2) << "now last is " << _slaveMap[rid].toString() << endl;
+            _cond.notify_all();
+        }
+    }
+
+    bool SyncSourceFeedback::updateUpstream() {
+        if (theReplSet->isPrimary()) {
+            // primary has no one to update to
+            return true;
+        }
+        BSONObjBuilder cmd;
+        cmd.append("replSetUpdatePosition", 1);
+        // create an array with an entry for each member chained through us, plus one for ourselves
+        BSONArrayBuilder array (cmd.subarrayStart("optimes"));
+        OID myID = _me["_id"].OID();
+        {
+            for (map<mongo::OID, OpTime>::const_iterator itr = _slaveMap.begin();
+                 itr != _slaveMap.end(); ++itr) {
+                BSONObjBuilder entry(array.subobjStart());
+                entry.append("_id", itr->first);
+                entry.append("optime", itr->second);
+                if (itr->first == myID) {
+                    entry.append("config", theReplSet->myConfig().asBson());
+                }
+                else {
+                    entry.append("config", _members[itr->first]->config().asBson());
+                }
+                entry.doneFast();
+            }
+        }
+        array.done();
+        BSONObj res;
+
+        bool ok;
+        try {
+            ok = _connection->runCommand("admin", cmd.obj(), res);
+        }
+        catch (const DBException& e) {
+            log() << "SyncSourceFeedback error sending update: " << e.what() << endl;
+            resetConnection();
+            return false;
+        }
+        if (!ok) {
+            log() << "SyncSourceFeedback error sending update, response: " << res.toString() << endl;
+            resetConnection();
+            return false;
+        }
+        return true;
+    }
+
+    void SyncSourceFeedback::run() {
+        Client::initThread("SyncSourceFeedbackThread");
+        while (true) {
+            {
+                boost::unique_lock<boost::mutex> lock(_mtx);
+                while (!_positionChanged && !_handshakeNeeded) {
+                    _cond.wait(lock);
+                }
+                boost::unique_lock<boost::mutex> conlock(_connmtx);
+                const Member* target = replset::BackgroundSync::get()->getSyncTarget();
+                if (_syncTarget != target) {
+                    resetConnection();
+                    _syncTarget = target;
+                }
+                if (!hasConnection()) {
+                    // fix connection if need be
+                    if (!target) {
+                        continue;
+                    }
+                    if (!_connect(target->fullName())) {
+                        continue;
+                    }
+                    else if (!supportsUpdater()) {
+                        _handshakeNeeded = false;
+                        _positionChanged = false;
+                        continue;
+                    }
+                }
+                if (_handshakeNeeded) {
+                    if (!replHandshake()) {
+                        _handshakeNeeded = true;
+                        continue;
+                    }
+                    else {
+                        _handshakeNeeded = false;
+                    }
+                }
+                if (_positionChanged) {
+                    if (!updateUpstream()) {
+                        _positionChanged = true;
+                        continue;
+                    }
+                    else {
+                        _positionChanged = false;
+                    }
+                }
+            }
+        }
+    }
+}
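Between them, replHandshake() and updateUpstream() above assemble the two shapes of the new replSetUpdatePosition command: a handshake form sent once for ourselves and once per node chained through us, and a progress form carrying an optimes array. The sketch below builds the progress form for a single entry using only the builder calls the patch itself uses; the function name and its arguments are illustrative, not part of the commit.

    // Hypothetical helper mirroring updateUpstream() for one slave entry.
    BSONObj makeExampleUpdatePositionCmd(const mongo::OID& rid, const OpTime& ot, const BSONObj& memberConfig) {
        BSONObjBuilder cmd;
        cmd.append("replSetUpdatePosition", 1);
        BSONArrayBuilder array(cmd.subarrayStart("optimes"));
        BSONObjBuilder entry(array.subobjStart());
        entry.append("_id", rid);             // the reporting node's rid, from its local.me
        entry.append("optime", ot);           // last op that node has applied
        entry.append("config", memberConfig); // that node's entry in the replset config
        entry.doneFast();
        array.done();
        // result: { replSetUpdatePosition: 1,
        //           optimes: [ { _id: <rid>, optime: <ot>, config: <memberConfig> } ] }
        return cmd.obj();
    }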
diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h
new file mode 100644
index 00000000000..72073841029
--- /dev/null
+++ b/src/mongo/db/repl/sync_source_feedback.h
@@ -0,0 +1,171 @@
+/**
+* Copyright (C) 2013 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+
+#pragma once
+
+#include "mongo/db/repl/oplogreader.h"
+#include "mongo/util/background.h"
+
+
+namespace mongo {
+
+    class Member;
+
+    class SyncSourceFeedback : public BackgroundJob {
+    public:
+        SyncSourceFeedback() : BackgroundJob(false /*don't selfdelete*/),
+                               _syncTarget(NULL),
+                               _oplogReader(new OplogReader(true)),
+                               _supportsUpdater(true) {}
+
+        ~SyncSourceFeedback() {
+            delete _oplogReader;
+        }
+
+        /// Adds an entry to _members for a secondary that has connected to us.
+        void associateMember(const BSONObj& id, const int memberId);
+
+        /// Passes handshake up the replication chain, upon receiving a handshake.
+        void forwardSlaveHandshake();
+
+        void updateSelfInMap(const OpTime& ot) {
+            ensureMe();
+            updateMap(_me["_id"].OID(), ot);
+        }
+
+        /// Connect to sync target and create OplogReader if needed.
+        bool connect(const Member* target);
+
+        void resetConnection() {
+            _connection.reset();
+        }
+
+        void resetOplogReaderConnection() {
+            _oplogReader->resetConnection();
+        }
+
+        /// Used extensively in bgsync, to see if we need to use the OplogReader syncing method.
+        bool supportsUpdater() const {
+            // oplogReader will be NULL if new updater is supported
+            //boost::unique_lock<boost::mutex> lock(_mtx);
+            return _supportsUpdater;
+        }
+
+        /// Updates the _slaveMap to be forwarded to the sync target.
+        void updateMap(const mongo::OID& rid, const OpTime& ot);
+
+        std::string name() const { return "SyncSourceFeedbackThread"; }
+
+        /// Loops forever, passing updates when they are present.
+        void run();
+
+        /* The below methods just fall through to OplogReader and are only used when our sync target
+         * does not support the update command.
+         */
+        bool connectOplogReader(const std::string& hostName) {
+            return _oplogReader->connect(hostName);
+        }
+
+        bool connect(const BSONObj& rid, const int from, const string& to) {
+            return _oplogReader->connect(rid, from, to);
+        }
+
+        void ghostQueryGTE(const char *ns, OpTime t) {
+            _oplogReader->ghostQueryGTE(ns, t);
+        }
+
+        bool haveCursor() {
+            return _oplogReader->haveCursor();
+        }
+
+        bool more() {
+            return _oplogReader->more();
+        }
+
+        bool moreInCurrentBatch() {
+            return _oplogReader->moreInCurrentBatch();
+        }
+
+        BSONObj nextSafe() {
+            return _oplogReader->nextSafe();
+        }
+
+        void tailCheck() {
+            _oplogReader->tailCheck();
+        }
+
+        void tailingQueryGTE(const char *ns, OpTime t, const BSONObj* fields=0) {
+            _oplogReader->tailingQueryGTE(ns, t, fields);
+        }
+
+    private:
+        /// Ensures local.me is populated, creating it if necessary.
+        void ensureMe();
+
+        /* Generally replAuthenticate will only be called within system threads to fully
+         * authenticate connections to other nodes in the cluster that will be used as part of
+         * internal operations. If a user-initiated action results in needing to call
+         * replAuthenticate, you can call it with skipAuthCheck set to false. Only do this if you
+         * are certain that the proper auth checks have already run to ensure that the user is
+         * authorized to do everything that this connection will be used for!
+         */
+        bool replAuthenticate(bool skipAuthCheck);
+
+        /* Sends initialization information to our sync target, also determines whether or not they
+         * support the updater command.
+         */
+        bool replHandshake();
+
+        /* Inform the sync target of our current position in the oplog, as well as the positions
+         * of all secondaries chained through us.
+         */
+        bool updateUpstream();
+
+        bool hasConnection() {
+            return _connection.get();
+        }
+
+        /// Connect to sync target and create OplogReader if needed.
+        bool _connect(const std::string& hostName);
+
+        // stores our OID to be passed along in commands
+        BSONObj _me;
+        // the member we are currently syncing from
+        const Member* _syncTarget;
+        // holds the oplogReader for use when we fall back to old style updates
+        OplogReader* _oplogReader;
+        // our connection to our sync target
+        boost::scoped_ptr<DBClientConnection> _connection;
+        // tracks whether we are in fallback mode or not
+        bool _supportsUpdater;
+        // protects connection
+        boost::mutex _connmtx;
+        // protects cond and maps and the indicator bools
+        boost::mutex _mtx;
+        // contains the most recent optime of each member syncing to us
+        map<mongo::OID, OpTime> _slaveMap;
+        typedef map<mongo::OID, Member*> OIDMemberMap;
+        // contains a pointer to each member, which we can look up by oid
+        OIDMemberMap _members;
+        // used to alert our thread of changes which need to be passed up the chain
+        boost::condition _cond;
+        // used to indicate a position change which has not yet been pushed along
+        bool _positionChanged;
+        // used to indicate a connection change which has not yet been shaken on
+        bool _handshakeNeeded;
+    };
+}
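The fall-through methods declared above exist for mixed-version sets: replHandshake() flips _supportsUpdater to false when the sync target answers "no such cmd", and callers are then expected to drive the old OplogReader mechanism instead of the new command. Below is a rough sketch of how a caller might branch on that flag; the surrounding logic is an assumption for illustration, not code from this commit.

    // Sketch only: assumed caller-side branching on supportsUpdater().
    if (theReplSet->syncSourceFeedback.supportsUpdater()) {
        // new path: record our progress and let the feedback thread push it upstream
        theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten);
    }
    else {
        // fallback: drive the wrapped OplogReader against the sync target's oplog, as before
        theReplSet->syncSourceFeedback.ghostQueryGTE("local.oplog.rs", theReplSet->lastOpTimeWritten);
    }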
diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp
index da5b04a91e4..a5e6a64bf4d 100644
--- a/src/mongo/db/repl/write_concern.cpp
+++ b/src/mongo/db/repl/write_concern.cpp
@@ -126,20 +126,22 @@ namespace mongo {
 
             scoped_lock mylk(_mutex);
 
-            _slaves[ident] = last;
-            _dirty = true;
+            if (last > _slaves[ident]) {
+                _slaves[ident] = last;
+                _dirty = true;
 
-            if (theReplSet && theReplSet->isPrimary()) {
-                theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last);
-            }
+                if (theReplSet && theReplSet->isPrimary()) {
+                    theReplSet->ghost->updateSlave(ident.obj["_id"].OID(), last);
+                }
 
-            if ( ! _started ) {
-                // start background thread here since we definitely need it
-                _started = true;
-                go();
+                if ( ! _started ) {
+                    // start background thread here since we definitely need it
+                    _started = true;
+                    go();
+                }
+
+                _threadsWaitingForReplication.notify_all();
             }
-
-            _threadsWaitingForReplication.notify_all();
         }
 
         bool opReplicatedEnough( OpTime op , BSONElement w ) {
@@ -257,6 +259,28 @@ namespace mongo {
 
         const char * SlaveTracking::NS = "local.slaves";
 
+        // parse optimes from the replSetUpdatePosition command and pass them to SyncSourceFeedback
+        void updateSlaveLocations(BSONArray optimes) {
+            BSONForEach(elem, optimes) {
+                BSONObj entry = elem.Obj();
+                BSONObj id = BSON("_id" << entry["_id"].OID());
+                OpTime ot = entry["optime"]._opTime();
+                BSONObj config = entry["config"].Obj();
+
+                // update locally
+                slaveTracking.update(id, config, "local.oplog.rs", ot);
+                if (theReplSet && !theReplSet->isPrimary()) {
+                    // pass along if we are not primary
+                    theReplSet->syncSourceFeedback.updateMap(entry["_id"].OID(), ot);
+                    // also percolate the old way, to stay backwards compatible
+                    theReplSet->ghost->send(boost::bind(&GhostSync::percolate,
+                                                        theReplSet->ghost,
+                                                        id,
+                                                        ot));
+                }
+            }
+        }
+
         void updateSlaveLocation( CurOp& curop, const char * ns , OpTime lastOp ) {
             if ( lastOp.isNull() )
                 return;
diff --git a/src/mongo/db/repl/write_concern.h b/src/mongo/db/repl/write_concern.h
index f80df329a4b..8a6d695fb9c 100644
--- a/src/mongo/db/repl/write_concern.h
+++ b/src/mongo/db/repl/write_concern.h
@@ -29,6 +29,8 @@ namespace mongo {
 
     class CurOp;
 
+    void updateSlaveLocations(BSONArray optimes);
+
     void updateSlaveLocation( CurOp& curop, const char * oplog_ns , OpTime lastOp );
 
     /** @return true if op has made it to w servers */
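On the receiving end, updateSlaveLocations() above is what runs when such a command arrives: it records every optimes entry in local.slaves through slaveTracking and, if the receiver is itself a secondary, also refreshes its own SyncSourceFeedback map and percolates the position the old way so mixed chains keep working. Below is a hedged sketch of calling it with one hand-built entry; the values are invented, and the real caller (presumably the replSetUpdatePosition command implementation elsewhere in this commit) is not among the hunks shown here.

    // Hypothetical input, shaped the way updateSlaveLocations() expects.
    BSONObjBuilder entry;
    entry.append("_id", mongo::OID::gen());          // the reporting slave's rid
    entry.append("optime", OpTime(1374500000, 1));   // last op it has applied
    entry.append("config", BSON("_id" << 3 << "host" << "n4.example.net:31003"));
    BSONArrayBuilder optimes;
    optimes.append(entry.obj());
    updateSlaveLocations(optimes.arr());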