summaryrefslogtreecommitdiff
path: root/src/mongo/gotools/common/db/db.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/gotools/common/db/db.go')
-rw-r--r--src/mongo/gotools/common/db/db.go243
1 files changed, 243 insertions, 0 deletions
diff --git a/src/mongo/gotools/common/db/db.go b/src/mongo/gotools/common/db/db.go
new file mode 100644
index 00000000000..a3207c4b467
--- /dev/null
+++ b/src/mongo/gotools/common/db/db.go
@@ -0,0 +1,243 @@
+// Package db implements generic connection to MongoDB, and contains
+// subpackages for specific methods of connection.
+package db
+
+import (
+ "github.com/mongodb/mongo-tools/common/options"
+ "github.com/mongodb/mongo-tools/common/password"
+ "gopkg.in/mgo.v2"
+ "gopkg.in/mgo.v2/bson"
+
+ "fmt"
+ "io"
+ "strings"
+ "sync"
+)
+
+type (
+ sessionFlag uint32
+ // Used to get appropriate the DBConnector(s) based on opts
+ GetConnectorFunc func(opts options.ToolOptions) DBConnector
+)
+
+// Session flags.
+const (
+ None sessionFlag = 0
+ Monotonic sessionFlag = 1 << iota
+ DisableSocketTimeout
+)
+
+// MongoDB enforced limits.
+const (
+ MaxBSONSize = 16 * 1024 * 1024 // 16MB - maximum BSON document size
+)
+
+// Default port for integration tests
+const (
+ DefaultTestPort = "33333"
+)
+
+const (
+ ErrLostConnection = "lost connection to server"
+ ErrNoReachableServers = "no reachable servers"
+ ErrNsNotFound = "ns not found"
+ // replication errors list the replset name if we are talking to a mongos,
+ // so we can only check for this universal prefix
+ ErrReplTimeoutPrefix = "waiting for replication timed out"
+ ErrCouldNotContactPrimaryPrefix = "could not contact primary for replica set"
+ ErrCouldNotFindPrimaryPrefix = `could not find host matching read preference { mode: "primary"`
+ ErrUnableToTargetPrefix = "unable to target"
+ ErrNotMaster = "not master"
+ ErrConnectionRefusedSuffix = "Connection refused"
+)
+
+var (
+ GetConnectorFuncs = []GetConnectorFunc{}
+)
+
+// Used to manage database sessions
+type SessionProvider struct {
+
+ // For connecting to the database
+ connector DBConnector
+
+ // used to avoid a race condition around creating the master session
+ masterSessionLock sync.Mutex
+
+ // the master session to use for connection pooling
+ masterSession *mgo.Session
+
+ // flags for generating the master session
+ bypassDocumentValidation bool
+ flags sessionFlag
+ readPreference mgo.Mode
+ tags bson.D
+}
+
+// ApplyOpsResponse represents the response from an 'applyOps' command.
+type ApplyOpsResponse struct {
+ Ok bool `bson:"ok"`
+ ErrMsg string `bson:"errmsg"`
+}
+
+// Oplog represents a MongoDB oplog document.
+type Oplog struct {
+ Timestamp bson.MongoTimestamp `bson:"ts"`
+ HistoryID int64 `bson:"h"`
+ Version int `bson:"v"`
+ Operation string `bson:"op"`
+ Namespace string `bson:"ns"`
+ Object bson.D `bson:"o"`
+ Query bson.D `bson:"o2"`
+}
+
+// Returns a session connected to the database server for which the
+// session provider is configured.
+func (self *SessionProvider) GetSession() (*mgo.Session, error) {
+ self.masterSessionLock.Lock()
+ defer self.masterSessionLock.Unlock()
+
+ // The master session is initialized
+ if self.masterSession != nil {
+ return self.masterSession.Copy(), nil
+ }
+
+ // initialize the provider's master session
+ var err error
+ self.masterSession, err = self.connector.GetNewSession()
+ if err != nil {
+ return nil, fmt.Errorf("error connecting to db server: %v", err)
+ }
+
+ // update masterSession based on flags
+ self.refresh()
+
+ // copy the provider's master session, for connection pooling
+ return self.masterSession.Copy(), nil
+}
+
+// refresh is a helper for modifying the session based on the
+// session provider flags passed in with SetFlags.
+// This helper assumes a lock is already taken.
+func (self *SessionProvider) refresh() {
+ // handle bypassDocumentValidation
+ self.masterSession.SetBypassValidation(self.bypassDocumentValidation)
+
+ // handle readPreference
+ self.masterSession.SetMode(self.readPreference, true)
+
+ // disable timeouts
+ if (self.flags & DisableSocketTimeout) > 0 {
+ self.masterSession.SetSocketTimeout(0)
+ }
+ if self.tags != nil {
+ self.masterSession.SelectServers(self.tags)
+ }
+}
+
+// SetFlags allows certain modifications to the masterSession after initial creation.
+func (self *SessionProvider) SetFlags(flagBits sessionFlag) {
+ self.masterSessionLock.Lock()
+ defer self.masterSessionLock.Unlock()
+
+ self.flags = flagBits
+
+ // make sure we update the master session if one already exists
+ if self.masterSession != nil {
+ self.refresh()
+ }
+}
+
+// SetReadPreference sets the read preference mode in the SessionProvider
+// and eventually in the masterSession
+func (self *SessionProvider) SetReadPreference(pref mgo.Mode) {
+ self.masterSessionLock.Lock()
+ defer self.masterSessionLock.Unlock()
+
+ self.readPreference = pref
+
+ if self.masterSession != nil {
+ self.refresh()
+ }
+}
+
+// SetBypassDocumentValidation sets whether to bypass document validation in the SessionProvider
+// and eventually in the masterSession
+func (self *SessionProvider) SetBypassDocumentValidation(bypassDocumentValidation bool) {
+ self.masterSessionLock.Lock()
+ defer self.masterSessionLock.Unlock()
+
+ self.bypassDocumentValidation = bypassDocumentValidation
+
+ if self.masterSession != nil {
+ self.refresh()
+ }
+}
+
+// SetTags sets the server selection tags in the SessionProvider
+// and eventually in the masterSession
+func (self *SessionProvider) SetTags(tags bson.D) {
+ self.masterSessionLock.Lock()
+ defer self.masterSessionLock.Unlock()
+
+ self.tags = tags
+
+ if self.masterSession != nil {
+ self.refresh()
+ }
+}
+
+// NewSessionProvider constructs a session provider but does not attempt to
+// create the initial session.
+func NewSessionProvider(opts options.ToolOptions) (*SessionProvider, error) {
+ // create the provider
+ provider := &SessionProvider{
+ readPreference: mgo.Primary,
+ bypassDocumentValidation: false,
+ }
+
+ // finalize auth options, filling in missing passwords
+ if opts.Auth.ShouldAskForPassword() {
+ opts.Auth.Password = password.Prompt()
+ }
+
+ // create the connector for dialing the database
+ provider.connector = getConnector(opts)
+
+ // configure the connector
+ err := provider.connector.Configure(opts)
+ if err != nil {
+ return nil, fmt.Errorf("error configuring the connector: %v", err)
+ }
+ return provider, nil
+}
+
+// IsConnectionError returns a boolean indicating if a given error is due to
+// an error in an underlying DB connection (as opposed to some other write
+// failure such as a duplicate key error)
+func IsConnectionError(err error) bool {
+ if err == nil {
+ return false
+ }
+ if err.Error() == ErrNoReachableServers ||
+ err.Error() == io.EOF.Error() ||
+ strings.HasPrefix(err.Error(), ErrReplTimeoutPrefix) ||
+ strings.HasPrefix(err.Error(), ErrCouldNotContactPrimaryPrefix) ||
+ strings.HasPrefix(err.Error(), ErrCouldNotFindPrimaryPrefix) ||
+ strings.HasPrefix(err.Error(), ErrUnableToTargetPrefix) ||
+ err.Error() == ErrNotMaster ||
+ strings.HasSuffix(err.Error(), ErrConnectionRefusedSuffix) {
+ return true
+ }
+ return false
+}
+
+// Get the right type of connector, based on the options
+func getConnector(opts options.ToolOptions) DBConnector {
+ for _, getConnectorFunc := range GetConnectorFuncs {
+ if connector := getConnectorFunc(opts); connector != nil {
+ return connector
+ }
+ }
+ return &VanillaDBConnector{}
+}