diff options
author | Wisdom Omuya <deafgoat@gmail.com> | 2014-11-04 10:23:07 -0500 |
---|---|---|
committer | Wisdom Omuya <deafgoat@gmail.com> | 2014-11-04 10:23:07 -0500 |
commit | 5132aab1afb99b285b541deded2de41bc3997dab (patch) | |
tree | 65a692eb56db345a42693fb007f125c4c44ebc92 | |
parent | c15274fe76119180509b10ce036205015201a70c (diff) | |
parent | 1817796fde68563a4c327d94c198a8a585e4bf7f (diff) | |
download | mongo-5132aab1afb99b285b541deded2de41bc3997dab.tar.gz |
TOOLS-325: fix failing tests
Former-commit-id: d96f50230c53bd9b9ae76c43dc1735c0e6bf5b24
-rw-r--r-- | common/db/command.go | 17 | ||||
-rw-r--r-- | mongoimport/common.go | 218 | ||||
-rw-r--r-- | mongoimport/common_test.go | 105 | ||||
-rw-r--r-- | mongoimport/mongoimport.go | 197 | ||||
-rw-r--r-- | mongoimport/mongoimport_test.go | 74 |
5 files changed, 289 insertions, 322 deletions
diff --git a/common/db/command.go b/common/db/command.go index 2d4c60a0c76..eed3dda2a60 100644 --- a/common/db/command.go +++ b/common/db/command.go @@ -33,7 +33,6 @@ type CommandRunner interface { Remove(DB, Collection string, Query interface{}) error DatabaseNames() ([]string, error) CollectionNames(dbName string) ([]string, error) - SupportsWriteCommands() (bool, error) } func (sp *SessionProvider) Remove(DB, Collection string, Query interface{}) error { @@ -84,6 +83,22 @@ func (sp *SessionProvider) FindDocs(DB, Collection string, Skip, Limit int, Quer return &CursorDocSource{q.Iter(), session}, nil } +func (sp *SessionProvider) IsReplicaSet() (bool, error) { + session, err := sp.GetSession() + if err != nil { + return false, err + } + defer session.Close() + masterDoc := bson.M{} + err = session.Run("isMaster", &masterDoc) + if err != nil { + return false, err + } + _, hasSetName := masterDoc["setName"] + _, hasHosts := masterDoc["hosts"] + return hasSetName || hasHosts, nil +} + func (sp *SessionProvider) SupportsWriteCommands() (bool, error) { session, err := sp.GetSession() if err != nil { diff --git a/mongoimport/common.go b/mongoimport/common.go index d48a7f0067c..f1114910ed7 100644 --- a/mongoimport/common.go +++ b/mongoimport/common.go @@ -1,6 +1,7 @@ package mongoimport import ( + "errors" "fmt" "github.com/mongodb/mongo-tools/common/bsonutil" "github.com/mongodb/mongo-tools/common/db" @@ -8,12 +9,20 @@ import ( "github.com/mongodb/mongo-tools/common/util" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" + "io" "sort" "strconv" "strings" "sync" ) +var ( + errLostConnection = errors.New("lost connection to server") + errNoReachableServer = errors.New("no reachable servers") + errNsNotFound = errors.New("ns not found") + errNoReplSet = errors.New("norepl") +) + // ConvertibleDoc is an interface implemented by special types which wrap data // gotten for various input readers - i.e. CSV, JSON, TSV. It exposes one // function - Convert() - which converts the special type to a bson.D document @@ -121,6 +130,85 @@ func getUpsertValue(field string, document bson.M) interface{} { return getUpsertValue(field[index+1:], subDoc) } +// filterIngestError accepts a boolean indicating if a non-nil error should be, +// returned as an actual error. +// +// If the error indicates an unreachable server, it returns that immediately. +// +// If the error indicates an invalid write concern was passed, it returns nil +// +// If the error is not nil, it logs the error. If the error is an io.EOF error - +// indicating a lost connection to the server, it sets the error as such. +// +func filterIngestError(stopOnError bool, err error) error { + if err == nil { + return nil + } + if err.Error() == errNoReachableServer.Error() { + return err + } + if err.Error() == errNoReplSet.Error() { + log.Logf(log.Info, "write concern 'majority' run against standalone") + return nil + } + if err == io.EOF { + err = errLostConnection + } + log.Logf(log.Always, "error inserting documents: %v", err) + if stopOnError || err == errLostConnection { + return err + } + return nil +} + +// insertDocuments writes the given documents to the specified collection and +// returns the number of documents inserted and an error. It can handle both +// ordered and unordered writes. If both a write error and a write concern error +// are encountered, the write error is returned. If the target server is not +// capable of handling write commands, it returns an error. +func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) { + // mongod v2.6 requires you to explicitly pass an integer for numeric write + // concerns + var wc interface{} + if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil { + wc = writeConcern + } else { + wc = intWriteConcern + } + + response := &db.WriteCommandResponse{} + err := collection.Database.Run( + bson.D{ + bson.DocElem{"insert", collection.Name}, + bson.DocElem{"ordered", ordered}, + bson.DocElem{"documents", documents}, + bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}}, + }, response) + if err != nil { + return 0, err + } + // if the write concern is 0, n is not present in the response document + // so we return immediately with no errors + if response.NumAffected == nil { + return len(documents), nil + } + if response.Ok == 0 { + // the command itself failed (authentication failed.., syntax error) + return 0, fmt.Errorf("write command failed") + } else if len(response.WriteErrors) != 0 { + // happens if the server couldn't write the data; e.g. because of a + // duplicate key, running out of disk space, etc + for _, writeError := range response.WriteErrors { + log.Logf(log.Always, writeError.Errmsg) + } + return *response.NumAffected, fmt.Errorf("encountered write errors") + } else if response.WriteConcernError.Errmsg != "" { + log.Logf(log.Always, response.WriteConcernError.Errmsg) + return 0, fmt.Errorf("encountered write concern error") + } + return *response.NumAffected, nil +} + // removeBlankFields removes empty/blank fields in csv and tsv func removeBlankFields(document bson.D) bson.D { for index, pair := range document { @@ -157,6 +245,47 @@ func setNestedValue(key string, value interface{}, document *bson.D) { } } +// streamDocuments concurrently processes data gotten from the inputChan +// channel in parallel and then sends over the processed data to the outputChan +// channel - either in sequence or concurrently (depending on the value of +// ordered) - in which the data was received +func streamDocuments(ordered bool, inputChan chan ConvertibleDoc, outputChan chan bson.D, errChan chan error) { + var importWorkers []*ImportWorker + // initialize all our concurrent processing threads + wg := &sync.WaitGroup{} + inChan := inputChan + outChan := outputChan + for i := 0; i < numDecodingWorkers; i++ { + if ordered { + // TODO: experiment with buffered channel size; the buffer size of + // inChan should always be the same as that of outChan + workerBufferSize := 100 + inChan = make(chan ConvertibleDoc, workerBufferSize) + outChan = make(chan bson.D, workerBufferSize) + } + importWorker := &ImportWorker{ + unprocessedDataChan: inChan, + processedDocumentChan: outChan, + } + importWorkers = append(importWorkers, importWorker) + wg.Add(1) + go func() { + defer wg.Done() + if err := importWorker.processDocuments(ordered); err != nil { + errChan <- err + } + }() + } + + // if ordered, we have to coordinate the sequence in which processed + // documents are passed to the main read channel + if ordered { + doSequentialStreaming(importWorkers, inputChan, outputChan) + } + wg.Wait() + close(outputChan) +} + // tokensToBSON reads in slice of records - along with ordered fields names - // and returns a BSON document for the record. func tokensToBSON(fields, tokens []string, numProcessed uint64) (bson.D, error) { @@ -251,92 +380,3 @@ func (importWorker *ImportWorker) processDocuments(ordered bool) error { } return nil } - -// streamDocuments concurrently processes data gotten from the inputChan -// channel in parallel and then sends over the processed data to the outputChan -// channel - either in sequence or concurrently (depending on the value of -// ordered) - in which the data was received -func streamDocuments(ordered bool, inputChan chan ConvertibleDoc, outputChan chan bson.D, errChan chan error) { - var importWorkers []*ImportWorker - // initialize all our concurrent processing threads - wg := &sync.WaitGroup{} - inChan := inputChan - outChan := outputChan - for i := 0; i < numDecodingWorkers; i++ { - if ordered { - // TODO: experiment with buffered channel size; the buffer size of - // inChan should always be the same as that of outChan - workerBufferSize := 100 - inChan = make(chan ConvertibleDoc, workerBufferSize) - outChan = make(chan bson.D, workerBufferSize) - } - importWorker := &ImportWorker{ - unprocessedDataChan: inChan, - processedDocumentChan: outChan, - } - importWorkers = append(importWorkers, importWorker) - wg.Add(1) - go func() { - defer wg.Done() - if err := importWorker.processDocuments(ordered); err != nil { - errChan <- err - } - }() - } - - // if ordered, we have to coordinate the sequence in which processed - // documents are passed to the main read channel - if ordered { - doSequentialStreaming(importWorkers, inputChan, outputChan) - } - wg.Wait() - close(outputChan) -} - -// insertDocuments writes the given documents to the specified collection and -// returns the number of documents inserted and an error. It can handle both -// ordered and unordered writes. If both a write error and a write concern error -// are encountered, the write error is returned. If the target server is not -// capable of handling write commands, it returns an error. -func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) { - // mongod v2.6 requires you to explicitly pass an integer for numeric write - // concerns - var wc interface{} - if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil { - wc = writeConcern - } else { - wc = intWriteConcern - } - - response := &db.WriteCommandResponse{} - err := collection.Database.Run( - bson.D{ - bson.DocElem{"insert", collection.Name}, - bson.DocElem{"ordered", ordered}, - bson.DocElem{"documents", documents}, - bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}}, - }, response) - if err != nil { - return 0, err - } - // if the write concern is 0, n is not present in the response document - // so we return immediately with no errors - if response.NumAffected == nil { - return len(documents), nil - } - if response.Ok == 0 { - // the command itself failed (authentication failed.., syntax error) - return 0, fmt.Errorf("write command failed") - } else if len(response.WriteErrors) != 0 { - // happens if the server couldn't write the data; e.g. because of a - // duplicate key, running out of disk space, etc - for _, writeError := range response.WriteErrors { - log.Logf(log.Always, writeError.Errmsg) - } - return *response.NumAffected, fmt.Errorf("encountered write errors") - } else if response.WriteConcernError.Errmsg != "" { - log.Logf(log.Always, response.WriteConcernError.Errmsg) - return 0, fmt.Errorf("encountered write concern error") - } - return *response.NumAffected, nil -} diff --git a/mongoimport/common_test.go b/mongoimport/common_test.go index 3b4cc11d206..ab262abd785 100644 --- a/mongoimport/common_test.go +++ b/mongoimport/common_test.go @@ -1,7 +1,7 @@ package mongoimport import ( - "github.com/mongodb/mongo-tools/common/db" + "fmt" "github.com/mongodb/mongo-tools/common/log" "github.com/mongodb/mongo-tools/common/options" "github.com/mongodb/mongo-tools/common/testutil" @@ -532,88 +532,29 @@ func TestStreamDocuments(t *testing.T) { }) } -// This test will only pass with a version of mongod that supports write commands -func TestInsertDocuments(t *testing.T) { - Convey("Given a set of documents to insert", t, func() { - toolOptions := getBasicToolOptions() - sessionProvider, err := db.InitSessionProvider(*toolOptions) - So(err, ShouldBeNil) - session, err := sessionProvider.GetSession() - So(err, ShouldBeNil) - collection := session.DB(testDB).C(testCollection) - writeConcern := "majority" - - Convey("an error should be returned if there are duplicate _ids", func() { - documents := []bson.D{ - bson.D{bson.DocElem{"_id", 1}}, - bson.D{bson.DocElem{"a", 3}}, - bson.D{bson.DocElem{"_id", 1}}, - bson.D{bson.DocElem{"a", 4}}, - } - numInserted, err := insertDocuments( - convertBSONDToRaw(documents), - collection, - false, - writeConcern, - ) - So(err, ShouldNotBeNil) - So(numInserted, ShouldEqual, 3) - }) - Convey("no error should be returned if the documents are valid", func() { - documents := []bson.D{ - bson.D{bson.DocElem{"a", 1}}, - bson.D{bson.DocElem{"a", 2}}, - bson.D{bson.DocElem{"a", 3}}, - bson.D{bson.DocElem{"a", 4}}, - } - numInserted, err := insertDocuments( - convertBSONDToRaw(documents), - collection, - false, - writeConcern, - ) - So(err, ShouldBeNil) - So(numInserted, ShouldEqual, 4) - }) - Convey("ordered inserts with duplicates should error out", func() { - documents := []bson.D{ - bson.D{bson.DocElem{"_id", 1}}, - bson.D{bson.DocElem{"a", 2}}, - bson.D{bson.DocElem{"_id", 1}}, - bson.D{bson.DocElem{"a", 4}}, - } - numInserted, err := insertDocuments( - convertBSONDToRaw(documents), - collection, - true, - writeConcern, - ) - So(err, ShouldNotBeNil) - So(numInserted, ShouldEqual, 2) - }) - Convey("ordered inserts without duplicates should work without errors", func() { - documents := []bson.D{ - bson.D{bson.DocElem{"a", 1}}, - bson.D{bson.DocElem{"a", 2}}, - bson.D{bson.DocElem{"a", 3}}, - bson.D{bson.DocElem{"a", 4}}, - } - numInserted, err := insertDocuments( - convertBSONDToRaw(documents), - collection, - true, - writeConcern, - ) - So(err, ShouldBeNil) - So(numInserted, ShouldEqual, 4) +func TestFilterIngestError(t *testing.T) { + testutil.VerifyTestType(t, testutil.UNIT_TEST_TYPE) + + Convey("Given a boolean 'stopOnError' and an error...", t, func() { + + Convey("an error should be returned if stopOnError is true the err is not nil", func() { + So(filterIngestError(true, fmt.Errorf("")), ShouldNotBeNil) }) - Reset(func() { - session, err := sessionProvider.GetSession() - if err != nil { - t.Fatalf("error getting session: %v", err) - } - defer session.Close() - session.DB(testDB).C(testCollection).DropCollection() + + Convey("errLostConnection should be returned if stopOnError is true the err is io.EOF", func() { + So(filterIngestError(true, io.EOF), ShouldEqual, errLostConnection) + }) + + Convey("no error should be returned if stopOnError is false the err is not nil", func() { + So(filterIngestError(false, fmt.Errorf("")), ShouldBeNil) + }) + + Convey("no error should be returned if stopOnError is false the err is nil", func() { + So(filterIngestError(false, nil), ShouldBeNil) + }) + + Convey("no error should be returned if stopOnError is true the err is nil", func() { + So(filterIngestError(true, nil), ShouldBeNil) }) }) } diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index c932e3dd08c..b66d0f9ad9b 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -1,7 +1,6 @@ package mongoimport import ( - "errors" "fmt" "github.com/mongodb/mongo-tools/common/db" "github.com/mongodb/mongo-tools/common/log" @@ -27,17 +26,10 @@ const ( JSON = "json" ) -var ( - errNsNotFound = errors.New("ns not found") - errNoReplSet = errors.New("norepl") -) - // ingestion constants const ( - maxBSONSize = 16 * (1024 * 1024) - maxMessageSizeBytes = 2 * maxBSONSize - maxWriteCommandSizeBytes = maxBSONSize / 2 - updateAfterNumInserts = 10000 + maxBSONSize = 16 * (1024 * 1024) + maxMessageSizeBytes = 2 * maxBSONSize ) // variables used by the input/ingestion goroutines @@ -69,13 +61,12 @@ type MongoImport struct { // been inserted into the database insertionCount uint64 + // indicates whether the connected server is part of a replica set + isReplicaSet bool + // the tomb is used to synchronize ingestion goroutines and causes // other sibling goroutines to terminate immediately if one errors out tomb *tomb.Tomb - - // indicates if the underlying connected server on the session suports - // write commands - supportsWriteCommands bool } // InputReader is an interface that specifies how an input source should be @@ -371,11 +362,20 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp // IngestDocuments takes a slice of documents and either inserts/upserts them - // based on whether an upsert is requested - into the given collection func (mongoImport *MongoImport) IngestDocuments(readChan chan bson.D) (err error) { + // check if the server is a replica set + mongoImport.isReplicaSet, err = mongoImport.SessionProvider.IsReplicaSet() + if err != nil { + return fmt.Errorf("error checking if server is part of a replicaset: %v", err) + } + log.Logf(log.Info, "is replica set: %v", mongoImport.isReplicaSet) + + numDecodingWorkers := *mongoImport.IngestOptions.NumInsertionWorkers + // initialize the tomb where all goroutines go to die mongoImport.tomb = &tomb.Tomb{} // spawn all the worker threads, each in its own goroutine - for i := 0; i < numInsertionWorkers; i++ { + for i := 0; i < numDecodingWorkers; i++ { mongoImport.tomb.Go(func() error { // Each ingest worker will return an error which may // be nil or not. It will be not nil in any of this cases: @@ -390,65 +390,57 @@ func (mongoImport *MongoImport) IngestDocuments(readChan chan bson.D) (err error return mongoImport.tomb.Wait() } -// ingestDocuments is a helper to IngestDocuments - it reads document off the -// read channel and prepares then for insertion into the database -func (mongoImport *MongoImport) ingestDocs(readChan chan bson.D) (err error) { - ignoreBlanks := mongoImport.IngestOptions.IgnoreBlanks && mongoImport.InputOptions.Type != JSON - documentBytes := make([]byte, 0) - documents := make([]bson.Raw, 0) - numMessageBytes := 0 - - // TODO: mgo driver does not reestablish connections once lost - session, err := mongoImport.SessionProvider.GetSession() - if err != nil { - return fmt.Errorf("error connecting to mongod: %v", err) - } - defer session.Close() - +// configureSession takes in a session and modifies it with properly configured +// settings. It does the following configurations: +// +// 1. Sets the session to not timeout +// 2. Sets 'w' for the write concern +// +func (mongoImport *MongoImport) configureSession(session *mgo.Session) { // sockets to the database will never be forcibly closed session.SetSocketTimeout(0) - // check if the server supports write commands - mongoImport.supportsWriteCommands, err = mongoImport.SessionProvider.SupportsWriteCommands() - if err != nil { - return fmt.Errorf("error checking if server supports write commands: %v", err) - } - log.Logf(log.Info, "using write commands: %v", mongoImport.supportsWriteCommands) sessionSafety := &mgo.Safe{} - intWriteConcern, err := strconv.Atoi(mongoImport.IngestOptions.WriteConcern) if err != nil { - log.Logf(log.DebugLow, "using wmode write concern: %v", mongoImport.IngestOptions.WriteConcern) + log.Logf(log.Info, "using wmode write concern: %v", mongoImport.IngestOptions.WriteConcern) sessionSafety.WMode = mongoImport.IngestOptions.WriteConcern } else { - log.Logf(log.DebugLow, "using w write concern: %v", mongoImport.IngestOptions.WriteConcern) + log.Logf(log.Info, "using w write concern: %v", mongoImport.IngestOptions.WriteConcern) sessionSafety.W = intWriteConcern } // handle fire-and-forget write concern - if sessionSafety.W == 0 { - session.SetSafe(nil) - } else { - session.SetSafe(sessionSafety) - } - - collection := session.DB(mongoImport.ToolOptions.DB). - C(mongoImport.ToolOptions.Collection) - - var document bson.D - var alive bool + if sessionSafety.WMode == "" && sessionSafety.W == 0 { + sessionSafety = nil + } else if !mongoImport.isReplicaSet { + // for standalone mongod, only a write concern of 0/1 is needed + log.Logf(log.Info, "standalone server: setting write concern to 1") + sessionSafety.W = 1 + sessionSafety.WMode = "" + } + session.SetSafe(sessionSafety) +} - // set the max message size bytes based on whether or not we're using - // write commands - maxMessageSize := maxMessageSizeBytes - if mongoImport.supportsWriteCommands { - maxMessageSize = maxWriteCommandSizeBytes +// ingestDocuments is a helper to IngestDocuments - it reads document off the +// read channel and prepares then for insertion into the database +func (mongoImport *MongoImport) ingestDocs(readChan chan bson.D) (err error) { + session, err := mongoImport.SessionProvider.GetSession() + if err != nil { + return fmt.Errorf("error connecting to mongod: %v", err) } + defer session.Close() + mongoImport.configureSession(session) + collection := session.DB(mongoImport.ToolOptions.DB).C(mongoImport.ToolOptions.Collection) + ignoreBlanks := mongoImport.IngestOptions.IgnoreBlanks && mongoImport.InputOptions.Type != JSON + documentBytes := make([]byte, 0) + documents := make([]bson.Raw, 0) + numMessageBytes := 0 readLoop: for { select { - case document, alive = <-readChan: + case document, alive := <-readChan: if !alive { break readLoop } @@ -456,13 +448,12 @@ readLoop: // limit so we self impose a limit by using maxMessageSizeBytes // and send documents over the wire when we hit the batch size // or when we're at/over the maximum message size threshold - if len(documents) == batchSize || numMessageBytes >= maxMessageSize { - err = mongoImport.ingester(documents, collection) - if err != nil { + if len(documents) == batchSize || numMessageBytes >= maxMessageSizeBytes { + if err = mongoImport.ingester(documents, collection); err != nil { return err } // TODO: TOOLS-313; better to use a progress bar here - if mongoImport.insertionCount%updateAfterNumInserts == 0 { + if mongoImport.insertionCount%10000 == 0 { log.Logf(log.Always, "Progress: %v documents inserted...", mongoImport.insertionCount) } documents = documents[:0] @@ -489,13 +480,40 @@ readLoop: return nil } +// TODO: TOOLS-317: add tests/update this to be more efficient +// handleUpsert upserts documents into the database - used if --upsert is passed +// to mongoimport +func (mongoImport *MongoImport) handleUpsert(documents []bson.Raw, collection *mgo.Collection) (numInserted int, err error) { + stopOnError := mongoImport.IngestOptions.StopOnError + upsertFields := strings.Split(mongoImport.IngestOptions.UpsertFields, ",") + for _, rawBsonDocument := range documents { + document := bson.M{} + err = bson.Unmarshal(rawBsonDocument.Data, &document) + if err != nil { + return numInserted, fmt.Errorf("error unmarshaling document: %v", err) + } + selector := constructUpsertDocument(upsertFields, document) + if selector == nil { + err = collection.Insert(document) + } else { + _, err = collection.Upsert(selector, document) + } + if err == nil { + numInserted += 1 + } + if err = filterIngestError(stopOnError, err); err != nil { + return numInserted, err + } + } + return numInserted, nil +} + // ingester performs the actual insertion/updates. If no upsert fields are // present in the document to be inserted, it simply inserts the documents // into the given collection func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.Collection) (err error) { numInserted := 0 stopOnError := mongoImport.IngestOptions.StopOnError - writeConcern := mongoImport.IngestOptions.WriteConcern maintainInsertionOrder := mongoImport.IngestOptions.MaintainInsertionOrder defer func() { @@ -505,48 +523,11 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C }() if mongoImport.IngestOptions.Upsert { - selector := bson.M{} - document := bson.M{} - upsertFields := strings.Split(mongoImport.IngestOptions.UpsertFields, ",") - for _, rawBsonDocument := range documents { - err = bson.Unmarshal(rawBsonDocument.Data, &document) - if err != nil { - return fmt.Errorf("error unmarshaling document: %v", err) - } - selector = constructUpsertDocument(upsertFields, document) - if selector == nil { - err = collection.Insert(document) - } else { - _, err = collection.Upsert(selector, document) - } - // If you pass a write concern of "majority" to a pre-2.6 mongod, it throws - // a "norepl" error if it's not run against a replica set. The extra check - // is here to allow users run mongoimport against a pre-2.6 mongod - if err != nil && err.Error() != errNoReplSet.Error() { - errMsg := err.Error() - if err == io.EOF { - errMsg = "lost connection to server" - } - log.Logf(log.Always, "error updating documents: %v", errMsg) - if stopOnError || err == io.EOF { - return fmt.Errorf(errMsg) - } - } else { - numInserted += 1 - } - document = bson.M{} - } - } else if mongoImport.supportsWriteCommands { - // note that this count may not be entirely accurate if some - // ingester threads insert when another errors out. however, - // any/all errors will be logged if/when they are encountered - numInserted, err = insertDocuments( - documents, - collection, - stopOnError, - writeConcern, - ) + numInserted, err = mongoImport.handleUpsert(documents, collection) + return err } else { + // note that this count may not be entirely accurate if some + // ingester threads insert when another errors out. // without write commands, we can't say for sure how many documents were // inserted when we use bulk inserts so we assume the entire batch // succeeded - even if an error is returned. The result is that we may @@ -565,17 +546,7 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C _, err = bulk.Run() numInserted = len(documents) } - if err != nil && err.Error() != errNoReplSet.Error() { - errMsg := err.Error() - if err == io.EOF { - errMsg = "lost connection to server" - } - log.Logf(log.Always, "error inserting documents: %v", errMsg) - if stopOnError || err == io.EOF { - return fmt.Errorf(errMsg) - } - } - return nil + return filterIngestError(stopOnError, err) } // getInputReader returns an implementation of InputReader which can handle diff --git a/mongoimport/mongoimport_test.go b/mongoimport/mongoimport_test.go index a5d99f8cd76..9dabf055dfd 100644 --- a/mongoimport/mongoimport_test.go +++ b/mongoimport/mongoimport_test.go @@ -397,8 +397,8 @@ func TestImportDocuments(t *testing.T) { Convey("Given a mongoimport instance with which to import documents, on "+ "calling importDocuments", t, func() { batchSize := 1 - numProcessingThreads := 1 - numIngestionThreads := 1 + NumDecodingWorkers := 1 + NumInsertionWorkers := 1 Convey("no error should be thrown for CSV import on test data and all "+ "CSV data lines should be imported correctly", func() { toolOptions := getBasicToolOptions() @@ -408,9 +408,9 @@ func TestImportDocuments(t *testing.T) { Fields: "a,b,c", } ingestOptions := &options.IngestOptions{ - BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + BatchSize: &batchSize, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, } sessionProvider, err = db.InitSessionProvider(*toolOptions) So(err, ShouldBeNil) @@ -431,10 +431,10 @@ func TestImportDocuments(t *testing.T) { File: "testdata/test_plain2.json", } ingestOptions := &options.IngestOptions{ - IgnoreBlanks: true, - BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + IgnoreBlanks: true, + BatchSize: &batchSize, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, } sessionProvider, err = db.InitSessionProvider(*toolOptions) So(err, ShouldBeNil) @@ -460,8 +460,8 @@ func TestImportDocuments(t *testing.T) { ingestOptions := &options.IngestOptions{ IgnoreBlanks: true, BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, MaintainInsertionOrder: true, } sessionProvider, err = db.InitSessionProvider(*toolOptions) @@ -493,8 +493,8 @@ func TestImportDocuments(t *testing.T) { } ingestOptions := &options.IngestOptions{ BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, MaintainInsertionOrder: true, } sessionProvider, err = db.InitSessionProvider(*toolOptions) @@ -526,8 +526,8 @@ func TestImportDocuments(t *testing.T) { ingestOptions := &options.IngestOptions{ Upsert: true, BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, MaintainInsertionOrder: true, } sessionProvider, err = db.InitSessionProvider(*toolOptions) @@ -560,8 +560,8 @@ func TestImportDocuments(t *testing.T) { ingestOptions := &options.IngestOptions{ StopOnError: true, BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, MaintainInsertionOrder: true, } sessionProvider, err = db.InitSessionProvider(*toolOptions) @@ -591,10 +591,9 @@ func TestImportDocuments(t *testing.T) { Fields: "_id,b,c", } ingestOptions := &options.IngestOptions{ - BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, - MaintainInsertionOrder: true, + BatchSize: &batchSize, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, } sessionProvider, err = db.InitSessionProvider(*toolOptions) So(err, ShouldBeNil) @@ -627,9 +626,10 @@ func TestImportDocuments(t *testing.T) { ingestOptions := &options.IngestOptions{ Drop: true, BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, MaintainInsertionOrder: true, + WriteConcern: "majority", } sessionProvider, err = db.InitSessionProvider(*toolOptions) So(err, ShouldBeNil) @@ -658,9 +658,9 @@ func TestImportDocuments(t *testing.T) { HeaderLine: true, } ingestOptions := &options.IngestOptions{ - BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + BatchSize: &batchSize, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, } sessionProvider, err = db.InitSessionProvider(*toolOptions) So(err, ShouldBeNil) @@ -686,9 +686,9 @@ func TestImportDocuments(t *testing.T) { HeaderLine: true, } ingestOptions := &options.IngestOptions{ - BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + BatchSize: &batchSize, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, } sessionProvider, err = db.InitSessionProvider(*toolOptions) So(err, ShouldBeNil) @@ -714,8 +714,8 @@ func TestImportDocuments(t *testing.T) { Upsert: true, UpsertFields: "_id", BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, MaintainInsertionOrder: true, } sessionProvider, err = db.InitSessionProvider(*toolOptions) @@ -748,8 +748,8 @@ func TestImportDocuments(t *testing.T) { Upsert: true, UpsertFields: "_id", BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, MaintainInsertionOrder: true, } toolOptions := getBasicToolOptions() @@ -783,8 +783,8 @@ func TestImportDocuments(t *testing.T) { ingestOptions := &options.IngestOptions{ StopOnError: true, BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, WriteConcern: "1", MaintainInsertionOrder: true, } @@ -816,8 +816,8 @@ func TestImportDocuments(t *testing.T) { ingestOptions := &options.IngestOptions{ StopOnError: true, BatchSize: &batchSize, - NumProcessingThreads: &numProcessingThreads, - NumIngestionThreads: &numIngestionThreads, + NumDecodingWorkers: &NumDecodingWorkers, + NumInsertionWorkers: &NumInsertionWorkers, MaintainInsertionOrder: true, } sessionProvider, err = db.InitSessionProvider(*toolOptions) |