diff options
-rw-r--r-- | mongoimport/common.go | 57 | ||||
-rw-r--r-- | mongoimport/mongoimport.go | 22 |
2 files changed, 14 insertions, 65 deletions
diff --git a/mongoimport/common.go b/mongoimport/common.go index f1114910ed7..8279cefbc9b 100644 --- a/mongoimport/common.go +++ b/mongoimport/common.go @@ -4,10 +4,8 @@ import ( "errors" "fmt" "github.com/mongodb/mongo-tools/common/bsonutil" - "github.com/mongodb/mongo-tools/common/db" "github.com/mongodb/mongo-tools/common/log" "github.com/mongodb/mongo-tools/common/util" - "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "io" "sort" @@ -20,7 +18,6 @@ var ( errLostConnection = errors.New("lost connection to server") errNoReachableServer = errors.New("no reachable servers") errNsNotFound = errors.New("ns not found") - errNoReplSet = errors.New("norepl") ) // ConvertibleDoc is an interface implemented by special types which wrap data @@ -147,11 +144,7 @@ func filterIngestError(stopOnError bool, err error) error { if err.Error() == errNoReachableServer.Error() { return err } - if err.Error() == errNoReplSet.Error() { - log.Logf(log.Info, "write concern 'majority' run against standalone") - return nil - } - if err == io.EOF { + if err.Error() == io.EOF.Error() { err = errLostConnection } log.Logf(log.Always, "error inserting documents: %v", err) @@ -161,54 +154,6 @@ func filterIngestError(stopOnError bool, err error) error { return nil } -// insertDocuments writes the given documents to the specified collection and -// returns the number of documents inserted and an error. It can handle both -// ordered and unordered writes. If both a write error and a write concern error -// are encountered, the write error is returned. If the target server is not -// capable of handling write commands, it returns an error. -func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) { - // mongod v2.6 requires you to explicitly pass an integer for numeric write - // concerns - var wc interface{} - if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil { - wc = writeConcern - } else { - wc = intWriteConcern - } - - response := &db.WriteCommandResponse{} - err := collection.Database.Run( - bson.D{ - bson.DocElem{"insert", collection.Name}, - bson.DocElem{"ordered", ordered}, - bson.DocElem{"documents", documents}, - bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}}, - }, response) - if err != nil { - return 0, err - } - // if the write concern is 0, n is not present in the response document - // so we return immediately with no errors - if response.NumAffected == nil { - return len(documents), nil - } - if response.Ok == 0 { - // the command itself failed (authentication failed.., syntax error) - return 0, fmt.Errorf("write command failed") - } else if len(response.WriteErrors) != 0 { - // happens if the server couldn't write the data; e.g. because of a - // duplicate key, running out of disk space, etc - for _, writeError := range response.WriteErrors { - log.Logf(log.Always, writeError.Errmsg) - } - return *response.NumAffected, fmt.Errorf("encountered write errors") - } else if response.WriteConcernError.Errmsg != "" { - log.Logf(log.Always, response.WriteConcernError.Errmsg) - return 0, fmt.Errorf("encountered write concern error") - } - return *response.NumAffected, nil -} - // removeBlankFields removes empty/blank fields in csv and tsv func removeBlankFields(document bson.D) bson.D { for index, pair := range document { diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 73d698d0dd7..2fd81e53465 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -528,14 +528,6 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C numInserted, err = mongoImport.handleUpsert(documents, collection) return err } else { - // note that this count may not be entirely accurate if some - // ingester workers insert when another errors out. - // without write commands, we can't say for sure how many documents were - // inserted when we use bulk inserts so we assume the entire batch - // succeeded - even if an error is returned. The result is that we may - // report that more documents - than were actually inserted - were - // inserted into the database. This will change as soon as BulkResults - // are supported by the driver bulk := collection.Bulk() for _, document := range documents { bulk.Insert(document) @@ -546,7 +538,19 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C // mgo.Bulk doesn't currently implement write commands so mgo.BulkResult // isn't informative _, err = bulk.Run() - numInserted = len(documents) + + // TOOLS-349: Note that this count may not be entirely accurate if some + // ingester workers insert when another errors out. + // + // Without write commands, we can't say for sure how many documents + // were inserted when we use bulk inserts so we assume the entire batch + // succeeded - even if an error is returned. The result is that we may + // report that more documents - than were actually inserted - were + // inserted into the database. This will change as soon as BulkResults + // are supported by the driver + if err == nil { + numInserted = len(documents) + } } return filterIngestError(stopOnError, err) } |