Diffstat (limited to 'src/mongo/gotools/mongodump/mongodump.go')
-rw-r--r--  src/mongo/gotools/mongodump/mongodump.go  803
1 file changed, 803 insertions(+), 0 deletions(-)
diff --git a/src/mongo/gotools/mongodump/mongodump.go b/src/mongo/gotools/mongodump/mongodump.go
new file mode 100644
index 00000000000..a2dfcc40102
--- /dev/null
+++ b/src/mongo/gotools/mongodump/mongodump.go
@@ -0,0 +1,803 @@
+// Package mongodump creates BSON data from the contents of a MongoDB instance.
+package mongodump
+
+import (
+ "github.com/mongodb/mongo-tools/common/archive"
+ "github.com/mongodb/mongo-tools/common/auth"
+ "github.com/mongodb/mongo-tools/common/bsonutil"
+ "github.com/mongodb/mongo-tools/common/db"
+ "github.com/mongodb/mongo-tools/common/intents"
+ "github.com/mongodb/mongo-tools/common/json"
+ "github.com/mongodb/mongo-tools/common/log"
+ "github.com/mongodb/mongo-tools/common/options"
+ "github.com/mongodb/mongo-tools/common/progress"
+ "github.com/mongodb/mongo-tools/common/util"
+ "gopkg.in/mgo.v2"
+ "gopkg.in/mgo.v2/bson"
+
+ "compress/gzip"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "sync"
+ "time"
+)
+
+const (
+ progressBarLength = 24
+ progressBarWaitTime = time.Second * 3
+ defaultPermissions = 0755
+)
+
+// MongoDump is a container for the user-specified options and
+// internal state used for running mongodump.
+type MongoDump struct {
+ // basic mongo tool options
+ ToolOptions *options.ToolOptions
+ InputOptions *InputOptions
+ OutputOptions *OutputOptions
+
+ // useful internals that we don't directly expose as options
+ sessionProvider *db.SessionProvider
+ manager *intents.Manager
+ query bson.M
+ oplogCollection string
+ oplogStart bson.MongoTimestamp
+ isMongos bool
+ authVersion int
+ archive *archive.Writer
+ progressManager *progress.Manager
+ // shutdownIntentsNotifier is provided to the multiplexer
+ // as well as the signal handler, and allows them to notify
+ // the intent dumpers that they should shut down
+ shutdownIntentsNotifier *notifier
+ // the value of stdout gets initialized to os.Stdout if it's unset
+ stdout io.Writer
+ readPrefMode mgo.Mode
+ readPrefTags []bson.D
+}
+
+type notifier struct {
+ notified chan struct{}
+ once sync.Once
+}
+
+func (n *notifier) Notify() { n.once.Do(func() { close(n.notified) }) }
+
+func newNotifier() *notifier { return &notifier{notified: make(chan struct{})} }
+
+// ValidateOptions checks for any incompatible sets of options.
+func (dump *MongoDump) ValidateOptions() error {
+ switch {
+ case dump.OutputOptions.Out == "-" && dump.ToolOptions.Namespace.Collection == "":
+ return fmt.Errorf("can only dump a single collection to stdout")
+ case dump.ToolOptions.Namespace.DB == "" && dump.ToolOptions.Namespace.Collection != "":
+ return fmt.Errorf("cannot dump a collection without a specified database")
+ case dump.InputOptions.Query != "" && dump.ToolOptions.Namespace.Collection == "":
+ return fmt.Errorf("cannot dump using a query without a specified collection")
+ case dump.InputOptions.QueryFile != "" && dump.ToolOptions.Namespace.Collection == "":
+ return fmt.Errorf("cannot dump using a queryFile without a specified collection")
+ case dump.InputOptions.Query != "" && dump.InputOptions.QueryFile != "":
+ return fmt.Errorf("either query or queryFile can be specified as a query option, not both")
+ case dump.InputOptions.Query != "" && dump.InputOptions.TableScan:
+ return fmt.Errorf("cannot use --forceTableScan when specifying --query")
+ case dump.OutputOptions.DumpDBUsersAndRoles && dump.ToolOptions.Namespace.DB == "":
+ return fmt.Errorf("must specify a database when running with dumpDbUsersAndRoles")
+ case dump.OutputOptions.DumpDBUsersAndRoles && dump.ToolOptions.Namespace.Collection != "":
+ return fmt.Errorf("cannot specify a collection when running with dumpDbUsersAndRoles")
+ case dump.OutputOptions.Oplog && dump.ToolOptions.Namespace.DB != "":
+ return fmt.Errorf("--oplog mode only supported on full dumps")
+ case len(dump.OutputOptions.ExcludedCollections) > 0 && dump.ToolOptions.Namespace.Collection != "":
+ return fmt.Errorf("--collection is not allowed when --excludeCollection is specified")
+ case len(dump.OutputOptions.ExcludedCollectionPrefixes) > 0 && dump.ToolOptions.Namespace.Collection != "":
+ return fmt.Errorf("--collection is not allowed when --excludeCollectionsWithPrefix is specified")
+ case len(dump.OutputOptions.ExcludedCollections) > 0 && dump.ToolOptions.Namespace.DB == "":
+ return fmt.Errorf("--db is required when --excludeCollection is specified")
+ case len(dump.OutputOptions.ExcludedCollectionPrefixes) > 0 && dump.ToolOptions.Namespace.DB == "":
+ return fmt.Errorf("--db is required when --excludeCollectionsWithPrefix is specified")
+ case dump.OutputOptions.Repair && dump.InputOptions.Query != "":
+ return fmt.Errorf("cannot run a query with --repair enabled")
+ case dump.OutputOptions.Repair && dump.InputOptions.QueryFile != "":
+ return fmt.Errorf("cannot run a queryFile with --repair enabled")
+ case dump.OutputOptions.Out != "" && dump.OutputOptions.Archive != "":
+ return fmt.Errorf("--out not allowed when --archive is specified")
+ case dump.OutputOptions.Out == "-" && dump.OutputOptions.Gzip:
+ return fmt.Errorf("compression can't be used when dumping a single collection to standard output")
+ case dump.OutputOptions.NumParallelCollections <= 0:
+ return fmt.Errorf("numParallelCollections must be positive")
+ }
+ return nil
+}
+
+// Init performs preliminary setup operations for MongoDump.
+func (dump *MongoDump) Init() error {
+ err := dump.ValidateOptions()
+ if err != nil {
+ return fmt.Errorf("bad option: %v", err)
+ }
+ if dump.stdout == nil {
+ dump.stdout = os.Stdout
+ }
+ dump.sessionProvider, err = db.NewSessionProvider(*dump.ToolOptions)
+ if err != nil {
+ return fmt.Errorf("can't create session: %v", err)
+ }
+
+ // temporarily allow secondary reads for the isMongos check
+ dump.sessionProvider.SetReadPreference(mgo.Nearest)
+ dump.isMongos, err = dump.sessionProvider.IsMongos()
+ if err != nil {
+ return err
+ }
+
+ if dump.isMongos && dump.OutputOptions.Oplog {
+ return fmt.Errorf("can't use --oplog option when dumping from a mongos")
+ }
+
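+ // default read preference: primary when connected to a mongos or a named
+ // replica set, nearest otherwise; an explicit --readPreference overrides this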
+ var mode mgo.Mode
+ if dump.ToolOptions.ReplicaSetName != "" || dump.isMongos {
+ mode = mgo.Primary
+ } else {
+ mode = mgo.Nearest
+ }
+ var tags bson.D
+
+ if dump.InputOptions.ReadPreference != "" {
+ mode, tags, err = db.ParseReadPreference(dump.InputOptions.ReadPreference)
+ if err != nil {
+ return fmt.Errorf("error parsing --readPreference : %v", err)
+ }
+ if len(tags) > 0 {
+ dump.sessionProvider.SetTags(tags)
+ }
+ }
+
+ // warn if we are trying to dump from a secondary in a sharded cluster
+ if dump.isMongos && mode != mgo.Primary {
+ log.Logvf(log.Always, db.WarningNonPrimaryMongosConnection)
+ }
+
+ dump.sessionProvider.SetReadPreference(mode)
+ dump.sessionProvider.SetTags(tags)
+ dump.sessionProvider.SetFlags(db.DisableSocketTimeout)
+
+ // return a helpful error message for mongos --repair
+ if dump.OutputOptions.Repair && dump.isMongos {
+ return fmt.Errorf("--repair flag cannot be used on a mongos")
+ }
+
+ dump.manager = intents.NewIntentManager()
+ dump.progressManager = progress.NewProgressBarManager(log.Writer(0), progressBarWaitTime)
+ return nil
+}
+
+// Dump handles some final options checking and executes MongoDump.
+func (dump *MongoDump) Dump() (err error) {
+ dump.shutdownIntentsNotifier = newNotifier()
+
+ if dump.InputOptions.HasQuery() {
+ // parse JSON then convert extended JSON values
+ var asJSON interface{}
+ content, err := dump.InputOptions.GetQuery()
+ if err != nil {
+ return err
+ }
+ err = json.Unmarshal(content, &asJSON)
+ if err != nil {
+ return fmt.Errorf("error parsing query as json: %v", err)
+ }
+ convertedJSON, err := bsonutil.ConvertJSONValueToBSON(asJSON)
+ if err != nil {
+ return fmt.Errorf("error converting query to bson: %v", err)
+ }
+ asMap, ok := convertedJSON.(map[string]interface{})
+ if !ok {
+ // unlikely to be reached
+ return fmt.Errorf("query is not in proper format")
+ }
+ dump.query = bson.M(asMap)
+ }
+
+ if dump.OutputOptions.DumpDBUsersAndRoles {
+ // first make sure this is possible with the connected database
+ dump.authVersion, err = auth.GetAuthVersion(dump.sessionProvider)
+ if err == nil {
+ err = auth.VerifySystemAuthVersion(dump.sessionProvider)
+ }
+ if err != nil {
+ return fmt.Errorf("error getting auth schema version for dumpDbUsersAndRoles: %v", err)
+ }
+ log.Logvf(log.DebugLow, "using auth schema version %v", dump.authVersion)
+ if dump.authVersion < 3 {
+ return fmt.Errorf("backing up users and roles is only supported for "+
+ "deployments with auth schema versions >= 3, found: %v", dump.authVersion)
+ }
+ }
+
+ if dump.OutputOptions.Archive != "" {
+ // getArchiveOut gives us a WriteCloser to which we should write the archive
+ var archiveOut io.WriteCloser
+ archiveOut, err = dump.getArchiveOut()
+ if err != nil {
+ return err
+ }
+ dump.archive = &archive.Writer{
+ // The archive.Writer needs its own copy of archiveOut because things
+ // like the prelude are not written by the multiplexer.
+ Out: archiveOut,
+ Mux: archive.NewMultiplexer(archiveOut, dump.shutdownIntentsNotifier),
+ }
+ go dump.archive.Mux.Run()
+ defer func() {
+ // The Mux runs until its Control is closed
+ close(dump.archive.Mux.Control)
+ muxErr := <-dump.archive.Mux.Completed
+ archiveOut.Close()
+ if muxErr != nil {
+ if err != nil {
+ err = fmt.Errorf("archive writer: %v / %v", err, muxErr)
+ } else {
+ err = fmt.Errorf("archive writer: %v", muxErr)
+ }
+ log.Logvf(log.DebugLow, "%v", err)
+ } else {
+ log.Logvf(log.DebugLow, "mux completed successfully")
+ }
+ }()
+ }
+
+ // switch on what kind of execution to do
+ switch {
+ case dump.ToolOptions.DB == "" && dump.ToolOptions.Collection == "":
+ err = dump.CreateAllIntents()
+ case dump.ToolOptions.DB != "" && dump.ToolOptions.Collection == "":
+ err = dump.CreateIntentsForDatabase(dump.ToolOptions.DB)
+ case dump.ToolOptions.DB != "" && dump.ToolOptions.Collection != "":
+ err = dump.CreateCollectionIntent(dump.ToolOptions.DB, dump.ToolOptions.Collection)
+ }
+ if err != nil {
+ return err
+ }
+
+ if dump.OutputOptions.Oplog {
+ err = dump.CreateOplogIntents()
+ if err != nil {
+ return err
+ }
+ }
+
+ if dump.OutputOptions.DumpDBUsersAndRoles && dump.ToolOptions.DB != "admin" {
+ err = dump.CreateUsersRolesVersionIntentsForDB(dump.ToolOptions.DB)
+ if err != nil {
+ return err
+ }
+ }
+
+ // verify we can use repair cursors
+ if dump.OutputOptions.Repair {
+ log.Logv(log.DebugLow, "verifying that the connected server supports repairCursor")
+ if dump.isMongos {
+ return fmt.Errorf("cannot use --repair on mongos")
+ }
+ exampleIntent := dump.manager.Peek()
+ if exampleIntent != nil {
+ supported, err := dump.sessionProvider.SupportsRepairCursor(
+ exampleIntent.DB, exampleIntent.C)
+ if !supported {
+ return err // no extra context needed
+ }
+ }
+ }
+
+ // IO Phase I
+ // metadata, users, roles, and versions
+
+ // TODO, either remove this debug or improve the language
+ log.Logvf(log.DebugHigh, "dump phase I: metadata, indexes, users, roles, version")
+
+ err = dump.DumpMetadata()
+ if err != nil {
+ return fmt.Errorf("error dumping metadata: %v", err)
+ }
+
+ if dump.OutputOptions.Archive != "" {
+ session, err := dump.sessionProvider.GetSession()
+ if err != nil {
+ return err
+ }
+ buildInfo, err := session.BuildInfo()
+ var serverVersion string
+ if err != nil {
+ log.Logvf(log.Always, "warning, couldn't get version information from server: %v", err)
+ serverVersion = "unknown"
+ } else {
+ serverVersion = buildInfo.Version
+ }
+ dump.archive.Prelude, err = archive.NewPrelude(dump.manager, dump.OutputOptions.NumParallelCollections, serverVersion)
+ if err != nil {
+ return fmt.Errorf("creating archive prelude: %v", err)
+ }
+ err = dump.archive.Prelude.Write(dump.archive.Out)
+ if err != nil {
+ return fmt.Errorf("error writing metadata into archive: %v", err)
+ }
+ }
+
+ err = dump.DumpSystemIndexes()
+ if err != nil {
+ return fmt.Errorf("error dumping system indexes: %v", err)
+ }
+
+ if dump.ToolOptions.DB == "admin" || dump.ToolOptions.DB == "" {
+ err = dump.DumpUsersAndRoles()
+ if err != nil {
+ return fmt.Errorf("error dumping users and roles: %v", err)
+ }
+ }
+ if dump.OutputOptions.DumpDBUsersAndRoles {
+ log.Logvf(log.Always, "dumping users and roles for %v", dump.ToolOptions.DB)
+ if dump.ToolOptions.DB == "admin" {
+ log.Logvf(log.Always, "skipping users/roles dump, already dumped admin database")
+ } else {
+ err = dump.DumpUsersAndRolesForDB(dump.ToolOptions.DB)
+ if err != nil {
+ return fmt.Errorf("error dumping users and roles for db: %v", err)
+ }
+ }
+ }
+
+ // If oplog capturing is enabled, we first check the most recent
+ // oplog entry and save its timestamp; this lets us later copy all
+ // oplog entries that occurred while dumping, creating what is
+ // effectively a point-in-time snapshot.
+ if dump.OutputOptions.Oplog {
+ err := dump.determineOplogCollectionName()
+ if err != nil {
+ return fmt.Errorf("error finding oplog: %v", err)
+ }
+ log.Logvf(log.Info, "getting most recent oplog timestamp")
+ dump.oplogStart, err = dump.getOplogStartTime()
+ if err != nil {
+ return fmt.Errorf("error getting oplog start: %v", err)
+ }
+ }
+
+ // IO Phase II
+ // regular collections
+
+ // TODO, either remove this debug or improve the language
+ log.Logvf(log.DebugHigh, "dump phase II: regular collections")
+
+ // kick off the progress bar manager and begin dumping intents
+ dump.progressManager.Start()
+ defer dump.progressManager.Stop()
+
+ if err := dump.DumpIntents(); err != nil {
+ return err
+ }
+
+ // IO Phase III
+ // oplog
+
+ // TODO, either remove this debug or improve the language
+ log.Logvf(log.DebugLow, "dump phase III: the oplog")
+
+ // If we are capturing the oplog, we dump all oplog entries that occurred
+ // while dumping the database. Before and after dumping the oplog,
+ // we check to see if the oplog has rolled over (i.e. that the most recent entry
+ // present when we started still exists, so we know we haven't lost data)
+ if dump.OutputOptions.Oplog {
+ log.Logvf(log.DebugLow, "checking if oplog entry %v still exists", dump.oplogStart)
+ exists, err := dump.checkOplogTimestampExists(dump.oplogStart)
+ if !exists {
+ return fmt.Errorf(
+ "oplog overflow: mongodump was unable to capture all new oplog entries during execution")
+ }
+ if err != nil {
+ return fmt.Errorf("unable to check oplog for overflow: %v", err)
+ }
+ log.Logvf(log.DebugHigh, "oplog entry %v still exists", dump.oplogStart)
+
+ log.Logvf(log.Always, "writing captured oplog to %v", dump.manager.Oplog().Location)
+ err = dump.DumpOplogAfterTimestamp(dump.oplogStart)
+ if err != nil {
+ return fmt.Errorf("error dumping oplog: %v", err)
+ }
+
+ // check the oplog for a rollover one last time, to avoid a race condition
+ // wherein the oplog rolls over in the time after our first check, but before
+ // we copy it.
+ log.Logvf(log.DebugLow, "checking again if oplog entry %v still exists", dump.oplogStart)
+ exists, err = dump.checkOplogTimestampExists(dump.oplogStart)
+ if !exists {
+ return fmt.Errorf(
+ "oplog overflow: mongodump was unable to capture all new oplog entries during execution")
+ }
+ if err != nil {
+ return fmt.Errorf("unable to check oplog for overflow: %v", err)
+ }
+ log.Logvf(log.DebugHigh, "oplog entry %v still exists", dump.oplogStart)
+ }
+
+ log.Logvf(log.DebugLow, "finishing dump")
+
+ return err
+}
+
+// DumpIntents iterates through the previously-created intents and
+// dumps all of the found collections.
+func (dump *MongoDump) DumpIntents() error {
+ resultChan := make(chan error)
+
+ jobs := dump.OutputOptions.NumParallelCollections
+ if numIntents := len(dump.manager.Intents()); jobs > numIntents {
+ jobs = numIntents
+ }
+
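+ // with multiple workers, dump the largest collections first so the work is
+ // spread evenly across the workers; with a single worker keep the legacy order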
+ if jobs > 1 {
+ dump.manager.Finalize(intents.LongestTaskFirst)
+ } else {
+ dump.manager.Finalize(intents.Legacy)
+ }
+
+ log.Logvf(log.Info, "dumping up to %v collections in parallel", jobs)
+
+ // start a goroutine for each job thread
+ for i := 0; i < jobs; i++ {
+ go func(id int) {
+ log.Logvf(log.DebugHigh, "starting dump routine with id=%v", id)
+ for {
+ intent := dump.manager.Pop()
+ if intent == nil {
+ log.Logvf(log.DebugHigh, "ending dump routine with id=%v, no more work to do", id)
+ resultChan <- nil
+ return
+ }
+ err := dump.DumpIntent(intent)
+ if err != nil {
+ resultChan <- err
+ return
+ }
+ dump.manager.Finish(intent)
+ }
+ }(i)
+ }
+
+ // wait until all goroutines are done or one of them errors out
+ for i := 0; i < jobs; i++ {
+ if err := <-resultChan; err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// DumpIntent dumps the specified database's collection.
+func (dump *MongoDump) DumpIntent(intent *intents.Intent) error {
+ session, err := dump.sessionProvider.GetSession()
+ if err != nil {
+ return err
+ }
+ defer session.Close()
+ // in mgo, setting prefetch = 1.0 causes the driver to make requests for
+ // more results as soon as results are returned. This effectively
+ // duplicates the behavior of an exhaust cursor.
+ session.SetPrefetch(1.0)
+
+ err = intent.BSONFile.Open()
+ if err != nil {
+ return err
+ }
+ defer intent.BSONFile.Close()
+
+ var findQuery *mgo.Query
+ switch {
+ case len(dump.query) > 0:
+ findQuery = session.DB(intent.DB).C(intent.C).Find(dump.query)
+ case dump.InputOptions.TableScan:
+ // --forceTableScan runs the query without snapshot enabled
+ findQuery = session.DB(intent.DB).C(intent.C).Find(nil)
+ default:
+ findQuery = session.DB(intent.DB).C(intent.C).Find(nil).Snapshot()
+
+ }
+
+ var dumpCount int64
+
+ if dump.OutputOptions.Out == "-" {
+ log.Logvf(log.Always, "writing %v to stdout", intent.Namespace())
+ dumpCount, err = dump.dumpQueryToWriter(findQuery, intent)
+ if err == nil {
+ // on success, print the document count
+ log.Logvf(log.Always, "dumped %v %v", dumpCount, docPlural(dumpCount))
+ }
+ return err
+ }
+
+ // set where the intent will be written to
+ if dump.OutputOptions.Archive != "" {
+ if dump.OutputOptions.Archive == "-" {
+ intent.Location = "archive on stdout"
+ } else {
+ intent.Location = fmt.Sprintf("archive '%v'", dump.OutputOptions.Archive)
+ }
+ }
+
+ if !dump.OutputOptions.Repair {
+ log.Logvf(log.Always, "writing %v to %v", intent.Namespace(), intent.Location)
+ if dumpCount, err = dump.dumpQueryToWriter(findQuery, intent); err != nil {
+ return err
+ }
+ } else {
+ // handle repairs as a special case, since we cannot count them
+ log.Logvf(log.Always, "writing repair of %v to %v", intent.Namespace(), intent.Location)
+ repairIter := session.DB(intent.DB).C(intent.C).Repair()
+ repairCounter := progress.NewCounter(1) // this counter is ignored
+ if err := dump.dumpIterToWriter(repairIter, intent.BSONFile, repairCounter); err != nil {
+ return fmt.Errorf("repair error: %v", err)
+ }
+ _, repairCount := repairCounter.Progress()
+ log.Logvf(log.Always, "\trepair cursor found %v %v in %v",
+ repairCount, docPlural(repairCount), intent.Namespace())
+ }
+
+ log.Logvf(log.Always, "done dumping %v (%v %v)", intent.Namespace(), dumpCount, docPlural(dumpCount))
+ return nil
+}
+
+// dumpQueryToWriter takes an mgo Query and its intent, performs the query,
+// and writes the raw bson results to the intent's BSON file. Returns the final
+// count of documents dumped, and any errors that occurred.
+func (dump *MongoDump) dumpQueryToWriter(
+ query *mgo.Query, intent *intents.Intent) (int64, error) {
+ var total int
+ var err error
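+ // a whole-collection dump is pre-counted so the progress bar has a total;
+ // when a user query is supplied we skip the count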
+ if len(dump.query) == 0 {
+ total, err = query.Count()
+ if err != nil {
+ return int64(0), fmt.Errorf("error reading from db: %v", err)
+ }
+ log.Logvf(log.DebugLow, "counted %v %v in %v", total, docPlural(int64(total)), intent.Namespace())
+ } else {
+ log.Logvf(log.DebugLow, "not counting query on %v", intent.Namespace())
+ }
+
+ dumpProgressor := progress.NewCounter(int64(total))
+ bar := &progress.Bar{
+ Name: intent.Namespace(),
+ Watching: dumpProgressor,
+ BarLength: progressBarLength,
+ }
+ dump.progressManager.Attach(bar)
+ defer dump.progressManager.Detach(bar)
+
+ err = dump.dumpIterToWriter(query.Iter(), intent.BSONFile, dumpProgressor)
+ _, dumpCount := dumpProgressor.Progress()
+
+ return dumpCount, err
+}
+
+// dumpIterToWriter takes an mgo iterator, a writer, and a progress updater,
+// and dumps the iterator's contents to the writer.
+func (dump *MongoDump) dumpIterToWriter(
+ iter *mgo.Iter, writer io.Writer, progressCount progress.Updateable) error {
+ var termErr error
+
+ // We run the result iteration in its own goroutine so that disk I/O
+ // does not block reads from the db, which gives a slight speedup
+ // in benchmarks
+ buffChan := make(chan []byte)
+ go func() {
+ for {
+ select {
+ case <-dump.shutdownIntentsNotifier.notified:
+ log.Logvf(log.DebugHigh, "terminating writes")
+ termErr = util.ErrTerminated
+ close(buffChan)
+ return
+ default:
+ raw := &bson.Raw{}
+ next := iter.Next(raw)
+ if !next {
+ // we check the iterator for errors below
+ close(buffChan)
+ return
+ }
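+ // copy the document out before sending it across the channel, since the
+ // iterator may reuse the underlying buffer on the next call to Next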
+ nextCopy := make([]byte, len(raw.Data))
+ copy(nextCopy, raw.Data)
+ buffChan <- nextCopy
+ }
+ }
+ }()
+
+ // while there are still results in the database,
+ // grab results from the goroutine and write them to the filesystem
+ for {
+ buff, alive := <-buffChan
+ if !alive {
+ if iter.Err() != nil {
+ return fmt.Errorf("error reading collection: %v", iter.Err())
+ }
+ break
+ }
+ _, err := writer.Write(buff)
+ if err != nil {
+ return fmt.Errorf("error writing to file: %v", err)
+ }
+ progressCount.Inc(1)
+ }
+ return termErr
+}
+
+// DumpUsersAndRolesForDB queries and dumps the users and roles tied to the given
+// database. Only works with an authentication schema version >= 3.
+func (dump *MongoDump) DumpUsersAndRolesForDB(db string) error {
+ session, err := dump.sessionProvider.GetSession()
+ if err != nil {
+ return err
+ }
+ defer session.Close()
+
+ dbQuery := bson.M{"db": db}
+ usersQuery := session.DB("admin").C("system.users").Find(dbQuery)
+ intent := dump.manager.Users()
+ err = intent.BSONFile.Open()
+ if err != nil {
+ return fmt.Errorf("error opening output stream for dumping Users: %v", err)
+ }
+ defer intent.BSONFile.Close()
+ _, err = dump.dumpQueryToWriter(usersQuery, intent)
+ if err != nil {
+ return fmt.Errorf("error dumping db users: %v", err)
+ }
+
+ rolesQuery := session.DB("admin").C("system.roles").Find(dbQuery)
+ intent = dump.manager.Roles()
+ err = intent.BSONFile.Open()
+ if err != nil {
+ return fmt.Errorf("error opening output stream for dumping Roles: %v", err)
+ }
+ defer intent.BSONFile.Close()
+ _, err = dump.dumpQueryToWriter(rolesQuery, intent)
+ if err != nil {
+ return fmt.Errorf("error dumping db roles: %v", err)
+ }
+
+ versionQuery := session.DB("admin").C("system.version").Find(nil)
+ intent = dump.manager.AuthVersion()
+ err = intent.BSONFile.Open()
+ if err != nil {
+ return fmt.Errorf("error opening output stream for dumping AuthVersion: %v", err)
+ }
+ defer intent.BSONFile.Close()
+ _, err = dump.dumpQueryToWriter(versionQuery, intent)
+ if err != nil {
+ return fmt.Errorf("error dumping db auth version: %v", err)
+ }
+
+ return nil
+}
+
+// DumpUsersAndRoles dumps all of the users, roles, and auth version documents
+// TODO: This and DumpUsersAndRolesForDB should be merged, correctly
+func (dump *MongoDump) DumpUsersAndRoles() error {
+ var err error
+ if dump.manager.Users() != nil {
+ err = dump.DumpIntent(dump.manager.Users())
+ if err != nil {
+ return err
+ }
+ }
+ if dump.manager.Roles() != nil {
+ err = dump.DumpIntent(dump.manager.Roles())
+ if err != nil {
+ return err
+ }
+ }
+ if dump.manager.AuthVersion() != nil {
+ err = dump.DumpIntent(dump.manager.AuthVersion())
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// DumpSystemIndexes dumps all of the system.indexes collections
+func (dump *MongoDump) DumpSystemIndexes() error {
+ for _, dbName := range dump.manager.SystemIndexDBs() {
+ err := dump.DumpIntent(dump.manager.SystemIndexes(dbName))
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// DumpMetadata dumps the metadata for each intent in the manager
+// that has metadata
+func (dump *MongoDump) DumpMetadata() error {
+ allIntents := dump.manager.Intents()
+ for _, intent := range allIntents {
+ if intent.MetadataFile != nil {
+ err := dump.dumpMetadata(intent)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// nopCloseWriter implements io.WriteCloser. It wraps an io.Writer and adds a no-op Close
+type nopCloseWriter struct {
+ io.Writer
+}
+
+// Close does nothing on nopCloseWriters
+func (*nopCloseWriter) Close() error {
+ return nil
+}
+
+// wrappedWriteCloser implements io.WriteCloser. It wraps two WriteClosers; the Write
+// method is provided by the embedded io.WriteCloser.
+type wrappedWriteCloser struct {
+ io.WriteCloser
+ inner io.WriteCloser
+}
+
+// Close is part of the io.WriteCloser interface. It closes both the embedded
+// io.WriteCloser and the inner io.WriteCloser.
+func (wwc *wrappedWriteCloser) Close() error {
+ err := wwc.WriteCloser.Close()
+ if err != nil {
+ return err
+ }
+ return wwc.inner.Close()
+}
+
+func (dump *MongoDump) getArchiveOut() (out io.WriteCloser, err error) {
+ if dump.OutputOptions.Archive == "-" {
+ out = &nopCloseWriter{dump.stdout}
+ } else {
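+ // if the archive target is an existing directory, write to a default
+ // archive file inside that directory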
+ targetStat, err := os.Stat(dump.OutputOptions.Archive)
+ if err == nil && targetStat.IsDir() {
+ defaultArchiveFilePath :=
+ filepath.Join(dump.OutputOptions.Archive, "archive")
+ if dump.OutputOptions.Gzip {
+ defaultArchiveFilePath = defaultArchiveFilePath + ".gz"
+ }
+ out, err = os.Create(defaultArchiveFilePath)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ out, err = os.Create(dump.OutputOptions.Archive)
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+ if dump.OutputOptions.Gzip {
+ return &wrappedWriteCloser{
+ WriteCloser: gzip.NewWriter(out),
+ inner: out,
+ }, nil
+ }
+ return out, nil
+}
+
+// docPlural returns "document" or "documents" depending on the
+// count of documents passed in.
+func docPlural(count int64) string {
+ return util.Pluralize(int(count), "document", "documents")
+}
+
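+// HandleInterrupt notifies the dump routines that they should shut down;
+// it is safe to call more than once.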
+func (dump *MongoDump) HandleInterrupt() {
+ if dump.shutdownIntentsNotifier != nil {
+ dump.shutdownIntentsNotifier.Notify()
+ }
+}