summaryrefslogtreecommitdiff
path: root/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go')
-rw-r--r--src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go2090
1 files changed, 2090 insertions, 0 deletions
diff --git a/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go b/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go
new file mode 100644
index 00000000000..54ec8676226
--- /dev/null
+++ b/src/mongo/gotools/vendor/src/gopkg.in/mgo.v2/cluster_test.go
@@ -0,0 +1,2090 @@
+// mgo - MongoDB driver for Go
+//
+// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
+//
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this
+// list of conditions and the following disclaimer.
+// 2. Redistributions in binary form must reproduce the above copyright notice,
+// this list of conditions and the following disclaimer in the documentation
+// and/or other materials provided with the distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+package mgo_test
+
+import (
+ "fmt"
+ "io"
+ "net"
+ "strings"
+ "sync"
+ "time"
+
+ . "gopkg.in/check.v1"
+ "gopkg.in/mgo.v2"
+ "gopkg.in/mgo.v2/bson"
+)
+
+// TestNewSession verifies that Session.New produces an independent session:
+// safety and batch settings are copied at New() time, each session owns its
+// own socket, and query parameters (Batch) carry over to the new session.
+func (s *S) TestNewSession(c *C) {
+	session, err := mgo.Dial("localhost:40001")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	// Do a dummy operation to wait for connection.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"_id": 1})
+	c.Assert(err, IsNil)
+
+	// Tweak safety and query settings to ensure other has copied those.
+	session.SetSafe(nil)
+	session.SetBatch(-1)
+	other := session.New()
+	defer other.Close()
+	session.SetSafe(&mgo.Safe{})
+
+	// Clone was copied while session was unsafe, so no errors.
+	otherColl := other.DB("mydb").C("mycoll")
+	err = otherColl.Insert(M{"_id": 1})
+	c.Assert(err, IsNil)
+
+	// Original session was made safe again.
+	err = coll.Insert(M{"_id": 1})
+	c.Assert(err, NotNil)
+
+	// With New(), each session has its own socket now.
+	stats := mgo.GetStats()
+	c.Assert(stats.MasterConns, Equals, 2)
+	c.Assert(stats.SocketsInUse, Equals, 2)
+
+	// Ensure query parameters were cloned.
+	err = otherColl.Insert(M{"_id": 2})
+	c.Assert(err, IsNil)
+
+	// Ping the database to ensure the nonce has been received already.
+	c.Assert(other.Ping(), IsNil)
+
+	mgo.ResetStats()
+
+	// Iterator errors are surfaced by Close below; asserting the stale
+	// insert err here (as the original did) was a no-op.
+	iter := otherColl.Find(M{}).Iter()
+
+	m := M{}
+	ok := iter.Next(m)
+	c.Assert(ok, Equals, true)
+	err = iter.Close()
+	c.Assert(err, IsNil)
+
+	// If Batch(-1) is in effect, a single document must have been received.
+	stats = mgo.GetStats()
+	c.Assert(stats.ReceivedDocs, Equals, 1)
+}
+
+// TestCloneSession verifies that Session.Clone shares the reserved socket
+// with the original session (one socket, two refs), that Refresh detaches
+// it onto a new socket, and that safety/batch settings are copied.
+func (s *S) TestCloneSession(c *C) {
+	session, err := mgo.Dial("localhost:40001")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	// Do a dummy operation to wait for connection.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"_id": 1})
+	c.Assert(err, IsNil)
+
+	// Tweak safety and query settings to ensure clone is copying those.
+	session.SetSafe(nil)
+	session.SetBatch(-1)
+	clone := session.Clone()
+	defer clone.Close()
+	session.SetSafe(&mgo.Safe{})
+
+	// Clone was copied while session was unsafe, so no errors.
+	cloneColl := clone.DB("mydb").C("mycoll")
+	err = cloneColl.Insert(M{"_id": 1})
+	c.Assert(err, IsNil)
+
+	// Original session was made safe again.
+	err = coll.Insert(M{"_id": 1})
+	c.Assert(err, NotNil)
+
+	// With Clone(), same socket is shared between sessions now.
+	stats := mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 1)
+	c.Assert(stats.SocketRefs, Equals, 2)
+
+	// Refreshing one of them should let the original socket go,
+	// while preserving the safety settings.
+	clone.Refresh()
+	err = cloneColl.Insert(M{"_id": 1})
+	c.Assert(err, IsNil)
+
+	// Must have used another connection now.
+	stats = mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 2)
+	c.Assert(stats.SocketRefs, Equals, 2)
+
+	// Ensure query parameters were cloned.
+	err = cloneColl.Insert(M{"_id": 2})
+	c.Assert(err, IsNil)
+
+	// Ping the database to ensure the nonce has been received already.
+	c.Assert(clone.Ping(), IsNil)
+
+	mgo.ResetStats()
+
+	// Iterator errors are surfaced by Close below; asserting the stale
+	// insert err here (as the original did) was a no-op.
+	iter := cloneColl.Find(M{}).Iter()
+
+	m := M{}
+	ok := iter.Next(m)
+	c.Assert(ok, Equals, true)
+	err = iter.Close()
+	c.Assert(err, IsNil)
+
+	// If Batch(-1) is in effect, a single document must have been received.
+	stats = mgo.GetStats()
+	c.Assert(stats.ReceivedDocs, Equals, 1)
+}
+
+// TestModeStrong checks that a Strong-mode session talks to the master
+// (ismaster answers true), reserves a single socket for both reads and
+// writes, and that SetMode(..., true) releases that reserved socket.
+func (s *S) TestModeStrong(c *C) {
+	session, err := mgo.Dial("localhost:40012")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	// Set Monotonic first, then Strong, to exercise the mode switch path.
+	session.SetMode(mgo.Monotonic, false)
+	session.SetMode(mgo.Strong, false)
+
+	c.Assert(session.Mode(), Equals, mgo.Strong)
+
+	result := M{}
+	cmd := session.DB("admin").C("$cmd")
+	err = cmd.Find(M{"ismaster": 1}).One(&result)
+	c.Assert(err, IsNil)
+	c.Assert(result["ismaster"], Equals, true)
+
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	// Wait since the sync also uses sockets.
+	for len(session.LiveServers()) != 3 {
+		c.Log("Waiting for cluster sync to finish...")
+		time.Sleep(5e8) // 500ms between polls
+	}
+
+	stats := mgo.GetStats()
+	c.Assert(stats.MasterConns, Equals, 1)
+	c.Assert(stats.SlaveConns, Equals, 2)
+	c.Assert(stats.SocketsInUse, Equals, 1)
+
+	// refresh=true must drop the reserved master socket.
+	session.SetMode(mgo.Strong, true)
+
+	stats = mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 0)
+}
+
+// TestModeMonotonic checks that a Monotonic session starts reading from a
+// slave (ismaster false), switches to the master after a write, and holds
+// two sockets until SetMode(..., true) releases them.
+func (s *S) TestModeMonotonic(c *C) {
+	// Must necessarily connect to a slave, otherwise the
+	// master connection will be available first.
+	session, err := mgo.Dial("localhost:40012")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	session.SetMode(mgo.Monotonic, false)
+
+	c.Assert(session.Mode(), Equals, mgo.Monotonic)
+
+	var result struct{ IsMaster bool }
+	cmd := session.DB("admin").C("$cmd")
+	err = cmd.Find(M{"ismaster": 1}).One(&result)
+	c.Assert(err, IsNil)
+	c.Assert(result.IsMaster, Equals, false)
+
+	// A write forces the monotonic session over to the master.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	err = cmd.Find(M{"ismaster": 1}).One(&result)
+	c.Assert(err, IsNil)
+	c.Assert(result.IsMaster, Equals, true)
+
+	// Wait since the sync also uses sockets.
+	for len(session.LiveServers()) != 3 {
+		c.Log("Waiting for cluster sync to finish...")
+		time.Sleep(5e8)
+	}
+
+	stats := mgo.GetStats()
+	c.Assert(stats.MasterConns, Equals, 1)
+	c.Assert(stats.SlaveConns, Equals, 2)
+	c.Assert(stats.SocketsInUse, Equals, 2)
+
+	// refresh=true must release both reserved sockets.
+	session.SetMode(mgo.Monotonic, true)
+
+	stats = mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 0)
+}
+
+func (s *S) TestModeMonotonicAfterStrong(c *C) {
+	// Test that a strong session shifting to a monotonic
+	// one preserves the socket untouched.
+
+	session, err := mgo.Dial("localhost:40012")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	// Insert something to force a connection to the master.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	session.SetMode(mgo.Monotonic, false)
+
+	// Wait since the sync also uses sockets.
+	for len(session.LiveServers()) != 3 {
+		c.Log("Waiting for cluster sync to finish...")
+		time.Sleep(5e8)
+	}
+
+	// Master socket should still be reserved.
+	stats := mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 1)
+
+	// Confirm it's the master even though it's Monotonic by now.
+	result := M{}
+	cmd := session.DB("admin").C("$cmd")
+	err = cmd.Find(M{"ismaster": 1}).One(&result)
+	c.Assert(err, IsNil)
+	c.Assert(result["ismaster"], Equals, true)
+}
+
+func (s *S) TestModeStrongAfterMonotonic(c *C) {
+	// Test that shifting from Monotonic to Strong while
+	// using a slave socket will keep the socket reserved
+	// until the master socket is necessary, so that no
+	// switch over occurs unless it's actually necessary.
+
+	// Must necessarily connect to a slave, otherwise the
+	// master connection will be available first.
+	session, err := mgo.Dial("localhost:40012")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	session.SetMode(mgo.Monotonic, false)
+
+	// Ensure we're talking to a slave, and reserve the socket.
+	result := M{}
+	err = session.Run("ismaster", &result)
+	c.Assert(err, IsNil)
+	c.Assert(result["ismaster"], Equals, false)
+
+	// Switch to a Strong session.
+	session.SetMode(mgo.Strong, false)
+
+	// Wait since the sync also uses sockets.
+	for len(session.LiveServers()) != 3 {
+		c.Log("Waiting for cluster sync to finish...")
+		time.Sleep(5e8)
+	}
+
+	// Slave socket should still be reserved.
+	stats := mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 1)
+
+	// But any operation will switch it to the master.
+	result = M{}
+	err = session.Run("ismaster", &result)
+	c.Assert(err, IsNil)
+	c.Assert(result["ismaster"], Equals, true)
+}
+
+// TestModeMonotonicWriteOnIteration verifies that a Monotonic session can
+// keep iterating a cursor obtained from a slave even after a write (which
+// moves the session to the master) happens mid-iteration.
+func (s *S) TestModeMonotonicWriteOnIteration(c *C) {
+	// Must necessarily connect to a slave, otherwise the
+	// master connection will be available first.
+	session, err := mgo.Dial("localhost:40012")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	session.SetMode(mgo.Monotonic, false)
+
+	c.Assert(session.Mode(), Equals, mgo.Monotonic)
+
+	coll1 := session.DB("mydb").C("mycoll1")
+	coll2 := session.DB("mydb").C("mycoll2")
+
+	ns := []int{40, 41, 42, 43, 44, 45, 46}
+	for _, n := range ns {
+		err := coll1.Insert(M{"n": n})
+		c.Assert(err, IsNil)
+	}
+
+	// Release master so we can grab a slave again.
+	session.Refresh()
+
+	// Wait until synchronization is done.
+	for {
+		n, err := coll1.Count()
+		c.Assert(err, IsNil)
+		if n == len(ns) {
+			break
+		}
+		// Back off between polls instead of busy-looping against the
+		// server, consistent with the other sync-wait loops in this file.
+		time.Sleep(5e8)
+	}
+
+	iter := coll1.Find(nil).Batch(2).Iter()
+	i := 0
+	m := M{}
+	for iter.Next(&m) {
+		i++
+		if i > 3 {
+			// Write mid-iteration; must not break the slave cursor.
+			err := coll2.Insert(M{"n": 47 + i})
+			c.Assert(err, IsNil)
+		}
+	}
+	c.Assert(i, Equals, len(ns))
+}
+
+// TestModeEventual checks that an Eventual session keeps reading from
+// slaves even after a write, and never leaves a socket reserved.
+func (s *S) TestModeEventual(c *C) {
+	// Must necessarily connect to a slave, otherwise the
+	// master connection will be available first.
+	session, err := mgo.Dial("localhost:40012")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	session.SetMode(mgo.Eventual, false)
+
+	c.Assert(session.Mode(), Equals, mgo.Eventual)
+
+	result := M{}
+	err = session.Run("ismaster", &result)
+	c.Assert(err, IsNil)
+	c.Assert(result["ismaster"], Equals, false)
+
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	// Unlike Monotonic, reads still go to a slave after the write.
+	result = M{}
+	err = session.Run("ismaster", &result)
+	c.Assert(err, IsNil)
+	c.Assert(result["ismaster"], Equals, false)
+
+	// Wait since the sync also uses sockets.
+	for len(session.LiveServers()) != 3 {
+		c.Log("Waiting for cluster sync to finish...")
+		time.Sleep(5e8)
+	}
+
+	stats := mgo.GetStats()
+	c.Assert(stats.MasterConns, Equals, 1)
+	c.Assert(stats.SlaveConns, Equals, 2)
+	c.Assert(stats.SocketsInUse, Equals, 0)
+}
+
+func (s *S) TestModeEventualAfterStrong(c *C) {
+	// Test that a strong session shifting to an eventual
+	// one preserves the socket untouched.
+
+	session, err := mgo.Dial("localhost:40012")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	// Insert something to force a connection to the master.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	session.SetMode(mgo.Eventual, false)
+
+	// Wait since the sync also uses sockets.
+	for len(session.LiveServers()) != 3 {
+		c.Log("Waiting for cluster sync to finish...")
+		time.Sleep(5e8)
+	}
+
+	// Master socket should still be reserved.
+	stats := mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 1)
+
+	// Confirm it's the master even though it's Eventual by now.
+	result := M{}
+	cmd := session.DB("admin").C("$cmd")
+	err = cmd.Find(M{"ismaster": 1}).One(&result)
+	c.Assert(err, IsNil)
+	c.Assert(result["ismaster"], Equals, true)
+
+	// refresh=true must release the reserved socket.
+	session.SetMode(mgo.Eventual, true)
+
+	stats = mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 0)
+}
+
+// TestModeStrongFallover kills the master under a Strong session and checks
+// that operations fail with io.EOF until Refresh, after which the session
+// finds and writes to the newly elected master.
+func (s *S) TestModeStrongFallover(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40021")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	// With strong consistency, this will open a socket to the master.
+	result := &struct{ Host string }{}
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+
+	// Kill the master.
+	host := result.Host
+	s.Stop(host)
+
+	// This must fail, since the connection was broken.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, Equals, io.EOF)
+
+	// With strong consistency, it fails again until reset.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, Equals, io.EOF)
+
+	session.Refresh()
+
+	// Now we should be able to talk to the new master.
+	// Increase the timeout since this may take quite a while.
+	session.SetSyncTimeout(3 * time.Minute)
+
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(result.Host, Not(Equals), host)
+
+	// Insert some data to confirm it's indeed a master.
+	err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
+	c.Assert(err, IsNil)
+}
+
+// TestModePrimaryHiccup bounces the primary (stop + immediate restart) and
+// checks that stale cached sockets fail with io.EOF until Refresh, after
+// which writes succeed again.
+func (s *S) TestModePrimaryHiccup(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40021")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	// With strong consistency, this will open a socket to the master.
+	result := &struct{ Host string }{}
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+
+	// Establish a few extra sessions to create spare sockets to
+	// the master. This increases a bit the chances of getting an
+	// incorrect cached socket.
+	var sessions []*mgo.Session
+	for i := 0; i < 20; i++ {
+		sessions = append(sessions, session.Copy())
+		err = sessions[len(sessions)-1].Run("serverStatus", result)
+		c.Assert(err, IsNil)
+	}
+	for i := range sessions {
+		sessions[i].Close()
+	}
+
+	// Kill the master, but bring it back immediately.
+	host := result.Host
+	s.Stop(host)
+	s.StartAll()
+
+	// This must fail, since the connection was broken.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, Equals, io.EOF)
+
+	// With strong consistency, it fails again until reset.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, Equals, io.EOF)
+
+	session.Refresh()
+
+	// Now we should be able to talk to the new master.
+	// Increase the timeout since this may take quite a while.
+	session.SetSyncTimeout(3 * time.Minute)
+
+	// Insert some data to confirm it's indeed a master.
+	err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
+	c.Assert(err, IsNil)
+}
+
+// TestModeMonotonicFallover kills the master while a Monotonic session is
+// bound to it (after a write) and checks that the session fails with io.EOF
+// until Refresh, then reaches the newly elected master.
+func (s *S) TestModeMonotonicFallover(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40021")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	session.SetMode(mgo.Monotonic, true)
+
+	// Insert something to force a switch to the master.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	// Wait a bit for this to be synchronized to slaves.
+	time.Sleep(3 * time.Second)
+
+	result := &struct{ Host string }{}
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+
+	// Kill the master.
+	host := result.Host
+	s.Stop(host)
+
+	// This must fail, since the connection was broken.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, Equals, io.EOF)
+
+	// With monotonic consistency, it fails again until reset.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, Equals, io.EOF)
+
+	session.Refresh()
+
+	// Now we should be able to talk to the new master.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(result.Host, Not(Equals), host)
+}
+
+// TestModeMonotonicWithSlaveFallover checks that a Monotonic session bound
+// to a slave keeps working when the master dies, stays on the same slave,
+// and only moves (to the newly elected master) when a write requires it.
+func (s *S) TestModeMonotonicWithSlaveFallover(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40021")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	ssresult := &struct{ Host string }{}
+	imresult := &struct{ IsMaster bool }{}
+
+	// Figure the master while still using the strong session.
+	err = session.Run("serverStatus", ssresult)
+	c.Assert(err, IsNil)
+	err = session.Run("isMaster", imresult)
+	c.Assert(err, IsNil)
+	master := ssresult.Host
+	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
+
+	// Create new monotonic session with an explicit address to ensure
+	// a slave is synchronized before the master, otherwise a connection
+	// with the master may be used below for lack of other options.
+	var addr string
+	switch {
+	case strings.HasSuffix(ssresult.Host, ":40021"):
+		addr = "localhost:40022"
+	case strings.HasSuffix(ssresult.Host, ":40022"):
+		addr = "localhost:40021"
+	case strings.HasSuffix(ssresult.Host, ":40023"):
+		addr = "localhost:40021"
+	default:
+		c.Fatal("Unknown host: ", ssresult.Host)
+	}
+
+	session, err = mgo.Dial(addr)
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	session.SetMode(mgo.Monotonic, true)
+
+	// Check the address of the socket associated with the monotonic session.
+	c.Log("Running serverStatus and isMaster with monotonic session")
+	err = session.Run("serverStatus", ssresult)
+	c.Assert(err, IsNil)
+	err = session.Run("isMaster", imresult)
+	c.Assert(err, IsNil)
+	slave := ssresult.Host
+	c.Assert(imresult.IsMaster, Equals, false, Commentf("%s is not a slave", slave))
+
+	c.Assert(master, Not(Equals), slave)
+
+	// Kill the master.
+	s.Stop(master)
+
+	// Session must still be good, since we were talking to a slave.
+	err = session.Run("serverStatus", ssresult)
+	c.Assert(err, IsNil)
+
+	c.Assert(ssresult.Host, Equals, slave,
+		Commentf("Monotonic session moved from %s to %s", slave, ssresult.Host))
+
+	// If we try to insert something, it'll have to hold until the new
+	// master is available to move the connection, and work correctly.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	// Must now be talking to the new master.
+	err = session.Run("serverStatus", ssresult)
+	c.Assert(err, IsNil)
+	err = session.Run("isMaster", imresult)
+	c.Assert(err, IsNil)
+	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
+
+	// ... which is not the old one, since it's still dead.
+	c.Assert(ssresult.Host, Not(Equals), master)
+}
+
+// TestModeEventualFallover kills the master under an Eventual session and
+// checks that writes transparently move to the newly elected master, with
+// no Refresh needed (Eventual reserves no sockets).
+func (s *S) TestModeEventualFallover(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40021")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	result := &struct{ Host string }{}
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	master := result.Host
+
+	session.SetMode(mgo.Eventual, true)
+
+	// Should connect to the master when needed.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	// Wait a bit for this to be synchronized to slaves.
+	time.Sleep(3 * time.Second)
+
+	// Kill the master.
+	s.Stop(master)
+
+	// Should still work, with the new master now.
+	coll = session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(result.Host, Not(Equals), master)
+}
+
+// TestModeSecondaryJustPrimary: with only a primary available, a session
+// in Secondary mode has no eligible server and must fail to ping.
+func (s *S) TestModeSecondaryJustPrimary(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	sess, err := mgo.Dial("localhost:40001")
+	c.Assert(err, IsNil)
+	defer sess.Close()
+
+	// Secondary mode with refresh: no secondary exists on this deployment.
+	sess.SetMode(mgo.Secondary, true)
+
+	err = sess.Ping()
+	c.Assert(err, ErrorMatches, "no reachable servers")
+}
+
+// TestModeSecondaryPreferredJustPrimary: with only a primary available,
+// SecondaryPreferred mode must fall back to the primary and succeed.
+func (s *S) TestModeSecondaryPreferredJustPrimary(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	sess, err := mgo.Dial("localhost:40001")
+	c.Assert(err, IsNil)
+	defer sess.Close()
+
+	sess.SetMode(mgo.SecondaryPreferred, true)
+
+	// The command must be served by the primary, as it's the only server.
+	status := &struct{ Host string }{}
+	err = sess.Run("serverStatus", status)
+	c.Assert(err, IsNil)
+}
+
+// TestModeSecondaryPreferredFallover checks that a SecondaryPreferred
+// session keeps using its selected secondary after the primary dies, fails
+// writes with io.EOF until Refresh, and resumes writes once the primary
+// is restarted.
+func (s *S) TestModeSecondaryPreferredFallover(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40011")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	// Ensure secondaries are available for being picked up.
+	for len(session.LiveServers()) != 3 {
+		c.Log("Waiting for cluster sync to finish...")
+		time.Sleep(5e8)
+	}
+
+	session.SetMode(mgo.SecondaryPreferred, true)
+
+	// supvName maps a host address to its supervisor name; "rs1a" is the
+	// primary of this replica set.
+	result := &struct{ Host string }{}
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(supvName(result.Host), Not(Equals), "rs1a")
+	secondary := result.Host
+
+	// Should connect to the primary when needed.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	// Wait a bit for this to be synchronized to slaves.
+	time.Sleep(3 * time.Second)
+
+	// Kill the primary.
+	s.Stop("localhost:40011")
+
+	// It can still talk to the selected secondary.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(result.Host, Equals, secondary)
+
+	// But cannot speak to the primary until reset.
+	coll = session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, Equals, io.EOF)
+
+	session.Refresh()
+
+	// Can still talk to a secondary.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(supvName(result.Host), Not(Equals), "rs1a")
+
+	s.StartAll()
+
+	// Should now be able to talk to the primary again.
+	coll = session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+}
+
+// TestModePrimaryPreferredFallover checks that a PrimaryPreferred session
+// fails with io.EOF when its reserved primary socket dies, falls back to a
+// secondary after Refresh, and prefers the primary again once it returns.
+func (s *S) TestModePrimaryPreferredFallover(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40011")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	session.SetMode(mgo.PrimaryPreferred, true)
+
+	result := &struct{ Host string }{}
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(supvName(result.Host), Equals, "rs1a")
+
+	// Kill the primary.
+	s.Stop("localhost:40011")
+
+	// Should now fail as there was a primary socket in use already.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, Equals, io.EOF)
+
+	// Refresh so the reserved primary socket goes away.
+	session.Refresh()
+
+	// Should be able to talk to the secondary.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+
+	s.StartAll()
+
+	// Should wait for the new primary to become available.
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	// And should use the new primary in general, as it is preferred.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(supvName(result.Host), Equals, "rs1a")
+}
+
+// TestModePrimaryFallover: in Primary mode, killing the primary leaves no
+// eligible server, so even a refreshed session cannot reach anything.
+func (s *S) TestModePrimaryFallover(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	sess, err := mgo.Dial("localhost:40011")
+	c.Assert(err, IsNil)
+	defer sess.Close()
+
+	// Keep the sync wait short; there will be nothing to find.
+	sess.SetSyncTimeout(3 * time.Second)
+
+	sess.SetMode(mgo.Primary, true)
+
+	// Confirm we are on the primary ("rs1a" supervises it).
+	status := &struct{ Host string }{}
+	err = sess.Run("serverStatus", status)
+	c.Assert(err, IsNil)
+	c.Assert(supvName(status.Host), Equals, "rs1a")
+
+	// Kill the primary.
+	s.Stop("localhost:40011")
+
+	sess.Refresh()
+
+	err = sess.Ping()
+	c.Assert(err, ErrorMatches, "no reachable servers")
+}
+
+// TestModeSecondary checks that a Secondary-mode session reads from a
+// secondary, and keeps reading from the same secondary even after a write
+// (which uses the primary).
+func (s *S) TestModeSecondary(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40011")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	session.SetMode(mgo.Secondary, true)
+
+	result := &struct{ Host string }{}
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(supvName(result.Host), Not(Equals), "rs1a")
+	secondary := result.Host
+
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1})
+	c.Assert(err, IsNil)
+
+	// Reads must remain pinned to the originally selected secondary.
+	err = session.Run("serverStatus", result)
+	c.Assert(err, IsNil)
+	c.Assert(result.Host, Equals, secondary)
+}
+
+// TestPreserveSocketCountOnSync bounces the master and verifies that the
+// cluster resync logic reconnects without leaking sockets: after recovery
+// the pool is back to exactly 3 live sockets, 1 in use, 1 ref.
+func (s *S) TestPreserveSocketCountOnSync(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40011")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	stats := mgo.GetStats()
+	for stats.SocketsAlive != 3 {
+		c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive)
+		stats = mgo.GetStats()
+		time.Sleep(5e8)
+	}
+
+	c.Assert(stats.SocketsAlive, Equals, 3)
+
+	// Kill the master (with rs1, 'a' is always the master).
+	s.Stop("localhost:40011")
+
+	// Wait for the logic to run for a bit and bring it back.
+	startedAll := make(chan bool)
+	go func() {
+		time.Sleep(5e9)
+		s.StartAll()
+		startedAll <- true
+	}()
+
+	// Do not allow the test to return before the goroutine above is done.
+	defer func() {
+		<-startedAll
+	}()
+
+	// Do an action to kick the resync logic in, and also to
+	// wait until the cluster recognizes the server is back.
+	result := struct{ Ok bool }{}
+	err = session.Run("getLastError", &result)
+	c.Assert(err, IsNil)
+	c.Assert(result.Ok, Equals, true)
+
+	// Poll (bounded) for the pool to settle back at 3 live sockets.
+	for i := 0; i != 20; i++ {
+		stats = mgo.GetStats()
+		if stats.SocketsAlive == 3 {
+			break
+		}
+		c.Logf("Waiting for 3 sockets alive, have %d", stats.SocketsAlive)
+		time.Sleep(5e8)
+	}
+
+	// Ensure the number of sockets is preserved after syncing.
+	stats = mgo.GetStats()
+	c.Assert(stats.SocketsAlive, Equals, 3)
+	c.Assert(stats.SocketsInUse, Equals, 1)
+	c.Assert(stats.SocketRefs, Equals, 1)
+}
+
+// Connect to the master of a deployment with a single server,
+// run an insert, and then ensure the insert worked and that a
+// single connection was established.
+func (s *S) TestTopologySyncWithSingleMaster(c *C) {
+	// Use hostname here rather than IP, to make things trickier.
+	session, err := mgo.Dial("localhost:40001")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	coll := session.DB("mydb").C("mycoll")
+	err = coll.Insert(M{"a": 1, "b": 2})
+	c.Assert(err, IsNil)
+
+	// One connection used for discovery. Master socket recycled for
+	// insert. Socket is reserved after insert.
+	stats := mgo.GetStats()
+	c.Assert(stats.MasterConns, Equals, 1)
+	c.Assert(stats.SlaveConns, Equals, 0)
+	c.Assert(stats.SocketsInUse, Equals, 1)
+
+	// Refresh session and socket must be released.
+	session.Refresh()
+	stats = mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 0)
+}
+
+// TestTopologySyncWithSlaveSeed dials a slave seed address and verifies
+// that topology discovery finds the master so a write can succeed, with
+// the expected connection counts.
+func (s *S) TestTopologySyncWithSlaveSeed(c *C) {
+	// That's supposed to be a slave. Must run discovery
+	// and find out master to insert successfully.
+	session, err := mgo.Dial("localhost:40012")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	coll := session.DB("mydb").C("mycoll")
+	// Insert error is deliberately ignored; success is verified through
+	// the getLastError command below.
+	coll.Insert(M{"a": 1, "b": 2})
+
+	result := struct{ Ok bool }{}
+	err = session.Run("getLastError", &result)
+	c.Assert(err, IsNil)
+	c.Assert(result.Ok, Equals, true)
+
+	// One connection to each during discovery. Master
+	// socket recycled for insert.
+	stats := mgo.GetStats()
+	c.Assert(stats.MasterConns, Equals, 1)
+	c.Assert(stats.SlaveConns, Equals, 2)
+
+	// Only one socket reference alive, in the master socket owned
+	// by the above session.
+	c.Assert(stats.SocketsInUse, Equals, 1)
+
+	// Refresh it, and it must be gone.
+	session.Refresh()
+	stats = mgo.GetStats()
+	c.Assert(stats.SocketsInUse, Equals, 0)
+}
+
+// TestSyncTimeout stops the only server and verifies that an operation
+// gives up with "no reachable servers" after roughly SetSyncTimeout:
+// at least `timeout` elapsed, but less than twice that.
+func (s *S) TestSyncTimeout(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40001")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	s.Stop("localhost:40001")
+
+	timeout := 3 * time.Second
+	session.SetSyncTimeout(timeout)
+	started := time.Now()
+
+	// Do something.
+	result := struct{ Ok bool }{}
+	err = session.Run("getLastError", &result)
+	c.Assert(err, ErrorMatches, "no reachable servers")
+	// started < now-timeout: at least `timeout` has passed.
+	c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
+	// started > now-2*timeout: but no more than twice that.
+	c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
+}
+
+// TestDialWithTimeout dials an unused port and verifies DialWithTimeout
+// fails with "no reachable servers" within the expected time window.
+func (s *S) TestDialWithTimeout(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	timeout := 2 * time.Second
+	started := time.Now()
+
+	// 40009 isn't used by the test servers.
+	session, err := mgo.DialWithTimeout("localhost:40009", timeout)
+	if session != nil {
+		session.Close()
+	}
+	c.Assert(err, ErrorMatches, "no reachable servers")
+	c.Assert(session, IsNil)
+	// Elapsed time must be between timeout and 2*timeout.
+	c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
+	c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
+}
+
+// TestSocketTimeout freezes the server mid-connection and verifies that an
+// in-flight operation fails with an i/o timeout after SetSocketTimeout.
+func (s *S) TestSocketTimeout(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40001")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	// Freeze suspends the server process without closing its sockets.
+	s.Freeze("localhost:40001")
+
+	timeout := 3 * time.Second
+	session.SetSocketTimeout(timeout)
+	started := time.Now()
+
+	// Do something.
+	result := struct{ Ok bool }{}
+	err = session.Run("getLastError", &result)
+	c.Assert(err, ErrorMatches, ".*: i/o timeout")
+	// Elapsed time must be between timeout and 2*timeout.
+	c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
+	c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
+}
+
+// TestSocketTimeoutOnDial verifies that the socket timeout also applies to
+// the initial sync during dialing, using the HackSyncSocketTimeout hook.
+func (s *S) TestSocketTimeoutOnDial(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	timeout := 1 * time.Second
+
+	// Override the internal sync socket timeout; restore on return.
+	defer mgo.HackSyncSocketTimeout(timeout)()
+
+	s.Freeze("localhost:40001")
+
+	started := time.Now()
+
+	session, err := mgo.DialWithTimeout("localhost:40001", timeout)
+	c.Assert(err, ErrorMatches, "no reachable servers")
+	c.Assert(session, IsNil)
+
+	// Must have taken at least `timeout`, but well under the default.
+	c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
+	c.Assert(started.After(time.Now().Add(-20*time.Second)), Equals, true)
+}
+
+// TestSocketTimeoutOnInactiveSocket verifies that the socket timeout does
+// not kill an idle socket: after a freeze longer than the timeout with no
+// pending operations, the session must still be usable.
+func (s *S) TestSocketTimeoutOnInactiveSocket(c *C) {
+	if *fast {
+		c.Skip("-fast")
+	}
+
+	session, err := mgo.Dial("localhost:40001")
+	c.Assert(err, IsNil)
+	defer session.Close()
+
+	timeout := 2 * time.Second
+	session.SetSocketTimeout(timeout)
+
+	// Do something that relies on the timeout and works.
+	c.Assert(session.Ping(), IsNil)
+
+	// Freeze and wait for the timeout to go by.
+	s.Freeze("localhost:40001")
+	time.Sleep(timeout + 500*time.Millisecond)
+	s.Thaw("localhost:40001")
+
+	// Do something again. The timeout above should not have killed
+	// the socket as there was nothing to be done.
+	c.Assert(session.Ping(), IsNil)
+}
+
+// TestDialWithReplicaSetName ensures that dialing with ReplicaSetName=rs1
+// (via DialInfo or the connection string) never communicates with members
+// of rs2, in both discovery and direct connection modes.
+func (s *S) TestDialWithReplicaSetName(c *C) {
+	seedLists := [][]string{
+		// rs1 primary and rs2 primary
+		{"localhost:40011", "localhost:40021"},
+		// rs1 primary and rs2 secondary
+		{"localhost:40011", "localhost:40022"},
+		// rs1 secondary and rs2 primary
+		{"localhost:40012", "localhost:40021"},
+		// rs1 secondary and rs2 secondary
+		{"localhost:40012", "localhost:40022"},
+	}
+
+	rs2Members := []string{":40021", ":40022", ":40023"}
+
+	verifySyncedServers := func(session *mgo.Session, numServers int) {
+		// wait for the server(s) to be synced
+		for len(session.LiveServers()) != numServers {
+			c.Log("Waiting for cluster sync to finish...")
+			time.Sleep(5e8)
+		}
+
+		// ensure none of the rs2 set members are communicated with
+		for _, addr := range session.LiveServers() {
+			for _, member := range rs2Members {
+				c.Assert(strings.HasSuffix(addr, member), Equals, false)
+			}
+		}
+	}
+
+	// only communication with rs1 members is expected
+	for _, seeds := range seedLists {
+		info := mgo.DialInfo{
+			Addrs:          seeds,
+			Timeout:        5 * time.Second,
+			ReplicaSetName: "rs1",
+		}
+
+		session, err := mgo.DialWithInfo(&info)
+		c.Assert(err, IsNil)
+		verifySyncedServers(session, 3)
+		session.Close()
+
+		info.Direct = true
+		session, err = mgo.DialWithInfo(&info)
+		c.Assert(err, IsNil)
+		verifySyncedServers(session, 1)
+		session.Close()
+
+		url := fmt.Sprintf("mongodb://%v/?replicaSet=rs1", strings.Join(seeds, ","))
+		session, err = mgo.Dial(url)
+		c.Assert(err, IsNil)
+		verifySyncedServers(session, 3)
+		session.Close()
+
+		url += "&connect=direct"
+		session, err = mgo.Dial(url)
+		c.Assert(err, IsNil)
+		verifySyncedServers(session, 1)
+		session.Close()
+	}
+}
+
// TestDirect verifies that a ?connect=direct dial pins the session to the
// dialed server (a secondary here): reads and local-database writes succeed
// against it, while writes that require a master time out with
// "no reachable servers".
func (s *S) TestDirect(c *C) {
	session, err := mgo.Dial("localhost:40012?connect=direct")
	c.Assert(err, IsNil)
	defer session.Close()

	// We know that server is a slave.
	session.SetMode(mgo.Monotonic, true)

	result := &struct{ Host string }{}
	err = session.Run("serverStatus", result)
	c.Assert(err, IsNil)
	c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true)

	// Exactly one socket must exist: direct mode talks to a single server.
	stats := mgo.GetStats()
	c.Assert(stats.SocketsAlive, Equals, 1)
	c.Assert(stats.SocketsInUse, Equals, 1)
	c.Assert(stats.SocketRefs, Equals, 1)

	// We've got no master, so it'll timeout.
	session.SetSyncTimeout(5e8 * time.Nanosecond)

	coll := session.DB("mydb").C("mycoll")
	err = coll.Insert(M{"test": 1})
	c.Assert(err, ErrorMatches, "no reachable servers")

	// Writing to the local database is okay.
	coll = session.DB("local").C("mycoll")
	defer coll.RemoveAll(nil)
	id := bson.NewObjectId()
	err = coll.Insert(M{"_id": id})
	c.Assert(err, IsNil)

	// Data was stored in the right server.
	n, err := coll.Find(M{"_id": id}).Count()
	c.Assert(err, IsNil)
	c.Assert(n, Equals, 1)

	// Server hasn't changed.
	result.Host = ""
	err = session.Run("serverStatus", result)
	c.Assert(err, IsNil)
	c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true)
}
+
+func (s *S) TestDirectToUnknownStateMember(c *C) {
+ session, err := mgo.Dial("localhost:40041?connect=direct")
+ c.Assert(err, IsNil)
+ defer session.Close()
+
+ session.SetMode(mgo.Monotonic, true)
+
+ result := &struct{ Host string }{}
+ err = session.Run("serverStatus", result)
+ c.Assert(err, IsNil)
+ c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
+
+ // We've got no master, so it'll timeout.
+ session.SetSyncTimeout(5e8 * time.Nanosecond)
+
+ coll := session.DB("mydb").C("mycoll")
+ err = coll.Insert(M{"test": 1})
+ c.Assert(err, ErrorMatches, "no reachable servers")
+
+ // Slave is still reachable.
+ result.Host = ""
+ err = session.Run("serverStatus", result)
+ c.Assert(err, IsNil)
+ c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
+}
+
+func (s *S) TestFailFast(c *C) {
+ info := mgo.DialInfo{
+ Addrs: []string{"localhost:99999"},
+ Timeout: 5 * time.Second,
+ FailFast: true,
+ }
+
+ started := time.Now()
+
+ _, err := mgo.DialWithInfo(&info)
+ c.Assert(err, ErrorMatches, "no reachable servers")
+
+ c.Assert(started.After(time.Now().Add(-time.Second)), Equals, true)
+}
+
+func (s *S) countQueries(c *C, server string) (n int) {
+ defer func() { c.Logf("Queries for %q: %d", server, n) }()
+ session, err := mgo.Dial(server + "?connect=direct")
+ c.Assert(err, IsNil)
+ defer session.Close()
+ session.SetMode(mgo.Monotonic, true)
+ var result struct {
+ OpCounters struct {
+ Query int
+ }
+ Metrics struct {
+ Commands struct{ Find struct{ Total int } }
+ }
+ }
+ err = session.Run("serverStatus", &result)
+ c.Assert(err, IsNil)
+ if s.versionAtLeast(3, 2) {
+ return result.Metrics.Commands.Find.Total
+ }
+ return result.OpCounters.Query
+}
+
+func (s *S) countCommands(c *C, server, commandName string) (n int) {
+ defer func() { c.Logf("Queries for %q: %d", server, n) }()
+ session, err := mgo.Dial(server + "?connect=direct")
+ c.Assert(err, IsNil)
+ defer session.Close()
+ session.SetMode(mgo.Monotonic, true)
+ var result struct {
+ Metrics struct {
+ Commands map[string]struct{ Total int }
+ }
+ }
+ err = session.Run("serverStatus", &result)
+ c.Assert(err, IsNil)
+ return result.Metrics.Commands[commandName].Total
+}
+
// TestMonotonicSlaveOkFlagWithMongos verifies that a Monotonic-mode session
// going through mongos sets the SlaveOk flag, so the queries land on the
// secondaries rather than on the replica set master. It compares per-server
// query counters before and after running five queries.
func (s *S) TestMonotonicSlaveOkFlagWithMongos(c *C) {
	session, err := mgo.Dial("localhost:40021")
	c.Assert(err, IsNil)
	defer session.Close()

	ssresult := &struct{ Host string }{}
	imresult := &struct{ IsMaster bool }{}

	// Figure the master while still using the strong session.
	err = session.Run("serverStatus", ssresult)
	c.Assert(err, IsNil)
	err = session.Run("isMaster", imresult)
	c.Assert(err, IsNil)
	master := ssresult.Host
	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))

	// Ensure mongos is aware about the current topology.
	s.Stop(":40201")
	s.StartAll()

	mongos, err := mgo.Dial("localhost:40202")
	c.Assert(err, IsNil)
	defer mongos.Close()

	// Insert some data as otherwise 3.2+ doesn't seem to run the query at all.
	err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1})
	c.Assert(err, IsNil)

	// Wait until all servers see the data.
	for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} {
		session, err := mgo.Dial(addr + "?connect=direct")
		c.Assert(err, IsNil)
		// Deferred inside the loop on purpose: sessions stay open until the
		// test ends, which is fine for three short-lived connections.
		defer session.Close()
		session.SetMode(mgo.Monotonic, true)
		// Poll up to ~30s (300 * 100ms) for replication to catch up.
		for i := 300; i >= 0; i-- {
			n, err := session.DB("mydb").C("mycoll").Find(nil).Count()
			c.Assert(err, IsNil)
			if n == 1 {
				break
			}
			if i == 0 {
				c.Fatalf("Inserted data never reached " + addr)
			}
			time.Sleep(100 * time.Millisecond)
		}
	}

	// Collect op counters for everyone.
	q21a := s.countQueries(c, "localhost:40021")
	q22a := s.countQueries(c, "localhost:40022")
	q23a := s.countQueries(c, "localhost:40023")

	// Do a SlaveOk query through MongoS

	mongos.SetMode(mgo.Monotonic, true)

	coll := mongos.DB("mydb").C("mycoll")
	var result struct{ N int }
	for i := 0; i != 5; i++ {
		err = coll.Find(nil).One(&result)
		c.Assert(err, IsNil)
		c.Assert(result.N, Equals, 1)
	}

	// Collect op counters for everyone again.
	q21b := s.countQueries(c, "localhost:40021")
	q22b := s.countQueries(c, "localhost:40022")
	q23b := s.countQueries(c, "localhost:40023")

	// The master must have served none of the five queries; the secondaries
	// must have served all of them between the two of them.
	var masterDelta, slaveDelta int
	switch hostPort(master) {
	case "40021":
		masterDelta = q21b - q21a
		slaveDelta = (q22b - q22a) + (q23b - q23a)
	case "40022":
		masterDelta = q22b - q22a
		slaveDelta = (q21b - q21a) + (q23b - q23a)
	case "40023":
		masterDelta = q23b - q23a
		slaveDelta = (q21b - q21a) + (q22b - q22a)
	default:
		c.Fatal("Uh?")
	}

	c.Check(masterDelta, Equals, 0) // Just the counting itself.
	c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above.
}
+
// TestSecondaryModeWithMongos verifies that a Secondary-mode session going
// through mongos directs all queries to the secondaries, never to the
// replica set master. It compares per-server query counters before and
// after running five queries.
func (s *S) TestSecondaryModeWithMongos(c *C) {
	session, err := mgo.Dial("localhost:40021")
	c.Assert(err, IsNil)
	defer session.Close()

	ssresult := &struct{ Host string }{}
	imresult := &struct{ IsMaster bool }{}

	// Figure the master while still using the strong session.
	err = session.Run("serverStatus", ssresult)
	c.Assert(err, IsNil)
	err = session.Run("isMaster", imresult)
	c.Assert(err, IsNil)
	master := ssresult.Host
	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))

	// Ensure mongos is aware about the current topology.
	s.Stop(":40201")
	s.StartAll()

	mongos, err := mgo.Dial("localhost:40202")
	c.Assert(err, IsNil)
	defer mongos.Close()

	mongos.SetSyncTimeout(5 * time.Second)

	// Insert some data as otherwise 3.2+ doesn't seem to run the query at all.
	err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1})
	c.Assert(err, IsNil)

	// Wait until all servers see the data.
	for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} {
		session, err := mgo.Dial(addr + "?connect=direct")
		c.Assert(err, IsNil)
		// Deferred inside the loop on purpose: sessions stay open until the
		// test ends, which is fine for three short-lived connections.
		defer session.Close()
		session.SetMode(mgo.Monotonic, true)
		// Poll up to ~30s (300 * 100ms) for replication to catch up.
		for i := 300; i >= 0; i-- {
			n, err := session.DB("mydb").C("mycoll").Find(nil).Count()
			c.Assert(err, IsNil)
			if n == 1 {
				break
			}
			if i == 0 {
				c.Fatalf("Inserted data never reached " + addr)
			}
			time.Sleep(100 * time.Millisecond)
		}
	}

	// Collect op counters for everyone.
	q21a := s.countQueries(c, "localhost:40021")
	q22a := s.countQueries(c, "localhost:40022")
	q23a := s.countQueries(c, "localhost:40023")

	// Do a Secondary query through MongoS

	mongos.SetMode(mgo.Secondary, true)

	coll := mongos.DB("mydb").C("mycoll")
	var result struct{ N int }
	for i := 0; i != 5; i++ {
		err = coll.Find(nil).One(&result)
		c.Assert(err, IsNil)
		c.Assert(result.N, Equals, 1)
	}

	// Collect op counters for everyone again.
	q21b := s.countQueries(c, "localhost:40021")
	q22b := s.countQueries(c, "localhost:40022")
	q23b := s.countQueries(c, "localhost:40023")

	// The master must have served none of the five queries; the secondaries
	// must have served all of them between the two of them.
	var masterDelta, slaveDelta int
	switch hostPort(master) {
	case "40021":
		masterDelta = q21b - q21a
		slaveDelta = (q22b - q22a) + (q23b - q23a)
	case "40022":
		masterDelta = q22b - q22a
		slaveDelta = (q21b - q21a) + (q23b - q23a)
	case "40023":
		masterDelta = q23b - q23a
		slaveDelta = (q21b - q21a) + (q22b - q22a)
	default:
		c.Fatal("Uh?")
	}

	c.Check(masterDelta, Equals, 0) // Just the counting itself.
	c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above.
}
+
+func (s *S) TestSecondaryModeWithMongosInsert(c *C) {
+ if *fast {
+ c.Skip("-fast")
+ }
+
+ session, err := mgo.Dial("localhost:40202")
+ c.Assert(err, IsNil)
+ defer session.Close()
+
+ session.SetMode(mgo.Secondary, true)
+ session.SetSyncTimeout(4 * time.Second)
+
+ coll := session.DB("mydb").C("mycoll")
+ err = coll.Insert(M{"a": 1})
+ c.Assert(err, IsNil)
+
+ var result struct{ A int }
+ coll.Find(nil).One(&result)
+ c.Assert(result.A, Equals, 1)
+}
+
+
+func (s *S) TestRemovalOfClusterMember(c *C) {
+ if *fast {
+ c.Skip("-fast")
+ }
+
+ master, err := mgo.Dial("localhost:40021")
+ c.Assert(err, IsNil)
+ defer master.Close()
+
+ // Wait for cluster to fully sync up.
+ for i := 0; i < 10; i++ {
+ if len(master.LiveServers()) == 3 {
+ break
+ }
+ time.Sleep(5e8)
+ }
+ if len(master.LiveServers()) != 3 {
+ c.Fatalf("Test started with bad cluster state: %v", master.LiveServers())
+ }
+
+ result := &struct {
+ IsMaster bool
+ Me string
+ }{}
+ slave := master.Copy()
+ slave.SetMode(mgo.Monotonic, true) // Monotonic can hold a non-master socket persistently.
+ err = slave.Run("isMaster", result)
+ c.Assert(err, IsNil)
+ c.Assert(result.IsMaster, Equals, false)
+ slaveAddr := result.Me
+
+ defer func() {
+ config := map[string]string{
+ "40021": `{_id: 1, host: "127.0.0.1:40021", priority: 1, tags: {rs2: "a"}}`,
+ "40022": `{_id: 2, host: "127.0.0.1:40022", priority: 0, tags: {rs2: "b"}}`,
+ "40023": `{_id: 3, host: "127.0.0.1:40023", priority: 0, tags: {rs2: "c"}}`,
+ }
+ master.Refresh()
+ master.Run(bson.D{{"$eval", `rs.add(` + config[hostPort(slaveAddr)] + `)`}}, nil)
+ master.Close()
+ slave.Close()
+
+ // Ensure suite syncs up with the changes before next test.
+ s.Stop(":40201")
+ s.StartAll()
+ time.Sleep(8 * time.Second)
+ // TODO Find a better way to find out when mongos is fully aware that all
+ // servers are up. Without that follow up tests that depend on mongos will
+ // break due to their expectation of things being in a working state.
+ }()
+
+ c.Logf("========== Removing slave: %s ==========", slaveAddr)
+
+ master.Run(bson.D{{"$eval", `rs.remove("` + slaveAddr + `")`}}, nil)
+
+ master.Refresh()
+
+ // Give the cluster a moment to catch up by doing a roundtrip to the master.
+ err = master.Ping()
+ c.Assert(err, IsNil)
+
+ time.Sleep(3e9)
+
+ // This must fail since the slave has been taken off the cluster.
+ err = slave.Ping()
+ c.Assert(err, NotNil)
+
+ for i := 0; i < 15; i++ {
+ if len(master.LiveServers()) == 2 {
+ break
+ }
+ time.Sleep(time.Second)
+ }
+ live := master.LiveServers()
+ if len(live) != 2 {
+ c.Errorf("Removed server still considered live: %#s", live)
+ }
+
+ c.Log("========== Test succeeded. ==========")
+}
+
+func (s *S) TestPoolLimitSimple(c *C) {
+ for test := 0; test < 2; test++ {
+ var session *mgo.Session
+ var err error
+ if test == 0 {
+ session, err = mgo.Dial("localhost:40001")
+ c.Assert(err, IsNil)
+ session.SetPoolLimit(1)
+ } else {
+ session, err = mgo.Dial("localhost:40001?maxPoolSize=1")
+ c.Assert(err, IsNil)
+ }
+ defer session.Close()
+
+ // Put one socket in use.
+ c.Assert(session.Ping(), IsNil)
+
+ done := make(chan time.Duration)
+
+ // Now block trying to get another one due to the pool limit.
+ go func() {
+ copy := session.Copy()
+ defer copy.Close()
+ started := time.Now()
+ c.Check(copy.Ping(), IsNil)
+ done <- time.Now().Sub(started)
+ }()
+
+ time.Sleep(300 * time.Millisecond)
+
+ // Put the one socket back in the pool, freeing it for the copy.
+ session.Refresh()
+ delay := <-done
+ c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
+ }
+}
+
+func (s *S) TestPoolLimitMany(c *C) {
+ if *fast {
+ c.Skip("-fast")
+ }
+
+ session, err := mgo.Dial("localhost:40011")
+ c.Assert(err, IsNil)
+ defer session.Close()
+
+ stats := mgo.GetStats()
+ for stats.SocketsAlive != 3 {
+ c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive)
+ stats = mgo.GetStats()
+ time.Sleep(5e8)
+ }
+
+ const poolLimit = 64
+ session.SetPoolLimit(poolLimit)
+
+ // Consume the whole limit for the master.
+ var master []*mgo.Session
+ for i := 0; i < poolLimit; i++ {
+ s := session.Copy()
+ defer s.Close()
+ c.Assert(s.Ping(), IsNil)
+ master = append(master, s)
+ }
+
+ before := time.Now()
+ go func() {
+ time.Sleep(3e9)
+ master[0].Refresh()
+ }()
+
+ // Then, a single ping must block, since it would need another
+ // connection to the master, over the limit. Once the goroutine
+ // above releases its socket, it should move on.
+ session.Ping()
+ delay := time.Now().Sub(before)
+ c.Assert(delay > 3e9, Equals, true)
+ c.Assert(delay < 6e9, Equals, true)
+}
+
+func (s *S) TestSetModeEventualIterBug(c *C) {
+ session1, err := mgo.Dial("localhost:40011")
+ c.Assert(err, IsNil)
+ defer session1.Close()
+
+ session1.SetMode(mgo.Eventual, false)
+
+ coll1 := session1.DB("mydb").C("mycoll")
+
+ const N = 100
+ for i := 0; i < N; i++ {
+ err = coll1.Insert(M{"_id": i})
+ c.Assert(err, IsNil)
+ }
+
+ c.Logf("Waiting until secondary syncs")
+ for {
+ n, err := coll1.Count()
+ c.Assert(err, IsNil)
+ if n == N {
+ c.Logf("Found all")
+ break
+ }
+ }
+
+ session2, err := mgo.Dial("localhost:40011")
+ c.Assert(err, IsNil)
+ defer session2.Close()
+
+ session2.SetMode(mgo.Eventual, false)
+
+ coll2 := session2.DB("mydb").C("mycoll")
+
+ i := 0
+ iter := coll2.Find(nil).Batch(10).Iter()
+ var result struct{}
+ for iter.Next(&result) {
+ i++
+ }
+ c.Assert(iter.Close(), Equals, nil)
+ c.Assert(i, Equals, N)
+}
+
+func (s *S) TestCustomDialOld(c *C) {
+ dials := make(chan bool, 16)
+ dial := func(addr net.Addr) (net.Conn, error) {
+ tcpaddr, ok := addr.(*net.TCPAddr)
+ if !ok {
+ return nil, fmt.Errorf("unexpected address type: %T", addr)
+ }
+ dials <- true
+ return net.DialTCP("tcp", nil, tcpaddr)
+ }
+ info := mgo.DialInfo{
+ Addrs: []string{"localhost:40012"},
+ Dial: dial,
+ }
+
+ // Use hostname here rather than IP, to make things trickier.
+ session, err := mgo.DialWithInfo(&info)
+ c.Assert(err, IsNil)
+ defer session.Close()
+
+ const N = 3
+ for i := 0; i < N; i++ {
+ select {
+ case <-dials:
+ case <-time.After(5 * time.Second):
+ c.Fatalf("expected %d dials, got %d", N, i)
+ }
+ }
+ select {
+ case <-dials:
+ c.Fatalf("got more dials than expected")
+ case <-time.After(100 * time.Millisecond):
+ }
+}
+
+func (s *S) TestCustomDialNew(c *C) {
+ dials := make(chan bool, 16)
+ dial := func(addr *mgo.ServerAddr) (net.Conn, error) {
+ dials <- true
+ if addr.TCPAddr().Port == 40012 {
+ c.Check(addr.String(), Equals, "localhost:40012")
+ }
+ return net.DialTCP("tcp", nil, addr.TCPAddr())
+ }
+ info := mgo.DialInfo{
+ Addrs: []string{"localhost:40012"},
+ DialServer: dial,
+ }
+
+ // Use hostname here rather than IP, to make things trickier.
+ session, err := mgo.DialWithInfo(&info)
+ c.Assert(err, IsNil)
+ defer session.Close()
+
+ const N = 3
+ for i := 0; i < N; i++ {
+ select {
+ case <-dials:
+ case <-time.After(5 * time.Second):
+ c.Fatalf("expected %d dials, got %d", N, i)
+ }
+ }
+ select {
+ case <-dials:
+ c.Fatalf("got more dials than expected")
+ case <-time.After(100 * time.Millisecond):
+ }
+}
+
+func (s *S) TestPrimaryShutdownOnAuthShard(c *C) {
+ if *fast {
+ c.Skip("-fast")
+ }
+
+ // Dial the shard.
+ session, err := mgo.Dial("localhost:40203")
+ c.Assert(err, IsNil)
+ defer session.Close()
+
+ // Login and insert something to make it more realistic.
+ session.DB("admin").Login("root", "rapadura")
+ coll := session.DB("mydb").C("mycoll")
+ err = coll.Insert(bson.M{"n": 1})
+ c.Assert(err, IsNil)
+
+ // Dial the replica set to figure the master out.
+ rs, err := mgo.Dial("root:rapadura@localhost:40031")
+ c.Assert(err, IsNil)
+ defer rs.Close()
+
+ // With strong consistency, this will open a socket to the master.
+ result := &struct{ Host string }{}
+ err = rs.Run("serverStatus", result)
+ c.Assert(err, IsNil)
+
+ // Kill the master.
+ host := result.Host
+ s.Stop(host)
+
+ // This must fail, since the connection was broken.
+ err = rs.Run("serverStatus", result)
+ c.Assert(err, Equals, io.EOF)
+
+ // This won't work because the master just died.
+ err = coll.Insert(bson.M{"n": 2})
+ c.Assert(err, NotNil)
+
+ // Refresh session and wait for re-election.
+ session.Refresh()
+ for i := 0; i < 60; i++ {
+ err = coll.Insert(bson.M{"n": 3})
+ if err == nil {
+ break
+ }
+ c.Logf("Waiting for replica set to elect a new master. Last error: %v", err)
+ time.Sleep(500 * time.Millisecond)
+ }
+ c.Assert(err, IsNil)
+
+ count, err := coll.Count()
+ c.Assert(count > 1, Equals, true)
+}
+
// TestNearestSecondary checks that Monotonic mode picks the secondary with
// the lowest observed ping time, and that freezing a server long enough
// penalizes it so the other secondary is chosen afterwards.
func (s *S) TestNearestSecondary(c *C) {
	// Speed up the internal ping loop so penalties register quickly.
	defer mgo.HackPingDelay(300 * time.Millisecond)()

	rs1a := "127.0.0.1:40011"
	rs1b := "127.0.0.1:40012"
	rs1c := "127.0.0.1:40013"
	s.Freeze(rs1b)

	session, err := mgo.Dial(rs1a)
	c.Assert(err, IsNil)
	defer session.Close()

	// Wait for the sync up to run through the first couple of servers.
	for len(session.LiveServers()) != 2 {
		c.Log("Waiting for two servers to be alive...")
		time.Sleep(100 * time.Millisecond)
	}

	// Extra delay to ensure the frozen server (rs1b) gets penalized.
	time.Sleep(500 * time.Millisecond)

	// Release the frozen server (rs1b).
	s.Thaw(rs1b)

	// Wait for it to come up.
	for len(session.LiveServers()) != 3 {
		c.Log("Waiting for all servers to be alive...")
		time.Sleep(100 * time.Millisecond)
	}

	session.SetMode(mgo.Monotonic, true)
	var result struct{ Host string }

	// See which slave picks the line, several times to avoid chance.
	for i := 0; i < 10; i++ {
		session.Refresh()
		err = session.Run("serverStatus", &result)
		c.Assert(err, IsNil)
		c.Assert(hostPort(result.Host), Equals, hostPort(rs1c))
	}

	if *fast {
		// Don't hold back for several seconds.
		return
	}

	// Now hold the other server for long enough to penalize it.
	s.Freeze(rs1c)
	time.Sleep(5 * time.Second)
	s.Thaw(rs1c)

	// Wait for the ping to be processed.
	time.Sleep(500 * time.Millisecond)

	// Repeating the test should now pick the former server consistently.
	for i := 0; i < 10; i++ {
		session.Refresh()
		err = session.Run("serverStatus", &result)
		c.Assert(err, IsNil)
		c.Assert(hostPort(result.Host), Equals, hostPort(rs1b))
	}
}
+
// TestNearestServer checks that Nearest mode picks the server with the
// lowest observed ping time, regardless of whether it is the primary or a
// secondary, and that penalizing servers shifts the choice accordingly.
func (s *S) TestNearestServer(c *C) {
	// Speed up the internal ping loop so penalties register quickly.
	defer mgo.HackPingDelay(300 * time.Millisecond)()

	rs1a := "127.0.0.1:40011"
	rs1b := "127.0.0.1:40012"
	rs1c := "127.0.0.1:40013"

	session, err := mgo.Dial(rs1a)
	c.Assert(err, IsNil)
	defer session.Close()

	s.Freeze(rs1a)
	s.Freeze(rs1b)

	// Extra delay to ensure the first two servers get penalized.
	time.Sleep(500 * time.Millisecond)

	// Release them.
	s.Thaw(rs1a)
	s.Thaw(rs1b)

	// Wait for everyone to come up.
	for len(session.LiveServers()) != 3 {
		c.Log("Waiting for all servers to be alive...")
		time.Sleep(100 * time.Millisecond)
	}

	session.SetMode(mgo.Nearest, true)
	var result struct{ Host string }

	// See which server picks the line, several times to avoid chance.
	for i := 0; i < 10; i++ {
		session.Refresh()
		err = session.Run("serverStatus", &result)
		c.Assert(err, IsNil)
		c.Assert(hostPort(result.Host), Equals, hostPort(rs1c))
	}

	if *fast {
		// Don't hold back for several seconds.
		return
	}

	// Now hold the two secondaries for long enough to penalize them.
	s.Freeze(rs1b)
	s.Freeze(rs1c)
	time.Sleep(5 * time.Second)
	s.Thaw(rs1b)
	s.Thaw(rs1c)

	// Wait for the ping to be processed.
	time.Sleep(500 * time.Millisecond)

	// Repeating the test should now pick the primary server consistently.
	for i := 0; i < 10; i++ {
		session.Refresh()
		err = session.Run("serverStatus", &result)
		c.Assert(err, IsNil)
		c.Assert(hostPort(result.Host), Equals, hostPort(rs1a))
	}
}
+
+func (s *S) TestConnectCloseConcurrency(c *C) {
+ restore := mgo.HackPingDelay(500 * time.Millisecond)
+ defer restore()
+ var wg sync.WaitGroup
+ const n = 500
+ wg.Add(n)
+ for i := 0; i < n; i++ {
+ go func() {
+ defer wg.Done()
+ session, err := mgo.Dial("localhost:40001")
+ if err != nil {
+ c.Fatal(err)
+ }
+ time.Sleep(1)
+ session.Close()
+ }()
+ }
+ wg.Wait()
+}
+
+func (s *S) TestSelectServers(c *C) {
+ if !s.versionAtLeast(2, 2) {
+ c.Skip("read preferences introduced in 2.2")
+ }
+
+ session, err := mgo.Dial("localhost:40011")
+ c.Assert(err, IsNil)
+ defer session.Close()
+
+ session.SetMode(mgo.Eventual, true)
+
+ var result struct{ Host string }
+
+ session.Refresh()
+ session.SelectServers(bson.D{{"rs1", "b"}})
+ err = session.Run("serverStatus", &result)
+ c.Assert(err, IsNil)
+ c.Assert(hostPort(result.Host), Equals, "40012")
+
+ session.Refresh()
+ session.SelectServers(bson.D{{"rs1", "c"}})
+ err = session.Run("serverStatus", &result)
+ c.Assert(err, IsNil)
+ c.Assert(hostPort(result.Host), Equals, "40013")
+}
+
// TestSelectServersWithMongos checks that SelectServers on a mongos session
// steers queries to the tagged secondaries: 5 queries to one slave's tag and
// 7 to the other's, verified via each server's query counters.
func (s *S) TestSelectServersWithMongos(c *C) {
	if !s.versionAtLeast(2, 2) {
		c.Skip("read preferences introduced in 2.2")
	}

	session, err := mgo.Dial("localhost:40021")
	c.Assert(err, IsNil)
	defer session.Close()

	ssresult := &struct{ Host string }{}
	imresult := &struct{ IsMaster bool }{}

	// Figure the master while still using the strong session.
	err = session.Run("serverStatus", ssresult)
	c.Assert(err, IsNil)
	err = session.Run("isMaster", imresult)
	c.Assert(err, IsNil)
	master := ssresult.Host
	c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))

	// Pick the tag values (rs2=a/b/c map to ports 40021/40022/40023) of the
	// two servers that are currently secondaries.
	var slave1, slave2 string
	switch hostPort(master) {
	case "40021":
		slave1, slave2 = "b", "c"
	case "40022":
		slave1, slave2 = "a", "c"
	case "40023":
		slave1, slave2 = "a", "b"
	}

	// Collect op counters for everyone.
	q21a := s.countQueries(c, "localhost:40021")
	q22a := s.countQueries(c, "localhost:40022")
	q23a := s.countQueries(c, "localhost:40023")

	// Do a SlaveOk query through MongoS
	mongos, err := mgo.Dial("localhost:40202")
	c.Assert(err, IsNil)
	defer mongos.Close()

	mongos.SetMode(mgo.Monotonic, true)

	// 5 queries against the first slave's tag; collection is empty, so each
	// query reports ErrNotFound but still counts on the server.
	mongos.Refresh()
	mongos.SelectServers(bson.D{{"rs2", slave1}})
	coll := mongos.DB("mydb").C("mycoll")
	result := &struct{}{}
	for i := 0; i != 5; i++ {
		err := coll.Find(nil).One(result)
		c.Assert(err, Equals, mgo.ErrNotFound)
	}

	// 7 queries against the second slave's tag.
	mongos.Refresh()
	mongos.SelectServers(bson.D{{"rs2", slave2}})
	coll = mongos.DB("mydb").C("mycoll")
	for i := 0; i != 7; i++ {
		err := coll.Find(nil).One(result)
		c.Assert(err, Equals, mgo.ErrNotFound)
	}

	// Collect op counters for everyone again.
	q21b := s.countQueries(c, "localhost:40021")
	q22b := s.countQueries(c, "localhost:40022")
	q23b := s.countQueries(c, "localhost:40023")

	// The master must have served nothing; the slaves 5 and 7 respectively.
	switch hostPort(master) {
	case "40021":
		c.Check(q21b-q21a, Equals, 0)
		c.Check(q22b-q22a, Equals, 5)
		c.Check(q23b-q23a, Equals, 7)
	case "40022":
		c.Check(q21b-q21a, Equals, 5)
		c.Check(q22b-q22a, Equals, 0)
		c.Check(q23b-q23a, Equals, 7)
	case "40023":
		c.Check(q21b-q21a, Equals, 5)
		c.Check(q22b-q22a, Equals, 7)
		c.Check(q23b-q23a, Equals, 0)
	default:
		c.Fatal("Uh?")
	}
}
+
+func (s *S) TestDoNotFallbackToMonotonic(c *C) {
+ // There was a bug at some point that some functions were
+ // falling back to Monotonic mode. This test ensures all listIndexes
+ // commands go to the primary, as should happen since the session is
+ // in Strong mode.
+ if !s.versionAtLeast(3, 0) {
+ c.Skip("command-counting logic depends on 3.0+")
+ }
+
+ session, err := mgo.Dial("localhost:40012")
+ c.Assert(err, IsNil)
+ defer session.Close()
+
+ for i := 0; i < 15; i++ {
+ q11a := s.countCommands(c, "localhost:40011", "listIndexes")
+ q12a := s.countCommands(c, "localhost:40012", "listIndexes")
+ q13a := s.countCommands(c, "localhost:40013", "listIndexes")
+
+ _, err := session.DB("local").C("system.indexes").Indexes()
+ c.Assert(err, IsNil)
+
+ q11b := s.countCommands(c, "localhost:40011", "listIndexes")
+ q12b := s.countCommands(c, "localhost:40012", "listIndexes")
+ q13b := s.countCommands(c, "localhost:40013", "listIndexes")
+
+ c.Assert(q11b, Equals, q11a+1)
+ c.Assert(q12b, Equals, q12a)
+ c.Assert(q13b, Equals, q13a)
+ }
+}