Diffstat (limited to 'mongoimport/mongoimport.go')
-rw-r--r--	mongoimport/mongoimport.go	98
1 file changed, 29 insertions(+), 69 deletions(-)
diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go
index b3fd4b4ed66..c3a854c63d4 100644
--- a/mongoimport/mongoimport.go
+++ b/mongoimport/mongoimport.go
@@ -13,7 +13,6 @@ import (
 	"io"
 	"os"
 	"path/filepath"
-	"runtime"
 	"strings"
 	"sync"
 )
@@ -31,13 +30,6 @@ const (
 	maxMessageSizeBytes = 2 * maxBSONSize
 )
 
-// variables used by the input/ingestion goroutines
-var (
-	numDecodingWorkers  = 1 // will be set to numCPUs at runtime
-	numInsertionWorkers = 1
-	batchSize           = 10000
-)
-
 // Wrapper for MongoImport functionality
 type MongoImport struct {
 	// generic mongo tool options
@@ -95,16 +87,6 @@ type InputReader interface {
 // ValidateSettings ensures that the tool specific options supplied for
 // MongoImport are valid
 func (mongoImport *MongoImport) ValidateSettings(args []string) error {
-	if err := mongoImport.ToolOptions.Validate(); err != nil {
-		return err
-	}
-
-	// TODO: move to common
-	// --dbpath is now deprecated for tools with version >= v2.8
-	if mongoImport.ToolOptions.DBPath != "" {
-		return fmt.Errorf("--dbpath is now deprecated. start a mongod instead")
-	}
-
 	// Namespace must have a valid database if none is specified,
 	// use 'test'
 	if mongoImport.ToolOptions.Namespace.DB == "" {
@@ -151,58 +133,27 @@ func (mongoImport *MongoImport) ValidateSettings(args []string) error {
 		}
 	}
 
-	numCPU := runtime.NumCPU()
-
-	// set the number of operating system threads to use for imports
-	if mongoImport.IngestOptions.NumOSThreads == nil {
-		runtime.GOMAXPROCS(numCPU)
-	} else {
-		if *mongoImport.IngestOptions.NumOSThreads < 1 {
-			return fmt.Errorf("--numOSThreads argument must be > 0")
-		}
-		runtime.GOMAXPROCS(*mongoImport.IngestOptions.NumOSThreads)
-	}
-
 	// set the number of decoding workers to use for imports
-	if mongoImport.IngestOptions.NumDecodingWorkers != nil {
-		if *mongoImport.IngestOptions.NumDecodingWorkers < 1 {
-			return fmt.Errorf("--numDecodingWorkers argument must be > 0")
-		}
-		numDecodingWorkers = *mongoImport.IngestOptions.NumDecodingWorkers
-	} else {
-		mongoImport.IngestOptions.NumDecodingWorkers = &numCPU
-		numDecodingWorkers = numCPU
+	if mongoImport.ToolOptions.NumDecodingWorkers <= 0 {
+		mongoImport.ToolOptions.NumDecodingWorkers = mongoImport.ToolOptions.MaxProcs
 	}
+	log.Logf(log.DebugLow, "Using %v decoding workers", mongoImport.ToolOptions.NumDecodingWorkers)
 
 	// set the number of insertion workers to use for imports
-	if mongoImport.IngestOptions.NumInsertionWorkers != nil {
-		if *mongoImport.IngestOptions.NumInsertionWorkers < 1 {
-			return fmt.Errorf("--numInsertionThreads argument must be > 0")
-		}
-		numInsertionWorkers = *mongoImport.IngestOptions.NumInsertionWorkers
-	} else {
-		mongoImport.IngestOptions.NumInsertionWorkers = &numInsertionWorkers
+	if mongoImport.ToolOptions.NumInsertionWorkers <= 0 {
+		mongoImport.ToolOptions.NumInsertionWorkers = 1
 	}
-
-	// if maintain --maintainInsertionOrder is true, we can only have one
-	// insertion worker
+	log.Logf(log.DebugLow, "Using %v insert workers", mongoImport.ToolOptions.NumInsertionWorkers)
+
+	// if --maintainInsertionOrder is set, we can only allow 1 insertion worker
 	if mongoImport.IngestOptions.MaintainInsertionOrder {
-		if numInsertionWorkers > 1 {
-			return fmt.Errorf("cannot specify --maintainInsertionOrder with more than 1 insertionWorker")
-		}
-		mongoImport.IngestOptions.NumInsertionWorkers = &numInsertionWorkers
+		mongoImport.ToolOptions.NumInsertionWorkers = 1
 	}
 
 	// get the number of documents per batch
-	if mongoImport.IngestOptions.BatchSize != nil {
-		if *mongoImport.IngestOptions.BatchSize < 1 {
-			return fmt.Errorf("--batchSize argument must be > 0")
-		}
-		batchSize = *mongoImport.IngestOptions.BatchSize
-	} else {
-		// TODO: TOOLS-335 replace use of global variables - batch size,
-		// numInsertionWorkers and numDecodingWorkers
-		mongoImport.IngestOptions.BatchSize = &batchSize
+	if mongoImport.ToolOptions.BulkBufferSize <= 0 {
+		mongoImport.ToolOptions.BulkBufferSize = 10000
 	}
 
 	// ensure no more than one positional argument is supplied
@@ -350,7 +301,10 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp
 	ordered := mongoImport.IngestOptions.MaintainInsertionOrder
 
 	// set the batch size for ingestion
-	readDocChanSize := batchSize * numDecodingWorkers
+	readDocChanSize := mongoImport.ToolOptions.BulkBufferSize * mongoImport.ToolOptions.NumDecodingWorkers
+	if readDocChanSize == 0 {
+		readDocChanSize = 1
+	}
 
 	// readDocChan is buffered with readDocChanSize to ensure we only block
 	// accepting reads if processing is slow
@@ -380,19 +334,22 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp
 // IngestDocuments takes a slice of documents and either inserts/upserts them -
 // based on whether an upsert is requested - into the given collection
 func (mongoImport *MongoImport) IngestDocuments(readChan chan bson.D) (err error) {
-	numDecodingWorkers := *mongoImport.IngestOptions.NumInsertionWorkers
-
 	// initialize the tomb where all goroutines go to die
 	mongoImport.tomb = &tomb.Tomb{}
 
+	numInsertionWorkers := mongoImport.ToolOptions.NumInsertionWorkers
+	if numInsertionWorkers <= 0 {
+		numInsertionWorkers = 1
+	}
+
 	// spawn all the worker goroutines, each in its own goroutine
-	for i := 0; i < numDecodingWorkers; i++ {
+	for i := 0; i < numInsertionWorkers; i++ {
 		mongoImport.tomb.Go(func() error {
 			// Each ingest worker will return an error which may
 			// be nil or not. It will be not nil in any of this cases:
 			//
 			// 1. There is a problem connecting with the server
-			// 2. There server becomes unreachable
+			// 2. The server becomes unreachable
 			// 3. There is an insertion/update error - e.g. duplicate key
 			//    error - and stopOnError is set to true
 			return mongoImport.ingestDocs(readChan)
@@ -450,7 +407,7 @@ readLoop:
 		// limit so we self impose a limit by using maxMessageSizeBytes
 		// and send documents over the wire when we hit the batch size
 		// or when we're at/over the maximum message size threshold
-		if len(documents) == batchSize || numMessageBytes >= maxMessageSizeBytes {
+		if len(documents) == mongoImport.ToolOptions.BulkBufferSize || numMessageBytes >= maxMessageSizeBytes {
 			if err = mongoImport.ingester(documents, collection); err != nil {
 				return err
 			}
@@ -528,6 +485,9 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C
 		numInserted, err = mongoImport.handleUpsert(documents, collection)
 		return err
 	} else {
+		if len(documents) == 0 {
+			return
+		}
 		bulk := collection.Bulk()
 		for _, document := range documents {
 			bulk.Insert(document)
@@ -569,9 +529,9 @@ func (mongoImport *MongoImport) getInputReader(in io.Reader) (InputReader, error
 		}
 	}
 	if mongoImport.InputOptions.Type == CSV {
-		return NewCSVInputReader(fields, in), nil
+		return NewCSVInputReader(fields, in, mongoImport.ToolOptions.NumDecodingWorkers), nil
 	} else if mongoImport.InputOptions.Type == TSV {
-		return NewTSVInputReader(fields, in), nil
+		return NewTSVInputReader(fields, in, mongoImport.ToolOptions.NumDecodingWorkers), nil
 	}
-	return NewJSONInputReader(mongoImport.InputOptions.JSONArray, in), nil
+	return NewJSONInputReader(mongoImport.InputOptions.JSONArray, in, mongoImport.ToolOptions.NumDecodingWorkers), nil
 }
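
The rewritten IngestDocuments spawns its insertion workers under a tomb (gopkg.in/tomb.v2), so that every worker's termination is tracked and the first non-nil error surfaces from Wait. A minimal standalone sketch of that pattern, with illustrative names (jobs, process) that are not part of the tool:

package main

import (
	"fmt"

	"gopkg.in/tomb.v2"
)

// process stands in for the per-document work an ingest worker would do.
func process(doc int) error {
	if doc < 0 {
		return fmt.Errorf("bad document: %d", doc)
	}
	return nil
}

func main() {
	var t tomb.Tomb
	// jobs plays the role of readChan; buffered so producers only
	// block on send once consumers fall behind.
	jobs := make(chan int, 16)

	// Spawn all workers first; they block in range until the channel
	// is fed and closed, so every t.Go call happens while the tomb is
	// still alive.
	numInsertionWorkers := 4 // assumed value; the tool defaults this to 1
	for i := 0; i < numInsertionWorkers; i++ {
		t.Go(func() error {
			// Each worker drains the shared channel; a non-nil
			// return marks the tomb dying and becomes the result
			// of Wait.
			for doc := range jobs {
				if err := process(doc); err != nil {
					return err
				}
			}
			return nil
		})
	}

	// Feed the channel and close it so the workers can finish.
	for i := 0; i < 100; i++ {
		jobs <- i
	}
	close(jobs)

	// Wait blocks until all tracked goroutines return and yields the
	// first error, which is how ingestDocs failures reach the caller.
	if err := t.Wait(); err != nil {
		fmt.Println("import failed:", err)
	}
}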
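The flush condition changed in ingestDocs sends a batch when it reaches the configured document count (BulkBufferSize) or the self-imposed maxMessageSizeBytes cap. A hedged sketch of that two-limit batching, with assumed names (flush, maxBatchDocs, maxBatchBytes) standing in for the tool's own:

package main

import "fmt"

const (
	maxBatchDocs  = 3   // stand-in for BulkBufferSize (10000 in the diff)
	maxBatchBytes = 256 // stand-in for maxMessageSizeBytes
)

// flush stands in for ingester: one bulk send per call.
func flush(batch [][]byte) {
	if len(batch) == 0 {
		return // same guard the diff adds to ingester
	}
	fmt.Printf("flushing %d documents\n", len(batch))
}

func main() {
	// docs plays the role of the decoded-document stream.
	docs := [][]byte{
		[]byte("doc-1"), []byte("doc-2"), []byte("doc-3"),
		[]byte("doc-4"), []byte("doc-5"),
	}

	var batch [][]byte
	numBytes := 0
	for _, d := range docs {
		batch = append(batch, d)
		numBytes += len(d)
		// Send when either cap is hit: document count or the
		// self-imposed message-size limit.
		if len(batch) == maxBatchDocs || numBytes >= maxBatchBytes {
			flush(batch)
			batch, numBytes = nil, 0
		}
	}
	flush(batch) // final partial batch
}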