summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mongoimport/common.go57
-rw-r--r--mongoimport/mongoimport.go22
2 files changed, 14 insertions, 65 deletions
diff --git a/mongoimport/common.go b/mongoimport/common.go
index f1114910ed7..8279cefbc9b 100644
--- a/mongoimport/common.go
+++ b/mongoimport/common.go
@@ -4,10 +4,8 @@ import (
"errors"
"fmt"
"github.com/mongodb/mongo-tools/common/bsonutil"
- "github.com/mongodb/mongo-tools/common/db"
"github.com/mongodb/mongo-tools/common/log"
"github.com/mongodb/mongo-tools/common/util"
- "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"io"
"sort"
@@ -20,7 +18,6 @@ var (
errLostConnection = errors.New("lost connection to server")
errNoReachableServer = errors.New("no reachable servers")
errNsNotFound = errors.New("ns not found")
- errNoReplSet = errors.New("norepl")
)
// ConvertibleDoc is an interface implemented by special types which wrap data
@@ -147,11 +144,7 @@ func filterIngestError(stopOnError bool, err error) error {
if err.Error() == errNoReachableServer.Error() {
return err
}
- if err.Error() == errNoReplSet.Error() {
- log.Logf(log.Info, "write concern 'majority' run against standalone")
- return nil
- }
- if err == io.EOF {
+ if err.Error() == io.EOF.Error() {
err = errLostConnection
}
log.Logf(log.Always, "error inserting documents: %v", err)
@@ -161,54 +154,6 @@ func filterIngestError(stopOnError bool, err error) error {
return nil
}
-// insertDocuments writes the given documents to the specified collection and
-// returns the number of documents inserted and an error. It can handle both
-// ordered and unordered writes. If both a write error and a write concern error
-// are encountered, the write error is returned. If the target server is not
-// capable of handling write commands, it returns an error.
-func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) {
- // mongod v2.6 requires you to explicitly pass an integer for numeric write
- // concerns
- var wc interface{}
- if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil {
- wc = writeConcern
- } else {
- wc = intWriteConcern
- }
-
- response := &db.WriteCommandResponse{}
- err := collection.Database.Run(
- bson.D{
- bson.DocElem{"insert", collection.Name},
- bson.DocElem{"ordered", ordered},
- bson.DocElem{"documents", documents},
- bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}},
- }, response)
- if err != nil {
- return 0, err
- }
- // if the write concern is 0, n is not present in the response document
- // so we return immediately with no errors
- if response.NumAffected == nil {
- return len(documents), nil
- }
- if response.Ok == 0 {
- // the command itself failed (authentication failed.., syntax error)
- return 0, fmt.Errorf("write command failed")
- } else if len(response.WriteErrors) != 0 {
- // happens if the server couldn't write the data; e.g. because of a
- // duplicate key, running out of disk space, etc
- for _, writeError := range response.WriteErrors {
- log.Logf(log.Always, writeError.Errmsg)
- }
- return *response.NumAffected, fmt.Errorf("encountered write errors")
- } else if response.WriteConcernError.Errmsg != "" {
- log.Logf(log.Always, response.WriteConcernError.Errmsg)
- return 0, fmt.Errorf("encountered write concern error")
- }
- return *response.NumAffected, nil
-}
-
// removeBlankFields removes empty/blank fields in csv and tsv
func removeBlankFields(document bson.D) bson.D {
for index, pair := range document {
diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go
index 73d698d0dd7..2fd81e53465 100644
--- a/mongoimport/mongoimport.go
+++ b/mongoimport/mongoimport.go
@@ -528,14 +528,6 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C
numInserted, err = mongoImport.handleUpsert(documents, collection)
return err
} else {
- // note that this count may not be entirely accurate if some
- // ingester workers insert when another errors out.
- // without write commands, we can't say for sure how many documents were
- // inserted when we use bulk inserts so we assume the entire batch
- // succeeded - even if an error is returned. The result is that we may
- // report that more documents - than were actually inserted - were
- // inserted into the database. This will change as soon as BulkResults
- // are supported by the driver
bulk := collection.Bulk()
for _, document := range documents {
bulk.Insert(document)
@@ -546,7 +538,19 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C
// mgo.Bulk doesn't currently implement write commands so mgo.BulkResult
// isn't informative
_, err = bulk.Run()
- numInserted = len(documents)
+
+ // TOOLS-349: Note that this count may not be entirely accurate if some
+ // ingester workers insert when another errors out.
+ //
+ // Without write commands, we can't say for sure how many documents
+ // were inserted when we use bulk inserts so we assume the entire batch
+ // succeeded - even if an error is returned. The result is that we may
+ // report that more documents - than were actually inserted - were
+ // inserted into the database. This will change as soon as BulkResults
+ // are supported by the driver
+ if err == nil {
+ numInserted = len(documents)
+ }
}
return filterIngestError(stopOnError, err)
}