Diffstat (limited to 'mongoimport/mongoimport.go')
-rw-r--r--  mongoimport/mongoimport.go | 98
1 file changed, 29 insertions(+), 69 deletions(-)
diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go
index b3fd4b4ed66..c3a854c63d4 100644
--- a/mongoimport/mongoimport.go
+++ b/mongoimport/mongoimport.go
@@ -13,7 +13,6 @@ import (
"io"
"os"
"path/filepath"
- "runtime"
"strings"
"sync"
)
@@ -31,13 +30,6 @@ const (
maxMessageSizeBytes = 2 * maxBSONSize
)
-// variables used by the input/ingestion goroutines
-var (
- numDecodingWorkers = 1 // will be set to numCPUs at runtime
- numInsertionWorkers = 1
- batchSize = 10000
-)
-
// Wrapper for MongoImport functionality
type MongoImport struct {
// generic mongo tool options
@@ -95,16 +87,6 @@ type InputReader interface {
// ValidateSettings ensures that the tool specific options supplied for
// MongoImport are valid
func (mongoImport *MongoImport) ValidateSettings(args []string) error {
- if err := mongoImport.ToolOptions.Validate(); err != nil {
- return err
- }
-
- // TODO: move to common
- // --dbpath is now deprecated for tools with version >= v2.8
- if mongoImport.ToolOptions.DBPath != "" {
- return fmt.Errorf("--dbpath is now deprecated. start a mongod instead")
- }
-
// Namespace must have a valid database; if none is specified,
// use 'test'
if mongoImport.ToolOptions.Namespace.DB == "" {
@@ -151,58 +133,27 @@ func (mongoImport *MongoImport) ValidateSettings(args []string) error {
}
}
- numCPU := runtime.NumCPU()
-
- // set the number of operating system threads to use for imports
- if mongoImport.IngestOptions.NumOSThreads == nil {
- runtime.GOMAXPROCS(numCPU)
- } else {
- if *mongoImport.IngestOptions.NumOSThreads < 1 {
- return fmt.Errorf("--numOSThreads argument must be > 0")
- }
- runtime.GOMAXPROCS(*mongoImport.IngestOptions.NumOSThreads)
- }
-
// set the number of decoding workers to use for imports
- if mongoImport.IngestOptions.NumDecodingWorkers != nil {
- if *mongoImport.IngestOptions.NumDecodingWorkers < 1 {
- return fmt.Errorf("--numDecodingWorkers argument must be > 0")
- }
- numDecodingWorkers = *mongoImport.IngestOptions.NumDecodingWorkers
- } else {
- mongoImport.IngestOptions.NumDecodingWorkers = &numCPU
- numDecodingWorkers = numCPU
+ if mongoImport.ToolOptions.NumDecodingWorkers <= 0 {
+ mongoImport.ToolOptions.NumDecodingWorkers = mongoImport.ToolOptions.MaxProcs
}
+ log.Logf(log.DebugLow, "Using %v decoding workers", mongoImport.ToolOptions.NumDecodingWorkers)
// set the number of insertion workers to use for imports
- if mongoImport.IngestOptions.NumInsertionWorkers != nil {
- if *mongoImport.IngestOptions.NumInsertionWorkers < 1 {
- return fmt.Errorf("--numInsertionThreads argument must be > 0")
- }
- numInsertionWorkers = *mongoImport.IngestOptions.NumInsertionWorkers
- } else {
- mongoImport.IngestOptions.NumInsertionWorkers = &numInsertionWorkers
+ if mongoImport.ToolOptions.NumInsertionWorkers <= 0 {
+ mongoImport.ToolOptions.NumInsertionWorkers = 1
}
- // if maintain --maintainInsertionOrder is true, we can only have one
- // insertion worker
+ log.Logf(log.DebugLow, "Using %v insert workers", mongoImport.ToolOptions.NumInsertionWorkers)
+
+ // if --maintainInsertionOrder is set, we can only allow 1 insertion worker
if mongoImport.IngestOptions.MaintainInsertionOrder {
- if numInsertionWorkers > 1 {
- return fmt.Errorf("cannot specify --maintainInsertionOrder with more than 1 insertionWorker")
- }
- mongoImport.IngestOptions.NumInsertionWorkers = &numInsertionWorkers
+ mongoImport.ToolOptions.NumInsertionWorkers = 1
}
// get the number of documents per batch
- if mongoImport.IngestOptions.BatchSize != nil {
- if *mongoImport.IngestOptions.BatchSize < 1 {
- return fmt.Errorf("--batchSize argument must be > 0")
- }
- batchSize = *mongoImport.IngestOptions.BatchSize
- } else {
- // TODO: TOOLS-335 replace use of global variables - batch size,
- // numInsertionWorkers and numDecodingWorkers
- mongoImport.IngestOptions.BatchSize = &batchSize
+ if mongoImport.ToolOptions.BulkBufferSize <= 0 {
+ mongoImport.ToolOptions.BulkBufferSize = 10000
}
// ensure no more than one positional argument is supplied
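
The hunk above replaces package-level globals and pointer-valued options with plain int fields that are defaulted in place. A minimal runnable sketch of that defaulting pattern, using a hypothetical ingestSettings struct (runtime.NumCPU() stands in for ToolOptions.MaxProcs):

package main

import (
	"fmt"
	"runtime"
)

type ingestSettings struct {
	NumDecodingWorkers     int
	NumInsertionWorkers    int
	BulkBufferSize         int
	MaintainInsertionOrder bool
}

// applyDefaults mirrors the new ValidateSettings logic: unset (<= 0)
// values fall back to defaults instead of being validation errors.
func (s *ingestSettings) applyDefaults() {
	if s.NumDecodingWorkers <= 0 {
		s.NumDecodingWorkers = runtime.NumCPU() // stand-in for ToolOptions.MaxProcs
	}
	if s.NumInsertionWorkers <= 0 {
		s.NumInsertionWorkers = 1
	}
	// ordered inserts cannot be parallelized, so force a single worker
	if s.MaintainInsertionOrder {
		s.NumInsertionWorkers = 1
	}
	if s.BulkBufferSize <= 0 {
		s.BulkBufferSize = 10000
	}
}

func main() {
	s := ingestSettings{MaintainInsertionOrder: true, NumInsertionWorkers: 8}
	s.applyDefaults()
	fmt.Printf("%+v\n", s) // NumInsertionWorkers is forced back to 1
}

Note the behavior change: where the old code rejected --maintainInsertionOrder combined with multiple insertion workers, the new code silently clamps the worker count to 1.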
@@ -350,7 +301,10 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp
ordered := mongoImport.IngestOptions.MaintainInsertionOrder
// set the batch size for ingestion
- readDocChanSize := batchSize * numDecodingWorkers
+ readDocChanSize := mongoImport.ToolOptions.BulkBufferSize * mongoImport.ToolOptions.NumDecodingWorkers
+ if readDocChanSize == 0 {
+ readDocChanSize = 1
+ }
// readDocChan is buffered with readDocChanSize to ensure we only block
// accepting reads if processing is slow
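
The sizing above keeps readers ahead of ingestion: the buffer holds one full batch per decoding worker, so the producer blocks only when every consumer is busy. An illustrative sketch with made-up numbers:

package main

import "fmt"

func main() {
	bulkBufferSize, numDecodingWorkers := 4, 2
	readDocChanSize := bulkBufferSize * numDecodingWorkers
	if readDocChanSize == 0 { // same fallback as the diff
		readDocChanSize = 1
	}
	readDocChan := make(chan int, readDocChanSize)
	go func() {
		for i := 0; i < 20; i++ {
			readDocChan <- i // blocks only once the buffer fills
		}
		close(readDocChan)
	}()
	for doc := range readDocChan {
		fmt.Println("ingesting doc", doc)
	}
}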
@@ -380,19 +334,22 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp
// IngestDocuments takes a slice of documents and either inserts/upserts them -
// based on whether an upsert is requested - into the given collection
func (mongoImport *MongoImport) IngestDocuments(readChan chan bson.D) (err error) {
- numDecodingWorkers := *mongoImport.IngestOptions.NumInsertionWorkers
-
// initialize the tomb where all goroutines go to die
mongoImport.tomb = &tomb.Tomb{}
+ numInsertionWorkers := mongoImport.ToolOptions.NumInsertionWorkers
+ if numInsertionWorkers <= 0 {
+ numInsertionWorkers = 1
+ }
+
// spawn the insertion workers, each in its own goroutine
- for i := 0; i < numDecodingWorkers; i++ {
+ for i := 0; i < numInsertionWorkers; i++ {
mongoImport.tomb.Go(func() error {
// Each ingest worker will return an error which may
// be nil or not. It will be non-nil in any of these cases:
//
// 1. There is a problem connecting with the server
- // 2. There server becomes unreachable
+ // 2. The server becomes unreachable
// 3. There is an insertion/update error - e.g. duplicate key
// error - and stopOnError is set to true
return mongoImport.ingestDocs(readChan)
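
All workers run under a single tomb, so the first worker to return a non-nil error kills the whole group. The tool vendors its own tomb package; the sketch below assumes the gopkg.in/tomb.v2 API, which has the same Go/Wait shape:

package main

import (
	"fmt"

	"gopkg.in/tomb.v2"
)

func main() {
	readChan := make(chan int, 8)
	for i := 0; i < 8; i++ {
		readChan <- i
	}
	close(readChan)

	var t tomb.Tomb
	numInsertionWorkers := 3
	for i := 0; i < numInsertionWorkers; i++ {
		t.Go(func() error {
			// stand-in for mongoImport.ingestDocs(readChan)
			for doc := range readChan {
				fmt.Println("insert", doc)
			}
			return nil // a non-nil error here kills the whole tomb
		})
	}
	if err := t.Wait(); err != nil {
		fmt.Println("ingestion failed:", err)
	}
}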
@@ -450,7 +407,7 @@ readLoop:
// limit so we self impose a limit by using maxMessageSizeBytes
// and send documents over the wire when we hit the batch size
// or when we're at/over the maximum message size threshold
- if len(documents) == batchSize || numMessageBytes >= maxMessageSizeBytes {
+ if len(documents) == mongoImport.ToolOptions.BulkBufferSize || numMessageBytes >= maxMessageSizeBytes {
if err = mongoImport.ingester(documents, collection); err != nil {
return err
}
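
The flush condition above caps a batch on two thresholds: document count (BulkBufferSize) and accumulated bytes (maxMessageSizeBytes). A standalone sketch of the same double-threshold loop; the 16 MB BSON limit and the flush helper are stand-ins:

package main

import "fmt"

const maxBSONSize = 16 * 1024 * 1024        // assumed 16 MB BSON document limit
const maxMessageSizeBytes = 2 * maxBSONSize // mirrors the const in this file

// flush is a hypothetical stand-in for mongoImport.ingester.
func flush(docs [][]byte) { fmt.Println("flushing", len(docs), "documents") }

func main() {
	bulkBufferSize := 3
	var documents [][]byte
	numMessageBytes := 0
	for i := 0; i < 10; i++ {
		doc := make([]byte, 100) // pretend this is one BSON document
		documents = append(documents, doc)
		numMessageBytes += len(doc)
		// flush on either threshold: document count or accumulated bytes
		if len(documents) == bulkBufferSize || numMessageBytes >= maxMessageSizeBytes {
			flush(documents)
			documents, numMessageBytes = nil, 0
		}
	}
	if len(documents) > 0 { // final partial batch
		flush(documents)
	}
}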
@@ -528,6 +485,9 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C
numInserted, err = mongoImport.handleUpsert(documents, collection)
return err
} else {
+ if len(documents) == 0 {
+ return
+ }
bulk := collection.Bulk()
for _, document := range documents {
bulk.Insert(document)
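
The new guard returns early on an empty slice rather than issuing a zero-operation bulk run. A sketch of the guarded insert path, assuming the gopkg.in/mgo.v2 import path for the mgo driver the signature already references:

package ingest

import (
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// bulkInsert mirrors the guarded branch above: skip empty batches,
// otherwise queue every document on one Bulk and run it once.
func bulkInsert(collection *mgo.Collection, documents []bson.Raw) error {
	if len(documents) == 0 {
		return nil // nothing to send; same early-out as the new guard
	}
	bulk := collection.Bulk()
	for _, document := range documents {
		bulk.Insert(document)
	}
	_, err := bulk.Run()
	return err
}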
@@ -569,9 +529,9 @@ func (mongoImport *MongoImport) getInputReader(in io.Reader) (InputReader, error
}
}
if mongoImport.InputOptions.Type == CSV {
- return NewCSVInputReader(fields, in), nil
+ return NewCSVInputReader(fields, in, mongoImport.ToolOptions.NumDecodingWorkers), nil
} else if mongoImport.InputOptions.Type == TSV {
- return NewTSVInputReader(fields, in), nil
+ return NewTSVInputReader(fields, in, mongoImport.ToolOptions.NumDecodingWorkers), nil
}
- return NewJSONInputReader(mongoImport.InputOptions.JSONArray, in), nil
+ return NewJSONInputReader(mongoImport.InputOptions.JSONArray, in, mongoImport.ToolOptions.NumDecodingWorkers), nil
}
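
The reader constructors now take the decoding worker count as an explicit parameter instead of reading a package-level global. The real readers live elsewhere in this package; the skeleton below only illustrates the hypothetical constructor shape:

package main

import (
	"fmt"
	"io"
	"strings"
)

// CSVInputReader here is an illustrative skeleton, not the real type;
// the point is the extra numDecodingWorkers constructor argument.
type CSVInputReader struct {
	fields             []string
	in                 io.Reader
	numDecodingWorkers int
}

func NewCSVInputReader(fields []string, in io.Reader, numDecodingWorkers int) *CSVInputReader {
	return &CSVInputReader{fields: fields, in: in, numDecodingWorkers: numDecodingWorkers}
}

func main() {
	r := NewCSVInputReader([]string{"a", "b"}, strings.NewReader("1,2\n"), 4)
	fmt.Println("decoding workers:", r.numDecodingWorkers)
}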