diff options
author | Wisdom Omuya <deafgoat@gmail.com> | 2014-12-14 23:05:16 -0500 |
---|---|---|
committer | Wisdom Omuya <deafgoat@gmail.com> | 2014-12-15 10:49:34 -0500 |
commit | d8e37bb26698be70eb08ec1a3c9dbcfd83f3c73a (patch) | |
tree | 60922471a056a911b480d1ee0dd380888bb7cd25 /mongoimport | |
parent | 8e191a434883ad6011b0ea194d357e799807d9b0 (diff) | |
download | mongo-d8e37bb26698be70eb08ec1a3c9dbcfd83f3c73a.tar.gz |
TOOLS-496: prevent nil on arrivals
Former-commit-id: bf01ed4ed29ff8b4f3e492ede3db5d15a5d52cc6
Diffstat (limited to 'mongoimport')
-rw-r--r-- | mongoimport/common.go | 22 | ||||
-rw-r--r-- | mongoimport/mongoimport.go | 48 | ||||
-rw-r--r-- | mongoimport/testdata/test_fields_invalid.txt | 3 | ||||
-rw-r--r-- | mongoimport/testdata/test_fields_valid.txt | 3 |
4 files changed, 45 insertions, 31 deletions
diff --git a/mongoimport/common.go b/mongoimport/common.go index 281024a9c1d..71642f24024 100644 --- a/mongoimport/common.go +++ b/mongoimport/common.go @@ -12,6 +12,7 @@ import ( "sort" "strconv" "strings" + "sync" ) // ConvertibleDoc is an interface that adds the basic Convert method. @@ -195,14 +196,13 @@ func setNestedValue(key string, value interface{}, document *bson.D) { // channel in parallel and then sends over the processed data to the outputChan // channel - either in sequence or concurrently (depending on the value of // ordered) - in which the data was received -func streamDocuments(ordered bool, numDecoders int, readDocChan chan ConvertibleDoc, outputChan chan bson.D) error { +func streamDocuments(ordered bool, numDecoders int, readDocChan chan ConvertibleDoc, outputChan chan bson.D) (retErr error) { if numDecoders == 0 { numDecoders = 1 } var importWorkers []*ImportWorker - + wg := &sync.WaitGroup{} importTomb := &tomb.Tomb{} - inChan := readDocChan outChan := outputChan for i := 0; i < numDecoders; i++ { @@ -216,9 +216,15 @@ func streamDocuments(ordered bool, numDecoders int, readDocChan chan Convertible tomb: importTomb, } importWorkers = append(importWorkers, importWorker) - importTomb.Go(func() error { - return importWorker.processDocuments(ordered) - }) + wg.Add(1) + go func() { + defer wg.Done() + // only set the first worker error and cause sibling goroutines to terminate immediately + if err := importWorker.processDocuments(ordered); err != nil && retErr == nil { + retErr = err + importWorker.tomb.Kill(err) + } + }() } // if ordered, we have to coordinate the sequence in which processed @@ -226,9 +232,9 @@ func streamDocuments(ordered bool, numDecoders int, readDocChan chan Convertible if ordered { doSequentialStreaming(importWorkers, readDocChan, outputChan) } - err := importTomb.Wait() + wg.Wait() close(outputChan) - return err + return } // tokensToBSON reads in slice of records - along with ordered fields names - diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 171b248f358..468c3148449 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -49,7 +49,7 @@ type MongoImport struct { // insertionLock is used to prevent race conditions in incrementing // the insertion count - insertionLock *sync.Mutex + insertionLock sync.Mutex // insertionCount keeps track of how many documents have successfully // been inserted into the database @@ -57,7 +57,7 @@ type MongoImport struct { // the tomb is used to synchronize ingestion goroutines and causes // other sibling goroutines to terminate immediately if one errors out - tomb *tomb.Tomb + tomb.Tomb // fields to use for upsert operations upsertFields []string @@ -324,9 +324,6 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp // handle all input reads in a separate goroutine go inputReader.StreamDocument(ordered, readDocChan, readErrChan) - // initialize insertion lock - mongoImport.insertionLock = &sync.Mutex{} - // return immediately on ingest errors - these will be triggered // either by an issue ingesting data or if the read channel is // closed so we can block here while reads happen in a goroutine @@ -338,29 +335,34 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp // IngestDocuments takes a slice of documents and either inserts/upserts them - // based on whether an upsert is requested - into the given collection -func (mongoImport *MongoImport) IngestDocuments(readDocChan chan bson.D) (err error) { - // initialize the tomb where all goroutines go to die - mongoImport.tomb = &tomb.Tomb{} - +func (mongoImport *MongoImport) IngestDocuments(readDocChan chan bson.D) (retErr error) { numInsertionWorkers := mongoImport.ToolOptions.NumInsertionWorkers if numInsertionWorkers <= 0 { numInsertionWorkers = 1 } - // spawn all the worker goroutines, each in its own goroutine + // Each ingest worker will return an error which will + // be set in the following cases: + // + // 1. There is a problem connecting with the server + // 2. The server becomes unreachable + // 3. There is an insertion/update error - e.g. duplicate key + // error - and stopOnError is set to true + + wg := &sync.WaitGroup{} for i := 0; i < numInsertionWorkers; i++ { - mongoImport.tomb.Go(func() error { - // Each ingest worker will return an error which may - // be nil or not. It will be not nil in any of this cases: - // - // 1. There is a problem connecting with the server - // 2. The server becomes unreachable - // 3. There is an insertion/update error - e.g. duplicate key - // error - and stopOnError is set to true - return mongoImport.runInsertionWorker(readDocChan) - }) - } - return mongoImport.tomb.Wait() + wg.Add(1) + go func() { + defer wg.Done() + // only set the first insertion error and cause sibling goroutines to terminate immediately + if err := mongoImport.runInsertionWorker(readDocChan); err != nil && retErr == nil { + retErr = err + mongoImport.Kill(err) + } + }() + } + wg.Wait() + return } // configureSession takes in a session and modifies it with properly configured @@ -432,7 +434,7 @@ readLoop: } numMessageBytes += len(documentBytes) documents = append(documents, bson.Raw{3, documentBytes}) - case <-mongoImport.tomb.Dying(): + case <-mongoImport.Dying(): return nil } } diff --git a/mongoimport/testdata/test_fields_invalid.txt b/mongoimport/testdata/test_fields_invalid.txt new file mode 100644 index 00000000000..90505050d51 --- /dev/null +++ b/mongoimport/testdata/test_fields_invalid.txt @@ -0,0 +1,3 @@ +a +$ +b diff --git a/mongoimport/testdata/test_fields_valid.txt b/mongoimport/testdata/test_fields_valid.txt new file mode 100644 index 00000000000..de980441c3a --- /dev/null +++ b/mongoimport/testdata/test_fields_valid.txt @@ -0,0 +1,3 @@ +a +b +c |