diff options
Diffstat (limited to 'src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go')
-rw-r--r-- | src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go | 2090 |
1 files changed, 2090 insertions, 0 deletions
diff --git a/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go b/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go new file mode 100644 index 00000000000..54ec8676226 --- /dev/null +++ b/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go @@ -0,0 +1,2090 @@ +// mgo - MongoDB driver for Go +// +// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net> +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package mgo_test + +import ( + "fmt" + "io" + "net" + "strings" + "sync" + "time" + + . "gopkg.in/check.v1" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" +) + +func (s *S) TestNewSession(c *C) { + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + // Do a dummy operation to wait for connection. + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"_id": 1}) + c.Assert(err, IsNil) + + // Tweak safety and query settings to ensure other has copied those. + session.SetSafe(nil) + session.SetBatch(-1) + other := session.New() + defer other.Close() + session.SetSafe(&mgo.Safe{}) + + // Clone was copied while session was unsafe, so no errors. + otherColl := other.DB("mydb").C("mycoll") + err = otherColl.Insert(M{"_id": 1}) + c.Assert(err, IsNil) + + // Original session was made safe again. + err = coll.Insert(M{"_id": 1}) + c.Assert(err, NotNil) + + // With New(), each session has its own socket now. + stats := mgo.GetStats() + c.Assert(stats.MasterConns, Equals, 2) + c.Assert(stats.SocketsInUse, Equals, 2) + + // Ensure query parameters were cloned. + err = otherColl.Insert(M{"_id": 2}) + c.Assert(err, IsNil) + + // Ping the database to ensure the nonce has been received already. + c.Assert(other.Ping(), IsNil) + + mgo.ResetStats() + + iter := otherColl.Find(M{}).Iter() + c.Assert(err, IsNil) + + m := M{} + ok := iter.Next(m) + c.Assert(ok, Equals, true) + err = iter.Close() + c.Assert(err, IsNil) + + // If Batch(-1) is in effect, a single document must have been received. + stats = mgo.GetStats() + c.Assert(stats.ReceivedDocs, Equals, 1) +} + +func (s *S) TestCloneSession(c *C) { + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + // Do a dummy operation to wait for connection. + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"_id": 1}) + c.Assert(err, IsNil) + + // Tweak safety and query settings to ensure clone is copying those. + session.SetSafe(nil) + session.SetBatch(-1) + clone := session.Clone() + defer clone.Close() + session.SetSafe(&mgo.Safe{}) + + // Clone was copied while session was unsafe, so no errors. + cloneColl := clone.DB("mydb").C("mycoll") + err = cloneColl.Insert(M{"_id": 1}) + c.Assert(err, IsNil) + + // Original session was made safe again. + err = coll.Insert(M{"_id": 1}) + c.Assert(err, NotNil) + + // With Clone(), same socket is shared between sessions now. + stats := mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 1) + c.Assert(stats.SocketRefs, Equals, 2) + + // Refreshing one of them should let the original socket go, + // while preserving the safety settings. + clone.Refresh() + err = cloneColl.Insert(M{"_id": 1}) + c.Assert(err, IsNil) + + // Must have used another connection now. + stats = mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 2) + c.Assert(stats.SocketRefs, Equals, 2) + + // Ensure query parameters were cloned. + err = cloneColl.Insert(M{"_id": 2}) + c.Assert(err, IsNil) + + // Ping the database to ensure the nonce has been received already. + c.Assert(clone.Ping(), IsNil) + + mgo.ResetStats() + + iter := cloneColl.Find(M{}).Iter() + c.Assert(err, IsNil) + + m := M{} + ok := iter.Next(m) + c.Assert(ok, Equals, true) + err = iter.Close() + c.Assert(err, IsNil) + + // If Batch(-1) is in effect, a single document must have been received. + stats = mgo.GetStats() + c.Assert(stats.ReceivedDocs, Equals, 1) +} + +func (s *S) TestModeStrong(c *C) { + session, err := mgo.Dial("localhost:40012") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Monotonic, false) + session.SetMode(mgo.Strong, false) + + c.Assert(session.Mode(), Equals, mgo.Strong) + + result := M{} + cmd := session.DB("admin").C("$cmd") + err = cmd.Find(M{"ismaster": 1}).One(&result) + c.Assert(err, IsNil) + c.Assert(result["ismaster"], Equals, true) + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + // Wait since the sync also uses sockets. + for len(session.LiveServers()) != 3 { + c.Log("Waiting for cluster sync to finish...") + time.Sleep(5e8) + } + + stats := mgo.GetStats() + c.Assert(stats.MasterConns, Equals, 1) + c.Assert(stats.SlaveConns, Equals, 2) + c.Assert(stats.SocketsInUse, Equals, 1) + + session.SetMode(mgo.Strong, true) + + stats = mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 0) +} + +func (s *S) TestModeMonotonic(c *C) { + // Must necessarily connect to a slave, otherwise the + // master connection will be available first. + session, err := mgo.Dial("localhost:40012") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Monotonic, false) + + c.Assert(session.Mode(), Equals, mgo.Monotonic) + + var result struct{ IsMaster bool } + cmd := session.DB("admin").C("$cmd") + err = cmd.Find(M{"ismaster": 1}).One(&result) + c.Assert(err, IsNil) + c.Assert(result.IsMaster, Equals, false) + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + err = cmd.Find(M{"ismaster": 1}).One(&result) + c.Assert(err, IsNil) + c.Assert(result.IsMaster, Equals, true) + + // Wait since the sync also uses sockets. + for len(session.LiveServers()) != 3 { + c.Log("Waiting for cluster sync to finish...") + time.Sleep(5e8) + } + + stats := mgo.GetStats() + c.Assert(stats.MasterConns, Equals, 1) + c.Assert(stats.SlaveConns, Equals, 2) + c.Assert(stats.SocketsInUse, Equals, 2) + + session.SetMode(mgo.Monotonic, true) + + stats = mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 0) +} + +func (s *S) TestModeMonotonicAfterStrong(c *C) { + // Test that a strong session shifting to a monotonic + // one preserves the socket untouched. + + session, err := mgo.Dial("localhost:40012") + c.Assert(err, IsNil) + defer session.Close() + + // Insert something to force a connection to the master. + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + session.SetMode(mgo.Monotonic, false) + + // Wait since the sync also uses sockets. + for len(session.LiveServers()) != 3 { + c.Log("Waiting for cluster sync to finish...") + time.Sleep(5e8) + } + + // Master socket should still be reserved. + stats := mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 1) + + // Confirm it's the master even though it's Monotonic by now. + result := M{} + cmd := session.DB("admin").C("$cmd") + err = cmd.Find(M{"ismaster": 1}).One(&result) + c.Assert(err, IsNil) + c.Assert(result["ismaster"], Equals, true) +} + +func (s *S) TestModeStrongAfterMonotonic(c *C) { + // Test that shifting from Monotonic to Strong while + // using a slave socket will keep the socket reserved + // until the master socket is necessary, so that no + // switch over occurs unless it's actually necessary. + + // Must necessarily connect to a slave, otherwise the + // master connection will be available first. + session, err := mgo.Dial("localhost:40012") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Monotonic, false) + + // Ensure we're talking to a slave, and reserve the socket. + result := M{} + err = session.Run("ismaster", &result) + c.Assert(err, IsNil) + c.Assert(result["ismaster"], Equals, false) + + // Switch to a Strong session. + session.SetMode(mgo.Strong, false) + + // Wait since the sync also uses sockets. + for len(session.LiveServers()) != 3 { + c.Log("Waiting for cluster sync to finish...") + time.Sleep(5e8) + } + + // Slave socket should still be reserved. + stats := mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 1) + + // But any operation will switch it to the master. + result = M{} + err = session.Run("ismaster", &result) + c.Assert(err, IsNil) + c.Assert(result["ismaster"], Equals, true) +} + +func (s *S) TestModeMonotonicWriteOnIteration(c *C) { + // Must necessarily connect to a slave, otherwise the + // master connection will be available first. + session, err := mgo.Dial("localhost:40012") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Monotonic, false) + + c.Assert(session.Mode(), Equals, mgo.Monotonic) + + coll1 := session.DB("mydb").C("mycoll1") + coll2 := session.DB("mydb").C("mycoll2") + + ns := []int{40, 41, 42, 43, 44, 45, 46} + for _, n := range ns { + err := coll1.Insert(M{"n": n}) + c.Assert(err, IsNil) + } + + // Release master so we can grab a slave again. + session.Refresh() + + // Wait until synchronization is done. + for { + n, err := coll1.Count() + c.Assert(err, IsNil) + if n == len(ns) { + break + } + } + + iter := coll1.Find(nil).Batch(2).Iter() + i := 0 + m := M{} + for iter.Next(&m) { + i++ + if i > 3 { + err := coll2.Insert(M{"n": 47 + i}) + c.Assert(err, IsNil) + } + } + c.Assert(i, Equals, len(ns)) +} + +func (s *S) TestModeEventual(c *C) { + // Must necessarily connect to a slave, otherwise the + // master connection will be available first. + session, err := mgo.Dial("localhost:40012") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Eventual, false) + + c.Assert(session.Mode(), Equals, mgo.Eventual) + + result := M{} + err = session.Run("ismaster", &result) + c.Assert(err, IsNil) + c.Assert(result["ismaster"], Equals, false) + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + result = M{} + err = session.Run("ismaster", &result) + c.Assert(err, IsNil) + c.Assert(result["ismaster"], Equals, false) + + // Wait since the sync also uses sockets. + for len(session.LiveServers()) != 3 { + c.Log("Waiting for cluster sync to finish...") + time.Sleep(5e8) + } + + stats := mgo.GetStats() + c.Assert(stats.MasterConns, Equals, 1) + c.Assert(stats.SlaveConns, Equals, 2) + c.Assert(stats.SocketsInUse, Equals, 0) +} + +func (s *S) TestModeEventualAfterStrong(c *C) { + // Test that a strong session shifting to an eventual + // one preserves the socket untouched. + + session, err := mgo.Dial("localhost:40012") + c.Assert(err, IsNil) + defer session.Close() + + // Insert something to force a connection to the master. + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + session.SetMode(mgo.Eventual, false) + + // Wait since the sync also uses sockets. + for len(session.LiveServers()) != 3 { + c.Log("Waiting for cluster sync to finish...") + time.Sleep(5e8) + } + + // Master socket should still be reserved. + stats := mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 1) + + // Confirm it's the master even though it's Eventual by now. + result := M{} + cmd := session.DB("admin").C("$cmd") + err = cmd.Find(M{"ismaster": 1}).One(&result) + c.Assert(err, IsNil) + c.Assert(result["ismaster"], Equals, true) + + session.SetMode(mgo.Eventual, true) + + stats = mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 0) +} + +func (s *S) TestModeStrongFallover(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40021") + c.Assert(err, IsNil) + defer session.Close() + + // With strong consistency, this will open a socket to the master. + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + + // Kill the master. + host := result.Host + s.Stop(host) + + // This must fail, since the connection was broken. + err = session.Run("serverStatus", result) + c.Assert(err, Equals, io.EOF) + + // With strong consistency, it fails again until reset. + err = session.Run("serverStatus", result) + c.Assert(err, Equals, io.EOF) + + session.Refresh() + + // Now we should be able to talk to the new master. + // Increase the timeout since this may take quite a while. + session.SetSyncTimeout(3 * time.Minute) + + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(result.Host, Not(Equals), host) + + // Insert some data to confirm it's indeed a master. + err = session.DB("mydb").C("mycoll").Insert(M{"n": 42}) + c.Assert(err, IsNil) +} + +func (s *S) TestModePrimaryHiccup(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40021") + c.Assert(err, IsNil) + defer session.Close() + + // With strong consistency, this will open a socket to the master. + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + + // Establish a few extra sessions to create spare sockets to + // the master. This increases a bit the chances of getting an + // incorrect cached socket. + var sessions []*mgo.Session + for i := 0; i < 20; i++ { + sessions = append(sessions, session.Copy()) + err = sessions[len(sessions)-1].Run("serverStatus", result) + c.Assert(err, IsNil) + } + for i := range sessions { + sessions[i].Close() + } + + // Kill the master, but bring it back immediatelly. + host := result.Host + s.Stop(host) + s.StartAll() + + // This must fail, since the connection was broken. + err = session.Run("serverStatus", result) + c.Assert(err, Equals, io.EOF) + + // With strong consistency, it fails again until reset. + err = session.Run("serverStatus", result) + c.Assert(err, Equals, io.EOF) + + session.Refresh() + + // Now we should be able to talk to the new master. + // Increase the timeout since this may take quite a while. + session.SetSyncTimeout(3 * time.Minute) + + // Insert some data to confirm it's indeed a master. + err = session.DB("mydb").C("mycoll").Insert(M{"n": 42}) + c.Assert(err, IsNil) +} + +func (s *S) TestModeMonotonicFallover(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40021") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Monotonic, true) + + // Insert something to force a switch to the master. + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + // Wait a bit for this to be synchronized to slaves. + time.Sleep(3 * time.Second) + + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + + // Kill the master. + host := result.Host + s.Stop(host) + + // This must fail, since the connection was broken. + err = session.Run("serverStatus", result) + c.Assert(err, Equals, io.EOF) + + // With monotonic consistency, it fails again until reset. + err = session.Run("serverStatus", result) + c.Assert(err, Equals, io.EOF) + + session.Refresh() + + // Now we should be able to talk to the new master. + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(result.Host, Not(Equals), host) +} + +func (s *S) TestModeMonotonicWithSlaveFallover(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40021") + c.Assert(err, IsNil) + defer session.Close() + + ssresult := &struct{ Host string }{} + imresult := &struct{ IsMaster bool }{} + + // Figure the master while still using the strong session. + err = session.Run("serverStatus", ssresult) + c.Assert(err, IsNil) + err = session.Run("isMaster", imresult) + c.Assert(err, IsNil) + master := ssresult.Host + c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master)) + + // Create new monotonic session with an explicit address to ensure + // a slave is synchronized before the master, otherwise a connection + // with the master may be used below for lack of other options. + var addr string + switch { + case strings.HasSuffix(ssresult.Host, ":40021"): + addr = "localhost:40022" + case strings.HasSuffix(ssresult.Host, ":40022"): + addr = "localhost:40021" + case strings.HasSuffix(ssresult.Host, ":40023"): + addr = "localhost:40021" + default: + c.Fatal("Unknown host: ", ssresult.Host) + } + + session, err = mgo.Dial(addr) + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Monotonic, true) + + // Check the address of the socket associated with the monotonic session. + c.Log("Running serverStatus and isMaster with monotonic session") + err = session.Run("serverStatus", ssresult) + c.Assert(err, IsNil) + err = session.Run("isMaster", imresult) + c.Assert(err, IsNil) + slave := ssresult.Host + c.Assert(imresult.IsMaster, Equals, false, Commentf("%s is not a slave", slave)) + + c.Assert(master, Not(Equals), slave) + + // Kill the master. + s.Stop(master) + + // Session must still be good, since we were talking to a slave. + err = session.Run("serverStatus", ssresult) + c.Assert(err, IsNil) + + c.Assert(ssresult.Host, Equals, slave, + Commentf("Monotonic session moved from %s to %s", slave, ssresult.Host)) + + // If we try to insert something, it'll have to hold until the new + // master is available to move the connection, and work correctly. + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + // Must now be talking to the new master. + err = session.Run("serverStatus", ssresult) + c.Assert(err, IsNil) + err = session.Run("isMaster", imresult) + c.Assert(err, IsNil) + c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master)) + + // ... which is not the old one, since it's still dead. + c.Assert(ssresult.Host, Not(Equals), master) +} + +func (s *S) TestModeEventualFallover(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40021") + c.Assert(err, IsNil) + defer session.Close() + + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + master := result.Host + + session.SetMode(mgo.Eventual, true) + + // Should connect to the master when needed. + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + // Wait a bit for this to be synchronized to slaves. + time.Sleep(3 * time.Second) + + // Kill the master. + s.Stop(master) + + // Should still work, with the new master now. + coll = session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(result.Host, Not(Equals), master) +} + +func (s *S) TestModeSecondaryJustPrimary(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Secondary, true) + + err = session.Ping() + c.Assert(err, ErrorMatches, "no reachable servers") +} + +func (s *S) TestModeSecondaryPreferredJustPrimary(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.SecondaryPreferred, true) + + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) +} + +func (s *S) TestModeSecondaryPreferredFallover(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + // Ensure secondaries are available for being picked up. + for len(session.LiveServers()) != 3 { + c.Log("Waiting for cluster sync to finish...") + time.Sleep(5e8) + } + + session.SetMode(mgo.SecondaryPreferred, true) + + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(supvName(result.Host), Not(Equals), "rs1a") + secondary := result.Host + + // Should connect to the primary when needed. + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + // Wait a bit for this to be synchronized to slaves. + time.Sleep(3 * time.Second) + + // Kill the primary. + s.Stop("localhost:40011") + + // It can still talk to the selected secondary. + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(result.Host, Equals, secondary) + + // But cannot speak to the primary until reset. + coll = session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, Equals, io.EOF) + + session.Refresh() + + // Can still talk to a secondary. + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(supvName(result.Host), Not(Equals), "rs1a") + + s.StartAll() + + // Should now be able to talk to the primary again. + coll = session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) +} + +func (s *S) TestModePrimaryPreferredFallover(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.PrimaryPreferred, true) + + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(supvName(result.Host), Equals, "rs1a") + + // Kill the primary. + s.Stop("localhost:40011") + + // Should now fail as there was a primary socket in use already. + err = session.Run("serverStatus", result) + c.Assert(err, Equals, io.EOF) + + // Refresh so the reserved primary socket goes away. + session.Refresh() + + // Should be able to talk to the secondary. + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + + s.StartAll() + + // Should wait for the new primary to become available. + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + // And should use the new primary in general, as it is preferred. + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(supvName(result.Host), Equals, "rs1a") +} + +func (s *S) TestModePrimaryFallover(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + session.SetSyncTimeout(3 * time.Second) + + session.SetMode(mgo.Primary, true) + + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(supvName(result.Host), Equals, "rs1a") + + // Kill the primary. + s.Stop("localhost:40011") + + session.Refresh() + + err = session.Ping() + c.Assert(err, ErrorMatches, "no reachable servers") +} + +func (s *S) TestModeSecondary(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Secondary, true) + + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(supvName(result.Host), Not(Equals), "rs1a") + secondary := result.Host + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(result.Host, Equals, secondary) +} + +func (s *S) TestPreserveSocketCountOnSync(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + stats := mgo.GetStats() + for stats.SocketsAlive != 3 { + c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive) + stats = mgo.GetStats() + time.Sleep(5e8) + } + + c.Assert(stats.SocketsAlive, Equals, 3) + + // Kill the master (with rs1, 'a' is always the master). + s.Stop("localhost:40011") + + // Wait for the logic to run for a bit and bring it back. + startedAll := make(chan bool) + go func() { + time.Sleep(5e9) + s.StartAll() + startedAll <- true + }() + + // Do not allow the test to return before the goroutine above is done. + defer func() { + <-startedAll + }() + + // Do an action to kick the resync logic in, and also to + // wait until the cluster recognizes the server is back. + result := struct{ Ok bool }{} + err = session.Run("getLastError", &result) + c.Assert(err, IsNil) + c.Assert(result.Ok, Equals, true) + + for i := 0; i != 20; i++ { + stats = mgo.GetStats() + if stats.SocketsAlive == 3 { + break + } + c.Logf("Waiting for 3 sockets alive, have %d", stats.SocketsAlive) + time.Sleep(5e8) + } + + // Ensure the number of sockets is preserved after syncing. + stats = mgo.GetStats() + c.Assert(stats.SocketsAlive, Equals, 3) + c.Assert(stats.SocketsInUse, Equals, 1) + c.Assert(stats.SocketRefs, Equals, 1) +} + +// Connect to the master of a deployment with a single server, +// run an insert, and then ensure the insert worked and that a +// single connection was established. +func (s *S) TestTopologySyncWithSingleMaster(c *C) { + // Use hostname here rather than IP, to make things trickier. + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1, "b": 2}) + c.Assert(err, IsNil) + + // One connection used for discovery. Master socket recycled for + // insert. Socket is reserved after insert. + stats := mgo.GetStats() + c.Assert(stats.MasterConns, Equals, 1) + c.Assert(stats.SlaveConns, Equals, 0) + c.Assert(stats.SocketsInUse, Equals, 1) + + // Refresh session and socket must be released. + session.Refresh() + stats = mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 0) +} + +func (s *S) TestTopologySyncWithSlaveSeed(c *C) { + // That's supposed to be a slave. Must run discovery + // and find out master to insert successfully. + session, err := mgo.Dial("localhost:40012") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + coll.Insert(M{"a": 1, "b": 2}) + + result := struct{ Ok bool }{} + err = session.Run("getLastError", &result) + c.Assert(err, IsNil) + c.Assert(result.Ok, Equals, true) + + // One connection to each during discovery. Master + // socket recycled for insert. + stats := mgo.GetStats() + c.Assert(stats.MasterConns, Equals, 1) + c.Assert(stats.SlaveConns, Equals, 2) + + // Only one socket reference alive, in the master socket owned + // by the above session. + c.Assert(stats.SocketsInUse, Equals, 1) + + // Refresh it, and it must be gone. + session.Refresh() + stats = mgo.GetStats() + c.Assert(stats.SocketsInUse, Equals, 0) +} + +func (s *S) TestSyncTimeout(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + s.Stop("localhost:40001") + + timeout := 3 * time.Second + session.SetSyncTimeout(timeout) + started := time.Now() + + // Do something. + result := struct{ Ok bool }{} + err = session.Run("getLastError", &result) + c.Assert(err, ErrorMatches, "no reachable servers") + c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true) + c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true) +} + +func (s *S) TestDialWithTimeout(c *C) { + if *fast { + c.Skip("-fast") + } + + timeout := 2 * time.Second + started := time.Now() + + // 40009 isn't used by the test servers. + session, err := mgo.DialWithTimeout("localhost:40009", timeout) + if session != nil { + session.Close() + } + c.Assert(err, ErrorMatches, "no reachable servers") + c.Assert(session, IsNil) + c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true) + c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true) +} + +func (s *S) TestSocketTimeout(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + s.Freeze("localhost:40001") + + timeout := 3 * time.Second + session.SetSocketTimeout(timeout) + started := time.Now() + + // Do something. + result := struct{ Ok bool }{} + err = session.Run("getLastError", &result) + c.Assert(err, ErrorMatches, ".*: i/o timeout") + c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true) + c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true) +} + +func (s *S) TestSocketTimeoutOnDial(c *C) { + if *fast { + c.Skip("-fast") + } + + timeout := 1 * time.Second + + defer mgo.HackSyncSocketTimeout(timeout)() + + s.Freeze("localhost:40001") + + started := time.Now() + + session, err := mgo.DialWithTimeout("localhost:40001", timeout) + c.Assert(err, ErrorMatches, "no reachable servers") + c.Assert(session, IsNil) + + c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true) + c.Assert(started.After(time.Now().Add(-20*time.Second)), Equals, true) +} + +func (s *S) TestSocketTimeoutOnInactiveSocket(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + timeout := 2 * time.Second + session.SetSocketTimeout(timeout) + + // Do something that relies on the timeout and works. + c.Assert(session.Ping(), IsNil) + + // Freeze and wait for the timeout to go by. + s.Freeze("localhost:40001") + time.Sleep(timeout + 500*time.Millisecond) + s.Thaw("localhost:40001") + + // Do something again. The timeout above should not have killed + // the socket as there was nothing to be done. + c.Assert(session.Ping(), IsNil) +} + +func (s *S) TestDialWithReplicaSetName(c *C) { + seedLists := [][]string{ + // rs1 primary and rs2 primary + []string{"localhost:40011", "localhost:40021"}, + // rs1 primary and rs2 secondary + []string{"localhost:40011", "localhost:40022"}, + // rs1 secondary and rs2 primary + []string{"localhost:40012", "localhost:40021"}, + // rs1 secondary and rs2 secondary + []string{"localhost:40012", "localhost:40022"}, + } + + rs2Members := []string{":40021", ":40022", ":40023"} + + verifySyncedServers := func(session *mgo.Session, numServers int) { + // wait for the server(s) to be synced + for len(session.LiveServers()) != numServers { + c.Log("Waiting for cluster sync to finish...") + time.Sleep(5e8) + } + + // ensure none of the rs2 set members are communicated with + for _, addr := range session.LiveServers() { + for _, rs2Member := range rs2Members { + c.Assert(strings.HasSuffix(addr, rs2Member), Equals, false) + } + } + } + + // only communication with rs1 members is expected + for _, seedList := range seedLists { + info := mgo.DialInfo{ + Addrs: seedList, + Timeout: 5 * time.Second, + ReplicaSetName: "rs1", + } + + session, err := mgo.DialWithInfo(&info) + c.Assert(err, IsNil) + verifySyncedServers(session, 3) + session.Close() + + info.Direct = true + session, err = mgo.DialWithInfo(&info) + c.Assert(err, IsNil) + verifySyncedServers(session, 1) + session.Close() + + connectionUrl := fmt.Sprintf("mongodb://%v/?replicaSet=rs1", strings.Join(seedList, ",")) + session, err = mgo.Dial(connectionUrl) + c.Assert(err, IsNil) + verifySyncedServers(session, 3) + session.Close() + + connectionUrl += "&connect=direct" + session, err = mgo.Dial(connectionUrl) + c.Assert(err, IsNil) + verifySyncedServers(session, 1) + session.Close() + } + +} + +func (s *S) TestDirect(c *C) { + session, err := mgo.Dial("localhost:40012?connect=direct") + c.Assert(err, IsNil) + defer session.Close() + + // We know that server is a slave. + session.SetMode(mgo.Monotonic, true) + + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true) + + stats := mgo.GetStats() + c.Assert(stats.SocketsAlive, Equals, 1) + c.Assert(stats.SocketsInUse, Equals, 1) + c.Assert(stats.SocketRefs, Equals, 1) + + // We've got no master, so it'll timeout. + session.SetSyncTimeout(5e8 * time.Nanosecond) + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"test": 1}) + c.Assert(err, ErrorMatches, "no reachable servers") + + // Writing to the local database is okay. + coll = session.DB("local").C("mycoll") + defer coll.RemoveAll(nil) + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id}) + c.Assert(err, IsNil) + + // Data was stored in the right server. + n, err := coll.Find(M{"_id": id}).Count() + c.Assert(err, IsNil) + c.Assert(n, Equals, 1) + + // Server hasn't changed. + result.Host = "" + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true) +} + +func (s *S) TestDirectToUnknownStateMember(c *C) { + session, err := mgo.Dial("localhost:40041?connect=direct") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Monotonic, true) + + result := &struct{ Host string }{} + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true) + + // We've got no master, so it'll timeout. + session.SetSyncTimeout(5e8 * time.Nanosecond) + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"test": 1}) + c.Assert(err, ErrorMatches, "no reachable servers") + + // Slave is still reachable. + result.Host = "" + err = session.Run("serverStatus", result) + c.Assert(err, IsNil) + c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true) +} + +func (s *S) TestFailFast(c *C) { + info := mgo.DialInfo{ + Addrs: []string{"localhost:99999"}, + Timeout: 5 * time.Second, + FailFast: true, + } + + started := time.Now() + + _, err := mgo.DialWithInfo(&info) + c.Assert(err, ErrorMatches, "no reachable servers") + + c.Assert(started.After(time.Now().Add(-time.Second)), Equals, true) +} + +func (s *S) countQueries(c *C, server string) (n int) { + defer func() { c.Logf("Queries for %q: %d", server, n) }() + session, err := mgo.Dial(server + "?connect=direct") + c.Assert(err, IsNil) + defer session.Close() + session.SetMode(mgo.Monotonic, true) + var result struct { + OpCounters struct { + Query int + } + Metrics struct { + Commands struct{ Find struct{ Total int } } + } + } + err = session.Run("serverStatus", &result) + c.Assert(err, IsNil) + if s.versionAtLeast(3, 2) { + return result.Metrics.Commands.Find.Total + } + return result.OpCounters.Query +} + +func (s *S) countCommands(c *C, server, commandName string) (n int) { + defer func() { c.Logf("Queries for %q: %d", server, n) }() + session, err := mgo.Dial(server + "?connect=direct") + c.Assert(err, IsNil) + defer session.Close() + session.SetMode(mgo.Monotonic, true) + var result struct { + Metrics struct { + Commands map[string]struct{ Total int } + } + } + err = session.Run("serverStatus", &result) + c.Assert(err, IsNil) + return result.Metrics.Commands[commandName].Total +} + +func (s *S) TestMonotonicSlaveOkFlagWithMongos(c *C) { + session, err := mgo.Dial("localhost:40021") + c.Assert(err, IsNil) + defer session.Close() + + ssresult := &struct{ Host string }{} + imresult := &struct{ IsMaster bool }{} + + // Figure the master while still using the strong session. + err = session.Run("serverStatus", ssresult) + c.Assert(err, IsNil) + err = session.Run("isMaster", imresult) + c.Assert(err, IsNil) + master := ssresult.Host + c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master)) + + // Ensure mongos is aware about the current topology. + s.Stop(":40201") + s.StartAll() + + mongos, err := mgo.Dial("localhost:40202") + c.Assert(err, IsNil) + defer mongos.Close() + + // Insert some data as otherwise 3.2+ doesn't seem to run the query at all. + err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1}) + c.Assert(err, IsNil) + + // Wait until all servers see the data. + for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} { + session, err := mgo.Dial(addr + "?connect=direct") + c.Assert(err, IsNil) + defer session.Close() + session.SetMode(mgo.Monotonic, true) + for i := 300; i >= 0; i-- { + n, err := session.DB("mydb").C("mycoll").Find(nil).Count() + c.Assert(err, IsNil) + if n == 1 { + break + } + if i == 0 { + c.Fatalf("Inserted data never reached " + addr) + } + time.Sleep(100 * time.Millisecond) + } + } + + // Collect op counters for everyone. + q21a := s.countQueries(c, "localhost:40021") + q22a := s.countQueries(c, "localhost:40022") + q23a := s.countQueries(c, "localhost:40023") + + // Do a SlaveOk query through MongoS + + mongos.SetMode(mgo.Monotonic, true) + + coll := mongos.DB("mydb").C("mycoll") + var result struct{ N int } + for i := 0; i != 5; i++ { + err = coll.Find(nil).One(&result) + c.Assert(err, IsNil) + c.Assert(result.N, Equals, 1) + } + + // Collect op counters for everyone again. + q21b := s.countQueries(c, "localhost:40021") + q22b := s.countQueries(c, "localhost:40022") + q23b := s.countQueries(c, "localhost:40023") + + var masterDelta, slaveDelta int + switch hostPort(master) { + case "40021": + masterDelta = q21b - q21a + slaveDelta = (q22b - q22a) + (q23b - q23a) + case "40022": + masterDelta = q22b - q22a + slaveDelta = (q21b - q21a) + (q23b - q23a) + case "40023": + masterDelta = q23b - q23a + slaveDelta = (q21b - q21a) + (q22b - q22a) + default: + c.Fatal("Uh?") + } + + c.Check(masterDelta, Equals, 0) // Just the counting itself. + c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above. +} + +func (s *S) TestSecondaryModeWithMongos(c *C) { + session, err := mgo.Dial("localhost:40021") + c.Assert(err, IsNil) + defer session.Close() + + ssresult := &struct{ Host string }{} + imresult := &struct{ IsMaster bool }{} + + // Figure the master while still using the strong session. + err = session.Run("serverStatus", ssresult) + c.Assert(err, IsNil) + err = session.Run("isMaster", imresult) + c.Assert(err, IsNil) + master := ssresult.Host + c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master)) + + // Ensure mongos is aware about the current topology. + s.Stop(":40201") + s.StartAll() + + mongos, err := mgo.Dial("localhost:40202") + c.Assert(err, IsNil) + defer mongos.Close() + + mongos.SetSyncTimeout(5 * time.Second) + + // Insert some data as otherwise 3.2+ doesn't seem to run the query at all. + err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1}) + c.Assert(err, IsNil) + + // Wait until all servers see the data. + for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} { + session, err := mgo.Dial(addr + "?connect=direct") + c.Assert(err, IsNil) + defer session.Close() + session.SetMode(mgo.Monotonic, true) + for i := 300; i >= 0; i-- { + n, err := session.DB("mydb").C("mycoll").Find(nil).Count() + c.Assert(err, IsNil) + if n == 1 { + break + } + if i == 0 { + c.Fatalf("Inserted data never reached " + addr) + } + time.Sleep(100 * time.Millisecond) + } + } + + // Collect op counters for everyone. + q21a := s.countQueries(c, "localhost:40021") + q22a := s.countQueries(c, "localhost:40022") + q23a := s.countQueries(c, "localhost:40023") + + // Do a Secondary query through MongoS + + mongos.SetMode(mgo.Secondary, true) + + coll := mongos.DB("mydb").C("mycoll") + var result struct{ N int } + for i := 0; i != 5; i++ { + err = coll.Find(nil).One(&result) + c.Assert(err, IsNil) + c.Assert(result.N, Equals, 1) + } + + // Collect op counters for everyone again. + q21b := s.countQueries(c, "localhost:40021") + q22b := s.countQueries(c, "localhost:40022") + q23b := s.countQueries(c, "localhost:40023") + + var masterDelta, slaveDelta int + switch hostPort(master) { + case "40021": + masterDelta = q21b - q21a + slaveDelta = (q22b - q22a) + (q23b - q23a) + case "40022": + masterDelta = q22b - q22a + slaveDelta = (q21b - q21a) + (q23b - q23a) + case "40023": + masterDelta = q23b - q23a + slaveDelta = (q21b - q21a) + (q22b - q22a) + default: + c.Fatal("Uh?") + } + + c.Check(masterDelta, Equals, 0) // Just the counting itself. + c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above. +} + +func (s *S) TestSecondaryModeWithMongosInsert(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40202") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Secondary, true) + session.SetSyncTimeout(4 * time.Second) + + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(M{"a": 1}) + c.Assert(err, IsNil) + + var result struct{ A int } + coll.Find(nil).One(&result) + c.Assert(result.A, Equals, 1) +} + + +func (s *S) TestRemovalOfClusterMember(c *C) { + if *fast { + c.Skip("-fast") + } + + master, err := mgo.Dial("localhost:40021") + c.Assert(err, IsNil) + defer master.Close() + + // Wait for cluster to fully sync up. + for i := 0; i < 10; i++ { + if len(master.LiveServers()) == 3 { + break + } + time.Sleep(5e8) + } + if len(master.LiveServers()) != 3 { + c.Fatalf("Test started with bad cluster state: %v", master.LiveServers()) + } + + result := &struct { + IsMaster bool + Me string + }{} + slave := master.Copy() + slave.SetMode(mgo.Monotonic, true) // Monotonic can hold a non-master socket persistently. + err = slave.Run("isMaster", result) + c.Assert(err, IsNil) + c.Assert(result.IsMaster, Equals, false) + slaveAddr := result.Me + + defer func() { + config := map[string]string{ + "40021": `{_id: 1, host: "127.0.0.1:40021", priority: 1, tags: {rs2: "a"}}`, + "40022": `{_id: 2, host: "127.0.0.1:40022", priority: 0, tags: {rs2: "b"}}`, + "40023": `{_id: 3, host: "127.0.0.1:40023", priority: 0, tags: {rs2: "c"}}`, + } + master.Refresh() + master.Run(bson.D{{"$eval", `rs.add(` + config[hostPort(slaveAddr)] + `)`}}, nil) + master.Close() + slave.Close() + + // Ensure suite syncs up with the changes before next test. + s.Stop(":40201") + s.StartAll() + time.Sleep(8 * time.Second) + // TODO Find a better way to find out when mongos is fully aware that all + // servers are up. Without that follow up tests that depend on mongos will + // break due to their expectation of things being in a working state. + }() + + c.Logf("========== Removing slave: %s ==========", slaveAddr) + + master.Run(bson.D{{"$eval", `rs.remove("` + slaveAddr + `")`}}, nil) + + master.Refresh() + + // Give the cluster a moment to catch up by doing a roundtrip to the master. + err = master.Ping() + c.Assert(err, IsNil) + + time.Sleep(3e9) + + // This must fail since the slave has been taken off the cluster. + err = slave.Ping() + c.Assert(err, NotNil) + + for i := 0; i < 15; i++ { + if len(master.LiveServers()) == 2 { + break + } + time.Sleep(time.Second) + } + live := master.LiveServers() + if len(live) != 2 { + c.Errorf("Removed server still considered live: %#s", live) + } + + c.Log("========== Test succeeded. ==========") +} + +func (s *S) TestPoolLimitSimple(c *C) { + for test := 0; test < 2; test++ { + var session *mgo.Session + var err error + if test == 0 { + session, err = mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + session.SetPoolLimit(1) + } else { + session, err = mgo.Dial("localhost:40001?maxPoolSize=1") + c.Assert(err, IsNil) + } + defer session.Close() + + // Put one socket in use. + c.Assert(session.Ping(), IsNil) + + done := make(chan time.Duration) + + // Now block trying to get another one due to the pool limit. + go func() { + copy := session.Copy() + defer copy.Close() + started := time.Now() + c.Check(copy.Ping(), IsNil) + done <- time.Now().Sub(started) + }() + + time.Sleep(300 * time.Millisecond) + + // Put the one socket back in the pool, freeing it for the copy. + session.Refresh() + delay := <-done + c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) + } +} + +func (s *S) TestPoolLimitMany(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + stats := mgo.GetStats() + for stats.SocketsAlive != 3 { + c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive) + stats = mgo.GetStats() + time.Sleep(5e8) + } + + const poolLimit = 64 + session.SetPoolLimit(poolLimit) + + // Consume the whole limit for the master. + var master []*mgo.Session + for i := 0; i < poolLimit; i++ { + s := session.Copy() + defer s.Close() + c.Assert(s.Ping(), IsNil) + master = append(master, s) + } + + before := time.Now() + go func() { + time.Sleep(3e9) + master[0].Refresh() + }() + + // Then, a single ping must block, since it would need another + // connection to the master, over the limit. Once the goroutine + // above releases its socket, it should move on. + session.Ping() + delay := time.Now().Sub(before) + c.Assert(delay > 3e9, Equals, true) + c.Assert(delay < 6e9, Equals, true) +} + +func (s *S) TestSetModeEventualIterBug(c *C) { + session1, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session1.Close() + + session1.SetMode(mgo.Eventual, false) + + coll1 := session1.DB("mydb").C("mycoll") + + const N = 100 + for i := 0; i < N; i++ { + err = coll1.Insert(M{"_id": i}) + c.Assert(err, IsNil) + } + + c.Logf("Waiting until secondary syncs") + for { + n, err := coll1.Count() + c.Assert(err, IsNil) + if n == N { + c.Logf("Found all") + break + } + } + + session2, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session2.Close() + + session2.SetMode(mgo.Eventual, false) + + coll2 := session2.DB("mydb").C("mycoll") + + i := 0 + iter := coll2.Find(nil).Batch(10).Iter() + var result struct{} + for iter.Next(&result) { + i++ + } + c.Assert(iter.Close(), Equals, nil) + c.Assert(i, Equals, N) +} + +func (s *S) TestCustomDialOld(c *C) { + dials := make(chan bool, 16) + dial := func(addr net.Addr) (net.Conn, error) { + tcpaddr, ok := addr.(*net.TCPAddr) + if !ok { + return nil, fmt.Errorf("unexpected address type: %T", addr) + } + dials <- true + return net.DialTCP("tcp", nil, tcpaddr) + } + info := mgo.DialInfo{ + Addrs: []string{"localhost:40012"}, + Dial: dial, + } + + // Use hostname here rather than IP, to make things trickier. + session, err := mgo.DialWithInfo(&info) + c.Assert(err, IsNil) + defer session.Close() + + const N = 3 + for i := 0; i < N; i++ { + select { + case <-dials: + case <-time.After(5 * time.Second): + c.Fatalf("expected %d dials, got %d", N, i) + } + } + select { + case <-dials: + c.Fatalf("got more dials than expected") + case <-time.After(100 * time.Millisecond): + } +} + +func (s *S) TestCustomDialNew(c *C) { + dials := make(chan bool, 16) + dial := func(addr *mgo.ServerAddr) (net.Conn, error) { + dials <- true + if addr.TCPAddr().Port == 40012 { + c.Check(addr.String(), Equals, "localhost:40012") + } + return net.DialTCP("tcp", nil, addr.TCPAddr()) + } + info := mgo.DialInfo{ + Addrs: []string{"localhost:40012"}, + DialServer: dial, + } + + // Use hostname here rather than IP, to make things trickier. + session, err := mgo.DialWithInfo(&info) + c.Assert(err, IsNil) + defer session.Close() + + const N = 3 + for i := 0; i < N; i++ { + select { + case <-dials: + case <-time.After(5 * time.Second): + c.Fatalf("expected %d dials, got %d", N, i) + } + } + select { + case <-dials: + c.Fatalf("got more dials than expected") + case <-time.After(100 * time.Millisecond): + } +} + +func (s *S) TestPrimaryShutdownOnAuthShard(c *C) { + if *fast { + c.Skip("-fast") + } + + // Dial the shard. + session, err := mgo.Dial("localhost:40203") + c.Assert(err, IsNil) + defer session.Close() + + // Login and insert something to make it more realistic. + session.DB("admin").Login("root", "rapadura") + coll := session.DB("mydb").C("mycoll") + err = coll.Insert(bson.M{"n": 1}) + c.Assert(err, IsNil) + + // Dial the replica set to figure the master out. + rs, err := mgo.Dial("root:rapadura@localhost:40031") + c.Assert(err, IsNil) + defer rs.Close() + + // With strong consistency, this will open a socket to the master. + result := &struct{ Host string }{} + err = rs.Run("serverStatus", result) + c.Assert(err, IsNil) + + // Kill the master. + host := result.Host + s.Stop(host) + + // This must fail, since the connection was broken. + err = rs.Run("serverStatus", result) + c.Assert(err, Equals, io.EOF) + + // This won't work because the master just died. + err = coll.Insert(bson.M{"n": 2}) + c.Assert(err, NotNil) + + // Refresh session and wait for re-election. + session.Refresh() + for i := 0; i < 60; i++ { + err = coll.Insert(bson.M{"n": 3}) + if err == nil { + break + } + c.Logf("Waiting for replica set to elect a new master. Last error: %v", err) + time.Sleep(500 * time.Millisecond) + } + c.Assert(err, IsNil) + + count, err := coll.Count() + c.Assert(count > 1, Equals, true) +} + +func (s *S) TestNearestSecondary(c *C) { + defer mgo.HackPingDelay(300 * time.Millisecond)() + + rs1a := "127.0.0.1:40011" + rs1b := "127.0.0.1:40012" + rs1c := "127.0.0.1:40013" + s.Freeze(rs1b) + + session, err := mgo.Dial(rs1a) + c.Assert(err, IsNil) + defer session.Close() + + // Wait for the sync up to run through the first couple of servers. + for len(session.LiveServers()) != 2 { + c.Log("Waiting for two servers to be alive...") + time.Sleep(100 * time.Millisecond) + } + + // Extra delay to ensure the third server gets penalized. + time.Sleep(500 * time.Millisecond) + + // Release third server. + s.Thaw(rs1b) + + // Wait for it to come up. + for len(session.LiveServers()) != 3 { + c.Log("Waiting for all servers to be alive...") + time.Sleep(100 * time.Millisecond) + } + + session.SetMode(mgo.Monotonic, true) + var result struct{ Host string } + + // See which slave picks the line, several times to avoid chance. + for i := 0; i < 10; i++ { + session.Refresh() + err = session.Run("serverStatus", &result) + c.Assert(err, IsNil) + c.Assert(hostPort(result.Host), Equals, hostPort(rs1c)) + } + + if *fast { + // Don't hold back for several seconds. + return + } + + // Now hold the other server for long enough to penalize it. + s.Freeze(rs1c) + time.Sleep(5 * time.Second) + s.Thaw(rs1c) + + // Wait for the ping to be processed. + time.Sleep(500 * time.Millisecond) + + // Repeating the test should now pick the former server consistently. + for i := 0; i < 10; i++ { + session.Refresh() + err = session.Run("serverStatus", &result) + c.Assert(err, IsNil) + c.Assert(hostPort(result.Host), Equals, hostPort(rs1b)) + } +} + +func (s *S) TestNearestServer(c *C) { + defer mgo.HackPingDelay(300 * time.Millisecond)() + + rs1a := "127.0.0.1:40011" + rs1b := "127.0.0.1:40012" + rs1c := "127.0.0.1:40013" + + session, err := mgo.Dial(rs1a) + c.Assert(err, IsNil) + defer session.Close() + + s.Freeze(rs1a) + s.Freeze(rs1b) + + // Extra delay to ensure the first two servers get penalized. + time.Sleep(500 * time.Millisecond) + + // Release them. + s.Thaw(rs1a) + s.Thaw(rs1b) + + // Wait for everyone to come up. + for len(session.LiveServers()) != 3 { + c.Log("Waiting for all servers to be alive...") + time.Sleep(100 * time.Millisecond) + } + + session.SetMode(mgo.Nearest, true) + var result struct{ Host string } + + // See which server picks the line, several times to avoid chance. + for i := 0; i < 10; i++ { + session.Refresh() + err = session.Run("serverStatus", &result) + c.Assert(err, IsNil) + c.Assert(hostPort(result.Host), Equals, hostPort(rs1c)) + } + + if *fast { + // Don't hold back for several seconds. + return + } + + // Now hold the two secondaries for long enough to penalize them. + s.Freeze(rs1b) + s.Freeze(rs1c) + time.Sleep(5 * time.Second) + s.Thaw(rs1b) + s.Thaw(rs1c) + + // Wait for the ping to be processed. + time.Sleep(500 * time.Millisecond) + + // Repeating the test should now pick the primary server consistently. + for i := 0; i < 10; i++ { + session.Refresh() + err = session.Run("serverStatus", &result) + c.Assert(err, IsNil) + c.Assert(hostPort(result.Host), Equals, hostPort(rs1a)) + } +} + +func (s *S) TestConnectCloseConcurrency(c *C) { + restore := mgo.HackPingDelay(500 * time.Millisecond) + defer restore() + var wg sync.WaitGroup + const n = 500 + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + session, err := mgo.Dial("localhost:40001") + if err != nil { + c.Fatal(err) + } + time.Sleep(1) + session.Close() + }() + } + wg.Wait() +} + +func (s *S) TestSelectServers(c *C) { + if !s.versionAtLeast(2, 2) { + c.Skip("read preferences introduced in 2.2") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + session.SetMode(mgo.Eventual, true) + + var result struct{ Host string } + + session.Refresh() + session.SelectServers(bson.D{{"rs1", "b"}}) + err = session.Run("serverStatus", &result) + c.Assert(err, IsNil) + c.Assert(hostPort(result.Host), Equals, "40012") + + session.Refresh() + session.SelectServers(bson.D{{"rs1", "c"}}) + err = session.Run("serverStatus", &result) + c.Assert(err, IsNil) + c.Assert(hostPort(result.Host), Equals, "40013") +} + +func (s *S) TestSelectServersWithMongos(c *C) { + if !s.versionAtLeast(2, 2) { + c.Skip("read preferences introduced in 2.2") + } + + session, err := mgo.Dial("localhost:40021") + c.Assert(err, IsNil) + defer session.Close() + + ssresult := &struct{ Host string }{} + imresult := &struct{ IsMaster bool }{} + + // Figure the master while still using the strong session. + err = session.Run("serverStatus", ssresult) + c.Assert(err, IsNil) + err = session.Run("isMaster", imresult) + c.Assert(err, IsNil) + master := ssresult.Host + c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master)) + + var slave1, slave2 string + switch hostPort(master) { + case "40021": + slave1, slave2 = "b", "c" + case "40022": + slave1, slave2 = "a", "c" + case "40023": + slave1, slave2 = "a", "b" + } + + // Collect op counters for everyone. + q21a := s.countQueries(c, "localhost:40021") + q22a := s.countQueries(c, "localhost:40022") + q23a := s.countQueries(c, "localhost:40023") + + // Do a SlaveOk query through MongoS + mongos, err := mgo.Dial("localhost:40202") + c.Assert(err, IsNil) + defer mongos.Close() + + mongos.SetMode(mgo.Monotonic, true) + + mongos.Refresh() + mongos.SelectServers(bson.D{{"rs2", slave1}}) + coll := mongos.DB("mydb").C("mycoll") + result := &struct{}{} + for i := 0; i != 5; i++ { + err := coll.Find(nil).One(result) + c.Assert(err, Equals, mgo.ErrNotFound) + } + + mongos.Refresh() + mongos.SelectServers(bson.D{{"rs2", slave2}}) + coll = mongos.DB("mydb").C("mycoll") + for i := 0; i != 7; i++ { + err := coll.Find(nil).One(result) + c.Assert(err, Equals, mgo.ErrNotFound) + } + + // Collect op counters for everyone again. + q21b := s.countQueries(c, "localhost:40021") + q22b := s.countQueries(c, "localhost:40022") + q23b := s.countQueries(c, "localhost:40023") + + switch hostPort(master) { + case "40021": + c.Check(q21b-q21a, Equals, 0) + c.Check(q22b-q22a, Equals, 5) + c.Check(q23b-q23a, Equals, 7) + case "40022": + c.Check(q21b-q21a, Equals, 5) + c.Check(q22b-q22a, Equals, 0) + c.Check(q23b-q23a, Equals, 7) + case "40023": + c.Check(q21b-q21a, Equals, 5) + c.Check(q22b-q22a, Equals, 7) + c.Check(q23b-q23a, Equals, 0) + default: + c.Fatal("Uh?") + } +} + +func (s *S) TestDoNotFallbackToMonotonic(c *C) { + // There was a bug at some point that some functions were + // falling back to Monotonic mode. This test ensures all listIndexes + // commands go to the primary, as should happen since the session is + // in Strong mode. + if !s.versionAtLeast(3, 0) { + c.Skip("command-counting logic depends on 3.0+") + } + + session, err := mgo.Dial("localhost:40012") + c.Assert(err, IsNil) + defer session.Close() + + for i := 0; i < 15; i++ { + q11a := s.countCommands(c, "localhost:40011", "listIndexes") + q12a := s.countCommands(c, "localhost:40012", "listIndexes") + q13a := s.countCommands(c, "localhost:40013", "listIndexes") + + _, err := session.DB("local").C("system.indexes").Indexes() + c.Assert(err, IsNil) + + q11b := s.countCommands(c, "localhost:40011", "listIndexes") + q12b := s.countCommands(c, "localhost:40012", "listIndexes") + q13b := s.countCommands(c, "localhost:40013", "listIndexes") + + c.Assert(q11b, Equals, q11a+1) + c.Assert(q12b, Equals, q12a) + c.Assert(q13b, Equals, q13a) + } +} |