summaryrefslogtreecommitdiff
path: root/mongoimport
diff options
context:
space:
mode:
authorWisdom Omuya <deafgoat@gmail.com>2014-12-14 23:05:16 -0500
committerWisdom Omuya <deafgoat@gmail.com>2014-12-15 10:49:34 -0500
commitd8e37bb26698be70eb08ec1a3c9dbcfd83f3c73a (patch)
tree60922471a056a911b480d1ee0dd380888bb7cd25 /mongoimport
parent8e191a434883ad6011b0ea194d357e799807d9b0 (diff)
downloadmongo-d8e37bb26698be70eb08ec1a3c9dbcfd83f3c73a.tar.gz
TOOLS-496: prevent nil on arrivals
Former-commit-id: bf01ed4ed29ff8b4f3e492ede3db5d15a5d52cc6
Diffstat (limited to 'mongoimport')
-rw-r--r--mongoimport/common.go22
-rw-r--r--mongoimport/mongoimport.go48
-rw-r--r--mongoimport/testdata/test_fields_invalid.txt3
-rw-r--r--mongoimport/testdata/test_fields_valid.txt3
4 files changed, 45 insertions, 31 deletions
diff --git a/mongoimport/common.go b/mongoimport/common.go
index 281024a9c1d..71642f24024 100644
--- a/mongoimport/common.go
+++ b/mongoimport/common.go
@@ -12,6 +12,7 @@ import (
"sort"
"strconv"
"strings"
+ "sync"
)
// ConvertibleDoc is an interface that adds the basic Convert method.
@@ -195,14 +196,13 @@ func setNestedValue(key string, value interface{}, document *bson.D) {
// channel in parallel and then sends over the processed data to the outputChan
// channel - either in sequence or concurrently (depending on the value of
// ordered) - in which the data was received
-func streamDocuments(ordered bool, numDecoders int, readDocChan chan ConvertibleDoc, outputChan chan bson.D) error {
+func streamDocuments(ordered bool, numDecoders int, readDocChan chan ConvertibleDoc, outputChan chan bson.D) (retErr error) {
if numDecoders == 0 {
numDecoders = 1
}
var importWorkers []*ImportWorker
-
+ wg := &sync.WaitGroup{}
importTomb := &tomb.Tomb{}
-
inChan := readDocChan
outChan := outputChan
for i := 0; i < numDecoders; i++ {
@@ -216,9 +216,15 @@ func streamDocuments(ordered bool, numDecoders int, readDocChan chan Convertible
tomb: importTomb,
}
importWorkers = append(importWorkers, importWorker)
- importTomb.Go(func() error {
- return importWorker.processDocuments(ordered)
- })
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ // only set the first worker error and cause sibling goroutines to terminate immediately
+ if err := importWorker.processDocuments(ordered); err != nil && retErr == nil {
+ retErr = err
+ importWorker.tomb.Kill(err)
+ }
+ }()
}
// if ordered, we have to coordinate the sequence in which processed
@@ -226,9 +232,9 @@ func streamDocuments(ordered bool, numDecoders int, readDocChan chan Convertible
if ordered {
doSequentialStreaming(importWorkers, readDocChan, outputChan)
}
- err := importTomb.Wait()
+ wg.Wait()
close(outputChan)
- return err
+ return
}
// tokensToBSON reads in slice of records - along with ordered fields names -
diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go
index 171b248f358..468c3148449 100644
--- a/mongoimport/mongoimport.go
+++ b/mongoimport/mongoimport.go
@@ -49,7 +49,7 @@ type MongoImport struct {
// insertionLock is used to prevent race conditions in incrementing
// the insertion count
- insertionLock *sync.Mutex
+ insertionLock sync.Mutex
// insertionCount keeps track of how many documents have successfully
// been inserted into the database
@@ -57,7 +57,7 @@ type MongoImport struct {
// the tomb is used to synchronize ingestion goroutines and causes
// other sibling goroutines to terminate immediately if one errors out
- tomb *tomb.Tomb
+ tomb.Tomb
// fields to use for upsert operations
upsertFields []string
@@ -324,9 +324,6 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp
// handle all input reads in a separate goroutine
go inputReader.StreamDocument(ordered, readDocChan, readErrChan)
- // initialize insertion lock
- mongoImport.insertionLock = &sync.Mutex{}
-
// return immediately on ingest errors - these will be triggered
// either by an issue ingesting data or if the read channel is
// closed so we can block here while reads happen in a goroutine
@@ -338,29 +335,34 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp
// IngestDocuments takes a slice of documents and either inserts/upserts them -
// based on whether an upsert is requested - into the given collection
-func (mongoImport *MongoImport) IngestDocuments(readDocChan chan bson.D) (err error) {
- // initialize the tomb where all goroutines go to die
- mongoImport.tomb = &tomb.Tomb{}
-
+func (mongoImport *MongoImport) IngestDocuments(readDocChan chan bson.D) (retErr error) {
numInsertionWorkers := mongoImport.ToolOptions.NumInsertionWorkers
if numInsertionWorkers <= 0 {
numInsertionWorkers = 1
}
- // spawn all the worker goroutines, each in its own goroutine
+ // Each ingest worker will return an error which will
+ // be set in the following cases:
+ //
+ // 1. There is a problem connecting with the server
+ // 2. The server becomes unreachable
+ // 3. There is an insertion/update error - e.g. duplicate key
+ // error - and stopOnError is set to true
+
+ wg := &sync.WaitGroup{}
for i := 0; i < numInsertionWorkers; i++ {
- mongoImport.tomb.Go(func() error {
- // Each ingest worker will return an error which may
- // be nil or not. It will be not nil in any of this cases:
- //
- // 1. There is a problem connecting with the server
- // 2. The server becomes unreachable
- // 3. There is an insertion/update error - e.g. duplicate key
- // error - and stopOnError is set to true
- return mongoImport.runInsertionWorker(readDocChan)
- })
- }
- return mongoImport.tomb.Wait()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ // only set the first insertion error and cause sibling goroutines to terminate immediately
+ if err := mongoImport.runInsertionWorker(readDocChan); err != nil && retErr == nil {
+ retErr = err
+ mongoImport.Kill(err)
+ }
+ }()
+ }
+ wg.Wait()
+ return
}
// configureSession takes in a session and modifies it with properly configured
@@ -432,7 +434,7 @@ readLoop:
}
numMessageBytes += len(documentBytes)
documents = append(documents, bson.Raw{3, documentBytes})
- case <-mongoImport.tomb.Dying():
+ case <-mongoImport.Dying():
return nil
}
}
diff --git a/mongoimport/testdata/test_fields_invalid.txt b/mongoimport/testdata/test_fields_invalid.txt
new file mode 100644
index 00000000000..90505050d51
--- /dev/null
+++ b/mongoimport/testdata/test_fields_invalid.txt
@@ -0,0 +1,3 @@
+a
+$
+b
diff --git a/mongoimport/testdata/test_fields_valid.txt b/mongoimport/testdata/test_fields_valid.txt
new file mode 100644
index 00000000000..de980441c3a
--- /dev/null
+++ b/mongoimport/testdata/test_fields_valid.txt
@@ -0,0 +1,3 @@
+a
+b
+c