author     Wisdom Omuya <deafgoat@gmail.com>  2014-11-04 10:23:07 -0500
committer  Wisdom Omuya <deafgoat@gmail.com>  2014-11-04 10:23:07 -0500
commit     5132aab1afb99b285b541deded2de41bc3997dab (patch)
tree       65a692eb56db345a42693fb007f125c4c44ebc92
parent     c15274fe76119180509b10ce036205015201a70c (diff)
parent     1817796fde68563a4c327d94c198a8a585e4bf7f (diff)
TOOLS-325: fix failing tests
Former-commit-id: d96f50230c53bd9b9ae76c43dc1735c0e6bf5b24
-rw-r--r--  common/db/command.go              17
-rw-r--r--  mongoimport/common.go            218
-rw-r--r--  mongoimport/common_test.go       105
-rw-r--r--  mongoimport/mongoimport.go       197
-rw-r--r--  mongoimport/mongoimport_test.go   74
5 files changed, 289 insertions, 322 deletions
diff --git a/common/db/command.go b/common/db/command.go
index 2d4c60a0c76..eed3dda2a60 100644
--- a/common/db/command.go
+++ b/common/db/command.go
@@ -33,7 +33,6 @@ type CommandRunner interface {
Remove(DB, Collection string, Query interface{}) error
DatabaseNames() ([]string, error)
CollectionNames(dbName string) ([]string, error)
- SupportsWriteCommands() (bool, error)
}
func (sp *SessionProvider) Remove(DB, Collection string, Query interface{}) error {
@@ -84,6 +83,22 @@ func (sp *SessionProvider) FindDocs(DB, Collection string, Skip, Limit int, Quer
return &CursorDocSource{q.Iter(), session}, nil
}
+func (sp *SessionProvider) IsReplicaSet() (bool, error) {
+ session, err := sp.GetSession()
+ if err != nil {
+ return false, err
+ }
+ defer session.Close()
+ masterDoc := bson.M{}
+ err = session.Run("isMaster", &masterDoc)
+ if err != nil {
+ return false, err
+ }
+ _, hasSetName := masterDoc["setName"]
+ _, hasHosts := masterDoc["hosts"]
+ return hasSetName || hasHosts, nil
+}
+
func (sp *SessionProvider) SupportsWriteCommands() (bool, error) {
session, err := sp.GetSession()
if err != nil {
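As a standalone illustration of the detection logic above: the new helper just runs isMaster and checks for replica-set-only fields. A minimal sketch, assuming a mongod listening on localhost:27017 (this program is not part of the patch; the mgo calls mirror IsReplicaSet):

package main

import (
	"fmt"
	"log"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	// assumption: a mongod is listening on localhost:27017
	session, err := mgo.Dial("localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// a standalone mongod answers isMaster without "setName" or "hosts";
	// a replica set member reports one or both
	masterDoc := bson.M{}
	if err := session.Run("isMaster", &masterDoc); err != nil {
		log.Fatal(err)
	}
	_, hasSetName := masterDoc["setName"]
	_, hasHosts := masterDoc["hosts"]
	fmt.Println("is replica set:", hasSetName || hasHosts)
}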
diff --git a/mongoimport/common.go b/mongoimport/common.go
index d48a7f0067c..f1114910ed7 100644
--- a/mongoimport/common.go
+++ b/mongoimport/common.go
@@ -1,6 +1,7 @@
package mongoimport
import (
+ "errors"
"fmt"
"github.com/mongodb/mongo-tools/common/bsonutil"
"github.com/mongodb/mongo-tools/common/db"
@@ -8,12 +9,20 @@ import (
"github.com/mongodb/mongo-tools/common/util"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
+ "io"
"sort"
"strconv"
"strings"
"sync"
)
+var (
+ errLostConnection = errors.New("lost connection to server")
+ errNoReachableServer = errors.New("no reachable servers")
+ errNsNotFound = errors.New("ns not found")
+ errNoReplSet = errors.New("norepl")
+)
+
// ConvertibleDoc is an interface implemented by special types which wrap data
// gotten for various input readers - i.e. CSV, JSON, TSV. It exposes one
// function - Convert() - which converts the special type to a bson.D document
@@ -121,6 +130,85 @@ func getUpsertValue(field string, document bson.M) interface{} {
return getUpsertValue(field[index+1:], subDoc)
}
+// filterIngestError accepts a boolean indicating whether a non-nil error
+// should be returned as an actual error.
+//
+// If the error indicates an unreachable server, it is returned immediately.
+//
+// If the error indicates an invalid write concern was passed, nil is returned.
+//
+// Any other error is logged. If the error is io.EOF - indicating a lost
+// connection to the server - it is replaced with errLostConnection, which is
+// always returned regardless of stopOnError.
+func filterIngestError(stopOnError bool, err error) error {
+ if err == nil {
+ return nil
+ }
+ if err.Error() == errNoReachableServer.Error() {
+ return err
+ }
+ if err.Error() == errNoReplSet.Error() {
+ log.Logf(log.Info, "write concern 'majority' run against standalone")
+ return nil
+ }
+ if err == io.EOF {
+ err = errLostConnection
+ }
+ log.Logf(log.Always, "error inserting documents: %v", err)
+ if stopOnError || err == errLostConnection {
+ return err
+ }
+ return nil
+}
+
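The behavior described above amounts to a small decision table. A hedged re-sketch of its core (a local stand-in, not the tool's function; the real filterIngestError also special-cases the "no reachable servers" and "norepl" error strings and logs before swallowing):

package main

import (
	"errors"
	"fmt"
	"io"
)

var errLostConnection = errors.New("lost connection to server")

// filter mirrors filterIngestError's core behavior: io.EOF from mgo
// means the socket died, so it is always fatal; other errors are only
// fatal when stopOnError is set.
func filter(stopOnError bool, err error) error {
	if err == nil {
		return nil
	}
	if err == io.EOF {
		err = errLostConnection
	}
	if stopOnError || err == errLostConnection {
		return err
	}
	return nil // the real tool logs and swallows the error here
}

func main() {
	fmt.Println(filter(false, errors.New("dup key"))) // <nil>: keep ingesting
	fmt.Println(filter(true, errors.New("dup key")))  // dup key: abort
	fmt.Println(filter(false, io.EOF))                // lost connection to server
}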
+// insertDocuments writes the given documents to the specified collection and
+// returns the number of documents inserted and an error. It can handle both
+// ordered and unordered writes. If both a write error and a write concern error
+// are encountered, the write error is returned. If the target server is not
+// capable of handling write commands, it returns an error.
+func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) {
+ // mongod v2.6 requires you to explicitly pass an integer for numeric write
+ // concerns
+ var wc interface{}
+ if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil {
+ wc = writeConcern
+ } else {
+ wc = intWriteConcern
+ }
+
+ response := &db.WriteCommandResponse{}
+ err := collection.Database.Run(
+ bson.D{
+ bson.DocElem{"insert", collection.Name},
+ bson.DocElem{"ordered", ordered},
+ bson.DocElem{"documents", documents},
+ bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}},
+ }, response)
+ if err != nil {
+ return 0, err
+ }
+ // if the write concern is 0, n is not present in the response document
+ // so we return immediately with no errors
+ if response.NumAffected == nil {
+ return len(documents), nil
+ }
+ if response.Ok == 0 {
+ // the command itself failed (e.g. authentication failure, syntax error)
+ return 0, fmt.Errorf("write command failed")
+ } else if len(response.WriteErrors) != 0 {
+ // happens if the server couldn't write the data; e.g. because of a
+ // duplicate key, running out of disk space, etc
+ for _, writeError := range response.WriteErrors {
+ log.Logf(log.Always, writeError.Errmsg)
+ }
+ return *response.NumAffected, fmt.Errorf("encountered write errors")
+ } else if response.WriteConcernError.Errmsg != "" {
+ log.Logf(log.Always, response.WriteConcernError.Errmsg)
+ return 0, fmt.Errorf("encountered write concern error")
+ }
+ return *response.NumAffected, nil
+}
+
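The strconv.Atoi split at the top of insertDocuments exists because a w-mode such as "majority" must stay a string while numeric write concerns must go over the wire as integers, which mongod v2.6 requires. A quick sketch of the mapping (illustrative values only):

package main

import (
	"fmt"
	"strconv"
)

func main() {
	for _, writeConcern := range []string{"majority", "2", "0"} {
		var wc interface{}
		if n, err := strconv.Atoi(writeConcern); err != nil {
			wc = writeConcern // non-numeric: pass through as a w-mode string
		} else {
			wc = n // numeric: send as an int, e.g. {w: 2} rather than {w: "2"}
		}
		fmt.Printf("--writeConcern %q -> w: %v (%T)\n", writeConcern, wc, wc)
	}
}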
// removeBlankFields removes empty/blank fields in csv and tsv
func removeBlankFields(document bson.D) bson.D {
for index, pair := range document {
@@ -157,6 +245,47 @@ func setNestedValue(key string, value interface{}, document *bson.D) {
}
}
+// streamDocuments concurrently processes data read off the inputChan channel
+// and sends the processed documents to the outputChan channel - either in the
+// order in which the data was received (if ordered is true) or as soon as
+// each document is processed
+func streamDocuments(ordered bool, inputChan chan ConvertibleDoc, outputChan chan bson.D, errChan chan error) {
+ var importWorkers []*ImportWorker
+ // initialize all our concurrent processing threads
+ wg := &sync.WaitGroup{}
+ inChan := inputChan
+ outChan := outputChan
+ for i := 0; i < numDecodingWorkers; i++ {
+ if ordered {
+ // TODO: experiment with buffered channel size; the buffer size of
+ // inChan should always be the same as that of outChan
+ workerBufferSize := 100
+ inChan = make(chan ConvertibleDoc, workerBufferSize)
+ outChan = make(chan bson.D, workerBufferSize)
+ }
+ importWorker := &ImportWorker{
+ unprocessedDataChan: inChan,
+ processedDocumentChan: outChan,
+ }
+ importWorkers = append(importWorkers, importWorker)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := importWorker.processDocuments(ordered); err != nil {
+ errChan <- err
+ }
+ }()
+ }
+
+ // if ordered, we have to coordinate the sequence in which processed
+ // documents are passed to the main read channel
+ if ordered {
+ doSequentialStreaming(importWorkers, inputChan, outputChan)
+ }
+ wg.Wait()
+ close(outputChan)
+}
+
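The unordered half of this fan-out - all workers draining one shared input channel and writing to one shared output channel - is a standard Go pattern. A self-contained sketch with strings standing in for ConvertibleDocs (all names hypothetical):

package main

import (
	"fmt"
	"strings"
	"sync"
)

func main() {
	in := make(chan string)
	out := make(chan string)
	wg := &sync.WaitGroup{}

	// three workers share the same channels, so output order is
	// whatever order the workers finish in - the unordered case above
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for s := range in {
				out <- strings.ToUpper(s)
			}
		}()
	}
	go func() {
		for _, s := range []string{"a", "b", "c", "d"} {
			in <- s
		}
		close(in)
	}()
	go func() {
		wg.Wait()
		close(out) // mirrors the close(outputChan) after wg.Wait() above
	}()
	for s := range out {
		fmt.Println(s)
	}
}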
// tokensToBSON reads in slice of records - along with ordered fields names -
// and returns a BSON document for the record.
func tokensToBSON(fields, tokens []string, numProcessed uint64) (bson.D, error) {
@@ -251,92 +380,3 @@ func (importWorker *ImportWorker) processDocuments(ordered bool) error {
}
return nil
}
-
-// streamDocuments concurrently processes data gotten from the inputChan
-// channel in parallel and then sends over the processed data to the outputChan
-// channel - either in sequence or concurrently (depending on the value of
-// ordered) - in which the data was received
-func streamDocuments(ordered bool, inputChan chan ConvertibleDoc, outputChan chan bson.D, errChan chan error) {
- var importWorkers []*ImportWorker
- // initialize all our concurrent processing threads
- wg := &sync.WaitGroup{}
- inChan := inputChan
- outChan := outputChan
- for i := 0; i < numDecodingWorkers; i++ {
- if ordered {
- // TODO: experiment with buffered channel size; the buffer size of
- // inChan should always be the same as that of outChan
- workerBufferSize := 100
- inChan = make(chan ConvertibleDoc, workerBufferSize)
- outChan = make(chan bson.D, workerBufferSize)
- }
- importWorker := &ImportWorker{
- unprocessedDataChan: inChan,
- processedDocumentChan: outChan,
- }
- importWorkers = append(importWorkers, importWorker)
- wg.Add(1)
- go func() {
- defer wg.Done()
- if err := importWorker.processDocuments(ordered); err != nil {
- errChan <- err
- }
- }()
- }
-
- // if ordered, we have to coordinate the sequence in which processed
- // documents are passed to the main read channel
- if ordered {
- doSequentialStreaming(importWorkers, inputChan, outputChan)
- }
- wg.Wait()
- close(outputChan)
-}
-
-// insertDocuments writes the given documents to the specified collection and
-// returns the number of documents inserted and an error. It can handle both
-// ordered and unordered writes. If both a write error and a write concern error
-// are encountered, the write error is returned. If the target server is not
-// capable of handling write commands, it returns an error.
-func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) {
- // mongod v2.6 requires you to explicitly pass an integer for numeric write
- // concerns
- var wc interface{}
- if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil {
- wc = writeConcern
- } else {
- wc = intWriteConcern
- }
-
- response := &db.WriteCommandResponse{}
- err := collection.Database.Run(
- bson.D{
- bson.DocElem{"insert", collection.Name},
- bson.DocElem{"ordered", ordered},
- bson.DocElem{"documents", documents},
- bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}},
- }, response)
- if err != nil {
- return 0, err
- }
- // if the write concern is 0, n is not present in the response document
- // so we return immediately with no errors
- if response.NumAffected == nil {
- return len(documents), nil
- }
- if response.Ok == 0 {
- // the command itself failed (authentication failed.., syntax error)
- return 0, fmt.Errorf("write command failed")
- } else if len(response.WriteErrors) != 0 {
- // happens if the server couldn't write the data; e.g. because of a
- // duplicate key, running out of disk space, etc
- for _, writeError := range response.WriteErrors {
- log.Logf(log.Always, writeError.Errmsg)
- }
- return *response.NumAffected, fmt.Errorf("encountered write errors")
- } else if response.WriteConcernError.Errmsg != "" {
- log.Logf(log.Always, response.WriteConcernError.Errmsg)
- return 0, fmt.Errorf("encountered write concern error")
- }
- return *response.NumAffected, nil
-}
diff --git a/mongoimport/common_test.go b/mongoimport/common_test.go
index 3b4cc11d206..ab262abd785 100644
--- a/mongoimport/common_test.go
+++ b/mongoimport/common_test.go
@@ -1,7 +1,8 @@
package mongoimport
import (
- "github.com/mongodb/mongo-tools/common/db"
+ "fmt"
+ "io"
"github.com/mongodb/mongo-tools/common/log"
"github.com/mongodb/mongo-tools/common/options"
"github.com/mongodb/mongo-tools/common/testutil"
@@ -532,88 +532,29 @@ func TestStreamDocuments(t *testing.T) {
})
}
-// This test will only pass with a version of mongod that supports write commands
-func TestInsertDocuments(t *testing.T) {
- Convey("Given a set of documents to insert", t, func() {
- toolOptions := getBasicToolOptions()
- sessionProvider, err := db.InitSessionProvider(*toolOptions)
- So(err, ShouldBeNil)
- session, err := sessionProvider.GetSession()
- So(err, ShouldBeNil)
- collection := session.DB(testDB).C(testCollection)
- writeConcern := "majority"
-
- Convey("an error should be returned if there are duplicate _ids", func() {
- documents := []bson.D{
- bson.D{bson.DocElem{"_id", 1}},
- bson.D{bson.DocElem{"a", 3}},
- bson.D{bson.DocElem{"_id", 1}},
- bson.D{bson.DocElem{"a", 4}},
- }
- numInserted, err := insertDocuments(
- convertBSONDToRaw(documents),
- collection,
- false,
- writeConcern,
- )
- So(err, ShouldNotBeNil)
- So(numInserted, ShouldEqual, 3)
- })
- Convey("no error should be returned if the documents are valid", func() {
- documents := []bson.D{
- bson.D{bson.DocElem{"a", 1}},
- bson.D{bson.DocElem{"a", 2}},
- bson.D{bson.DocElem{"a", 3}},
- bson.D{bson.DocElem{"a", 4}},
- }
- numInserted, err := insertDocuments(
- convertBSONDToRaw(documents),
- collection,
- false,
- writeConcern,
- )
- So(err, ShouldBeNil)
- So(numInserted, ShouldEqual, 4)
- })
- Convey("ordered inserts with duplicates should error out", func() {
- documents := []bson.D{
- bson.D{bson.DocElem{"_id", 1}},
- bson.D{bson.DocElem{"a", 2}},
- bson.D{bson.DocElem{"_id", 1}},
- bson.D{bson.DocElem{"a", 4}},
- }
- numInserted, err := insertDocuments(
- convertBSONDToRaw(documents),
- collection,
- true,
- writeConcern,
- )
- So(err, ShouldNotBeNil)
- So(numInserted, ShouldEqual, 2)
- })
- Convey("ordered inserts without duplicates should work without errors", func() {
- documents := []bson.D{
- bson.D{bson.DocElem{"a", 1}},
- bson.D{bson.DocElem{"a", 2}},
- bson.D{bson.DocElem{"a", 3}},
- bson.D{bson.DocElem{"a", 4}},
- }
- numInserted, err := insertDocuments(
- convertBSONDToRaw(documents),
- collection,
- true,
- writeConcern,
- )
- So(err, ShouldBeNil)
- So(numInserted, ShouldEqual, 4)
+func TestFilterIngestError(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE)
+
+ Convey("Given a boolean 'stopOnError' and an error...", t, func() {
+
+ Convey("an error should be returned if stopOnError is true the err is not nil", func() {
+ So(filterIngestError(true, fmt.Errorf("")), ShouldNotBeNil)
})
- Reset(func() {
- session, err := sessionProvider.GetSession()
- if err != nil {
- t.Fatalf("error getting session: %v", err)
- }
- defer session.Close()
- session.DB(testDB).C(testCollection).DropCollection()
+
+ Convey("errLostConnection should be returned if stopOnError is true the err is io.EOF", func() {
+ So(filterIngestError(true, io.EOF), ShouldEqual, errLostConnection)
+ })
+
+ Convey("no error should be returned if stopOnError is false the err is not nil", func() {
+ So(filterIngestError(false, fmt.Errorf("")), ShouldBeNil)
+ })
+
+ Convey("no error should be returned if stopOnError is false the err is nil", func() {
+ So(filterIngestError(false, nil), ShouldBeNil)
+ })
+
+ Convey("no error should be returned if stopOnError is true the err is nil", func() {
+ So(filterIngestError(true, nil), ShouldBeNil)
})
})
}
diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go
index c932e3dd08c..b66d0f9ad9b 100644
--- a/mongoimport/mongoimport.go
+++ b/mongoimport/mongoimport.go
@@ -1,7 +1,6 @@
package mongoimport
import (
- "errors"
"fmt"
"github.com/mongodb/mongo-tools/common/db"
"github.com/mongodb/mongo-tools/common/log"
@@ -27,17 +26,10 @@ const (
JSON = "json"
)
-var (
- errNsNotFound = errors.New("ns not found")
- errNoReplSet = errors.New("norepl")
-)
-
// ingestion constants
const (
- maxBSONSize = 16 * (1024 * 1024)
- maxMessageSizeBytes = 2 * maxBSONSize
- maxWriteCommandSizeBytes = maxBSONSize / 2
- updateAfterNumInserts = 10000
+ maxBSONSize = 16 * (1024 * 1024)
+ maxMessageSizeBytes = 2 * maxBSONSize
)
// variables used by the input/ingestion goroutines
@@ -69,13 +61,12 @@ type MongoImport struct {
// been inserted into the database
insertionCount uint64
+ // indicates whether the connected server is part of a replica set
+ isReplicaSet bool
+
// the tomb is used to synchronize ingestion goroutines and causes
// other sibling goroutines to terminate immediately if one errors out
tomb *tomb.Tomb
-
- // indicates if the underlying connected server on the session suports
- // write commands
- supportsWriteCommands bool
}
// InputReader is an interface that specifies how an input source should be
@@ -371,11 +362,20 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp
// IngestDocuments takes a slice of documents and either inserts/upserts them -
// based on whether an upsert is requested - into the given collection
func (mongoImport *MongoImport) IngestDocuments(readChan chan bson.D) (err error) {
+ // check if the server is a replica set
+ mongoImport.isReplicaSet, err = mongoImport.SessionProvider.IsReplicaSet()
+ if err != nil {
+ return fmt.Errorf("error checking if server is part of a replicaset: %v", err)
+ }
+ log.Logf(log.Info, "is replica set: %v", mongoImport.isReplicaSet)
+
+ numInsertionWorkers := *mongoImport.IngestOptions.NumInsertionWorkers
+
// initialize the tomb where all goroutines go to die
mongoImport.tomb = &tomb.Tomb{}
// spawn all the worker threads, each in its own goroutine
- for i := 0; i < numInsertionWorkers; i++ {
+ for i := 0; i < numInsertionWorkers; i++ {
mongoImport.tomb.Go(func() error {
// Each ingest worker will return an error which may
// be nil or not. It will be not nil in any of this cases:
@@ -390,65 +390,57 @@ func (mongoImport *MongoImport) IngestDocuments(readChan chan bson.D) (err error
return mongoImport.tomb.Wait()
}
-// ingestDocuments is a helper to IngestDocuments - it reads document off the
-// read channel and prepares then for insertion into the database
-func (mongoImport *MongoImport) ingestDocs(readChan chan bson.D) (err error) {
- ignoreBlanks := mongoImport.IngestOptions.IgnoreBlanks && mongoImport.InputOptions.Type != JSON
- documentBytes := make([]byte, 0)
- documents := make([]bson.Raw, 0)
- numMessageBytes := 0
-
- // TODO: mgo driver does not reestablish connections once lost
- session, err := mongoImport.SessionProvider.GetSession()
- if err != nil {
- return fmt.Errorf("error connecting to mongod: %v", err)
- }
- defer session.Close()
-
+// configureSession takes in a session and modifies it with the proper
+// settings. It does the following:
+//
+// 1. Sets the session to never time out
+// 2. Sets the write concern ('w' or wmode), downgrading it for standalones
+//
+func (mongoImport *MongoImport) configureSession(session *mgo.Session) {
// sockets to the database will never be forcibly closed
session.SetSocketTimeout(0)
- // check if the server supports write commands
- mongoImport.supportsWriteCommands, err = mongoImport.SessionProvider.SupportsWriteCommands()
- if err != nil {
- return fmt.Errorf("error checking if server supports write commands: %v", err)
- }
- log.Logf(log.Info, "using write commands: %v", mongoImport.supportsWriteCommands)
sessionSafety := &mgo.Safe{}
-
intWriteConcern, err := strconv.Atoi(mongoImport.IngestOptions.WriteConcern)
if err != nil {
- log.Logf(log.DebugLow, "using wmode write concern: %v", mongoImport.IngestOptions.WriteConcern)
+ log.Logf(log.Info, "using wmode write concern: %v", mongoImport.IngestOptions.WriteConcern)
sessionSafety.WMode = mongoImport.IngestOptions.WriteConcern
} else {
- log.Logf(log.DebugLow, "using w write concern: %v", mongoImport.IngestOptions.WriteConcern)
+ log.Logf(log.Info, "using w write concern: %v", mongoImport.IngestOptions.WriteConcern)
sessionSafety.W = intWriteConcern
}
// handle fire-and-forget write concern
- if sessionSafety.W == 0 {
- session.SetSafe(nil)
- } else {
- session.SetSafe(sessionSafety)
- }
-
- collection := session.DB(mongoImport.ToolOptions.DB).
- C(mongoImport.ToolOptions.Collection)
-
- var document bson.D
- var alive bool
+ if sessionSafety.WMode == "" && sessionSafety.W == 0 {
+ sessionSafety = nil
+ } else if !mongoImport.isReplicaSet {
+ // for standalone mongod, only a write concern of 0/1 is needed
+ log.Logf(log.Info, "standalone server: setting write concern to 1")
+ sessionSafety.W = 1
+ sessionSafety.WMode = ""
+ }
+ session.SetSafe(sessionSafety)
+}
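Stripped of the session plumbing, the safety decision above reduces to a small pure function. A hedged sketch (pickSafety is a hypothetical name; mgo.Safe is the real driver type the patch configures):

package main

import (
	"fmt"
	"strconv"

	"gopkg.in/mgo.v2"
)

// pickSafety mirrors configureSession's branches: parse w as an int
// when possible, fall back to a w-mode string, return nil for
// fire-and-forget, and downgrade to w:1 against a standalone server.
func pickSafety(writeConcern string, isReplicaSet bool) *mgo.Safe {
	safety := &mgo.Safe{}
	if n, err := strconv.Atoi(writeConcern); err != nil {
		safety.WMode = writeConcern // e.g. "majority"
	} else {
		safety.W = n // e.g. 0, 1, 2
	}
	if safety.WMode == "" && safety.W == 0 {
		return nil // session.SetSafe(nil): no write acknowledgement
	}
	if !isReplicaSet {
		// only a replica set can satisfy "majority" or w > 1
		safety.W, safety.WMode = 1, ""
	}
	return safety
}

func main() {
	fmt.Printf("%+v\n", pickSafety("majority", false)) // w downgraded to 1
	fmt.Println(pickSafety("0", true))                 // <nil>: fire-and-forget
}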
- // set the max message size bytes based on whether or not we're using
- // write commands
- maxMessageSize := maxMessageSizeBytes
- if mongoImport.supportsWriteCommands {
- maxMessageSize = maxWriteCommandSizeBytes
+// ingestDocs is a helper to IngestDocuments - it reads documents off the
+// read channel and prepares them for insertion into the database
+func (mongoImport *MongoImport) ingestDocs(readChan chan bson.D) (err error) {
+ session, err := mongoImport.SessionProvider.GetSession()
+ if err != nil {
+ return fmt.Errorf("error connecting to mongod: %v", err)
}
+ defer session.Close()
+ mongoImport.configureSession(session)
+ collection := session.DB(mongoImport.ToolOptions.DB).C(mongoImport.ToolOptions.Collection)
+ ignoreBlanks := mongoImport.IngestOptions.IgnoreBlanks && mongoImport.InputOptions.Type != JSON
+ documentBytes := make([]byte, 0)
+ documents := make([]bson.Raw, 0)
+ numMessageBytes := 0
readLoop:
for {
select {
- case document, alive = <-readChan:
+ case document, alive := <-readChan:
if !alive {
break readLoop
}
@@ -456,13 +448,12 @@ readLoop:
// limit so we self impose a limit by using maxMessageSizeBytes
// and send documents over the wire when we hit the batch size
// or when we're at/over the maximum message size threshold
- if len(documents) == batchSize || numMessageBytes >= maxMessageSize {
- err = mongoImport.ingester(documents, collection)
- if err != nil {
+ if len(documents) == batchSize || numMessageBytes >= maxMessageSizeBytes {
+ if err = mongoImport.ingester(documents, collection); err != nil {
return err
}
// TODO: TOOLS-313; better to use a progress bar here
- if mongoImport.insertionCount%updateAfterNumInserts == 0 {
+ if mongoImport.insertionCount%10000 == 0 {
log.Logf(log.Always, "Progress: %v documents inserted...", mongoImport.insertionCount)
}
documents = documents[:0]
@@ -489,13 +480,40 @@ readLoop:
return nil
}
+// TODO: TOOLS-317: add tests/update this to be more efficient
+// handleUpsert upserts documents into the database - used if --upsert is passed
+// to mongoimport
+func (mongoImport *MongoImport) handleUpsert(documents []bson.Raw, collection *mgo.Collection) (numInserted int, err error) {
+ stopOnError := mongoImport.IngestOptions.StopOnError
+ upsertFields := strings.Split(mongoImport.IngestOptions.UpsertFields, ",")
+ for _, rawBsonDocument := range documents {
+ document := bson.M{}
+ err = bson.Unmarshal(rawBsonDocument.Data, &document)
+ if err != nil {
+ return numInserted, fmt.Errorf("error unmarshaling document: %v", err)
+ }
+ selector := constructUpsertDocument(upsertFields, document)
+ if selector == nil {
+ err = collection.Insert(document)
+ } else {
+ _, err = collection.Upsert(selector, document)
+ }
+ if err == nil {
+ numInserted++
+ }
+ if err = filterIngestError(stopOnError, err); err != nil {
+ return numInserted, err
+ }
+ }
+ return numInserted, nil
+}
+
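The selector handed to collection.Upsert is built by copying the --upsertFields values out of each document. A simplified sketch of that step (selectorFor is hypothetical; the package's real constructUpsertDocument also resolves dotted, nested field names via getUpsertValue):

package main

import (
	"fmt"

	"gopkg.in/mgo.v2/bson"
)

// selectorFor copies the named upsert fields out of a document,
// returning nil when none are present so the caller can fall back
// to a plain Insert - the same nil convention handleUpsert checks.
func selectorFor(upsertFields []string, document bson.M) bson.M {
	selector := bson.M{}
	for _, field := range upsertFields {
		if value, ok := document[field]; ok {
			selector[field] = value
		}
	}
	if len(selector) == 0 {
		return nil
	}
	return selector
}

func main() {
	doc := bson.M{"_id": 1, "a": 2}
	fmt.Println(selectorFor([]string{"_id"}, doc)) // map[_id:1] -> Upsert path
	fmt.Println(selectorFor([]string{"b"}, doc))   // map[] (nil) -> Insert path
}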
// ingester performs the actual insertion/updates. If no upsert fields are
// present in the document to be inserted, it simply inserts the documents
// into the given collection
func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.Collection) (err error) {
numInserted := 0
stopOnError := mongoImport.IngestOptions.StopOnError
- writeConcern := mongoImport.IngestOptions.WriteConcern
maintainInsertionOrder := mongoImport.IngestOptions.MaintainInsertionOrder
defer func() {
@@ -505,48 +523,11 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C
}()
if mongoImport.IngestOptions.Upsert {
- selector := bson.M{}
- document := bson.M{}
- upsertFields := strings.Split(mongoImport.IngestOptions.UpsertFields, ",")
- for _, rawBsonDocument := range documents {
- err = bson.Unmarshal(rawBsonDocument.Data, &document)
- if err != nil {
- return fmt.Errorf("error unmarshaling document: %v", err)
- }
- selector = constructUpsertDocument(upsertFields, document)
- if selector == nil {
- err = collection.Insert(document)
- } else {
- _, err = collection.Upsert(selector, document)
- }
- // If you pass a write concern of "majority" to a pre-2.6 mongod, it throws
- // a "norepl" error if it's not run against a replica set. The extra check
- // is here to allow users run mongoimport against a pre-2.6 mongod
- if err != nil && err.Error() != errNoReplSet.Error() {
- errMsg := err.Error()
- if err == io.EOF {
- errMsg = "lost connection to server"
- }
- log.Logf(log.Always, "error updating documents: %v", errMsg)
- if stopOnError || err == io.EOF {
- return fmt.Errorf(errMsg)
- }
- } else {
- numInserted += 1
- }
- document = bson.M{}
- }
- } else if mongoImport.supportsWriteCommands {
- // note that this count may not be entirely accurate if some
- // ingester threads insert when another errors out. however,
- // any/all errors will be logged if/when they are encountered
- numInserted, err = insertDocuments(
- documents,
- collection,
- stopOnError,
- writeConcern,
- )
+ numInserted, err = mongoImport.handleUpsert(documents, collection)
+ return err
} else {
+ // note that this count may not be entirely accurate if some
+ // ingester threads insert when another errors out.
// without write commands, we can't say for sure how many documents were
// inserted when we use bulk inserts so we assume the entire batch
// succeeded - even if an error is returned. The result is that we may
@@ -565,17 +546,7 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C
_, err = bulk.Run()
numInserted = len(documents)
}
- if err != nil && err.Error() != errNoReplSet.Error() {
- errMsg := err.Error()
- if err == io.EOF {
- errMsg = "lost connection to server"
- }
- log.Logf(log.Always, "error inserting documents: %v", errMsg)
- if stopOnError || err == io.EOF {
- return fmt.Errorf(errMsg)
- }
- }
- return nil
+ return filterIngestError(stopOnError, err)
}
// getInputReader returns an implementation of InputReader which can handle
diff --git a/mongoimport/mongoimport_test.go b/mongoimport/mongoimport_test.go
index a5d99f8cd76..9dabf055dfd 100644
--- a/mongoimport/mongoimport_test.go
+++ b/mongoimport/mongoimport_test.go
@@ -397,8 +397,8 @@ func TestImportDocuments(t *testing.T) {
Convey("Given a mongoimport instance with which to import documents, on "+
"calling importDocuments", t, func() {
batchSize := 1
- numProcessingThreads := 1
- numIngestionThreads := 1
+ numDecodingWorkers := 1
+ numInsertionWorkers := 1
Convey("no error should be thrown for CSV import on test data and all "+
"CSV data lines should be imported correctly", func() {
toolOptions := getBasicToolOptions()
@@ -408,9 +408,9 @@ func TestImportDocuments(t *testing.T) {
Fields: "a,b,c",
}
ingestOptions := &options.IngestOptions{
- BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ BatchSize: &batchSize,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
So(err, ShouldBeNil)
@@ -431,10 +431,10 @@ func TestImportDocuments(t *testing.T) {
File: "testdata/test_plain2.json",
}
ingestOptions := &options.IngestOptions{
- IgnoreBlanks: true,
- BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ IgnoreBlanks: true,
+ BatchSize: &batchSize,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
So(err, ShouldBeNil)
@@ -460,8 +460,8 @@ func TestImportDocuments(t *testing.T) {
ingestOptions := &options.IngestOptions{
IgnoreBlanks: true,
BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
MaintainInsertionOrder: true,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
@@ -493,8 +493,8 @@ func TestImportDocuments(t *testing.T) {
}
ingestOptions := &options.IngestOptions{
BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
MaintainInsertionOrder: true,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
@@ -526,8 +526,8 @@ func TestImportDocuments(t *testing.T) {
ingestOptions := &options.IngestOptions{
Upsert: true,
BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
MaintainInsertionOrder: true,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
@@ -560,8 +560,8 @@ func TestImportDocuments(t *testing.T) {
ingestOptions := &options.IngestOptions{
StopOnError: true,
BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
MaintainInsertionOrder: true,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
@@ -591,10 +591,9 @@ func TestImportDocuments(t *testing.T) {
Fields: "_id,b,c",
}
ingestOptions := &options.IngestOptions{
- BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
- MaintainInsertionOrder: true,
+ BatchSize: &batchSize,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
So(err, ShouldBeNil)
@@ -627,9 +626,10 @@ func TestImportDocuments(t *testing.T) {
ingestOptions := &options.IngestOptions{
Drop: true,
BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
MaintainInsertionOrder: true,
+ WriteConcern: "majority",
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
So(err, ShouldBeNil)
@@ -658,9 +658,9 @@ func TestImportDocuments(t *testing.T) {
HeaderLine: true,
}
ingestOptions := &options.IngestOptions{
- BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ BatchSize: &batchSize,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
So(err, ShouldBeNil)
@@ -686,9 +686,9 @@ func TestImportDocuments(t *testing.T) {
HeaderLine: true,
}
ingestOptions := &options.IngestOptions{
- BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ BatchSize: &batchSize,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
So(err, ShouldBeNil)
@@ -714,8 +714,8 @@ func TestImportDocuments(t *testing.T) {
Upsert: true,
UpsertFields: "_id",
BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
MaintainInsertionOrder: true,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
@@ -748,8 +748,8 @@ func TestImportDocuments(t *testing.T) {
Upsert: true,
UpsertFields: "_id",
BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
MaintainInsertionOrder: true,
}
toolOptions := getBasicToolOptions()
@@ -783,8 +783,8 @@ func TestImportDocuments(t *testing.T) {
ingestOptions := &options.IngestOptions{
StopOnError: true,
BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
WriteConcern: "1",
MaintainInsertionOrder: true,
}
@@ -816,8 +816,8 @@ func TestImportDocuments(t *testing.T) {
ingestOptions := &options.IngestOptions{
StopOnError: true,
BatchSize: &batchSize,
- NumProcessingThreads: &numProcessingThreads,
- NumIngestionThreads: &numIngestionThreads,
+ NumDecodingWorkers: &numDecodingWorkers,
+ NumInsertionWorkers: &numInsertionWorkers,
MaintainInsertionOrder: true,
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)