summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWisdom Omuya <deafgoat@gmail.com>2014-10-31 16:23:47 -0400
committerWisdom Omuya <deafgoat@gmail.com>2014-10-31 17:10:13 -0400
commita9c5b69b6b8451e3539a7dee57cfa79e3606122e (patch)
tree3e9181ab7adf2c961840a14214d2264f8a84640b
parent156c0f141ecfa29ee674d6a713544f840f8acdaa (diff)
downloadmongo-a9c5b69b6b8451e3539a7dee57cfa79e3606122e.tar.gz
TOOLS-318: remove write commands
Former-commit-id: 5b45e96318e443ff8fdcfe45e032887668ad9dab
-rw-r--r--common/db/command.go2
-rw-r--r--mongoimport/common.go205
-rw-r--r--mongoimport/mongoimport.go41
-rw-r--r--mongoimport/mongoimport_test.go1
4 files changed, 116 insertions, 133 deletions
diff --git a/common/db/command.go b/common/db/command.go
index 5e5d1c745e6..4b8afec3fca 100644
--- a/common/db/command.go
+++ b/common/db/command.go
@@ -99,7 +99,7 @@ func (sp *SessionProvider) IsReplicaSet() (bool, error) {
if hasSetName || hasHosts {
return true, nil
}
- return false, err
+ return false, nil
}
func (sp *SessionProvider) SupportsWriteCommands() (bool, error) {
diff --git a/mongoimport/common.go b/mongoimport/common.go
index 8c8f5237049..0a34280dfa4 100644
--- a/mongoimport/common.go
+++ b/mongoimport/common.go
@@ -17,7 +17,10 @@ import (
)
var (
- errLostConnection = errors.New("lost connection to server")
+ errLostConnection = errors.New("lost connection to server")
+ errNoReachableServer = errors.New("no reachable servers")
+ errNsNotFound = errors.New("ns not found")
+ errNoReplSet = errors.New("norepl")
)
// ConvertibleDoc is an interface implemented by special types which wrap data
@@ -132,16 +135,72 @@ func getUpsertValue(field string, document bson.M) interface{} {
// io.EOF - indicating a lost connection to the server, it sets the error as
// such. In any case, it unconditionally returns an error which may/may not be nil
func handleErr(stopOnError bool, err error) error {
+ if err == nil {
+ return nil
+ }
+ if err.Error() == errNoReachableServer.Error() {
+ return err
+ }
+ if err.Error() == errNoReplSet.Error() {
+ log.Logf(log.Info, "write concern 'majority' run against standalone")
+ return nil
+ }
+ if err == io.EOF {
+ err = errLostConnection
+ }
+ log.Logf(log.Always, "error inserting documents: %v", err)
+ if stopOnError || err == errLostConnection {
+ return err
+ }
+ return nil
+}
+
+// insertDocuments writes the given documents to the specified collection and
+// returns the number of documents inserted and an error. It can handle both
+// ordered and unordered writes. If both a write error and a write concern error
+// are encountered, the write error is returned. If the target server is not
+// capable of handling write commands, it returns an error.
+func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) {
+ // mongod v2.6 requires you to explicitly pass an integer for numeric write
+ // concerns
+ var wc interface{}
+ if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil {
+ wc = writeConcern
+ } else {
+ wc = intWriteConcern
+ }
+
+ response := &db.WriteCommandResponse{}
+ err := collection.Database.Run(
+ bson.D{
+ bson.DocElem{"insert", collection.Name},
+ bson.DocElem{"ordered", ordered},
+ bson.DocElem{"documents", documents},
+ bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}},
+ }, response)
if err != nil {
- if err == io.EOF {
- err = errLostConnection
- }
- log.Logf(log.Always, "error inserting documents: %v", err)
- if stopOnError || err == errLostConnection {
- return err
+ return 0, err
+ }
+ // if the write concern is 0, n is not present in the response document
+ // so we return immediately with no errors
+ if response.NumAffected == nil {
+ return len(documents), nil
+ }
+ if response.Ok == 0 {
+ // the command itself failed (authentication failed.., syntax error)
+ return 0, fmt.Errorf("write command failed")
+ } else if len(response.WriteErrors) != 0 {
+ // happens if the server couldn't write the data; e.g. because of a
+ // duplicate key, running out of disk space, etc
+ for _, writeError := range response.WriteErrors {
+ log.Logf(log.Always, writeError.Errmsg)
}
+ return *response.NumAffected, fmt.Errorf("encountered write errors")
+ } else if response.WriteConcernError.Errmsg != "" {
+ log.Logf(log.Always, response.WriteConcernError.Errmsg)
+ return 0, fmt.Errorf("encountered write concern error")
}
- return nil
+ return *response.NumAffected, nil
}
// removeBlankFields removes empty/blank fields in csv and tsv
@@ -180,6 +239,47 @@ func setNestedValue(key string, value interface{}, document *bson.D) {
}
}
+// streamDocuments concurrently processes data gotten from the inputChan
+// channel in parallel and then sends over the processed data to the outputChan
+// channel - either in sequence or concurrently (depending on the value of
+// ordered) - in which the data was received
+func streamDocuments(ordered bool, inputChan chan ConvertibleDoc, outputChan chan bson.D, errChan chan error) {
+ var importWorkers []*ImportWorker
+ // initialize all our concurrent processing threads
+ wg := &sync.WaitGroup{}
+ inChan := inputChan
+ outChan := outputChan
+ for i := 0; i < numProcessingThreads; i++ {
+ if ordered {
+ // TODO: experiment with buffered channel size; the buffer size of
+ // inChan should always be the same as that of outChan
+ workerBufferSize := 100
+ inChan = make(chan ConvertibleDoc, workerBufferSize)
+ outChan = make(chan bson.D, workerBufferSize)
+ }
+ importWorker := &ImportWorker{
+ unprocessedDataChan: inChan,
+ processedDocumentChan: outChan,
+ }
+ importWorkers = append(importWorkers, importWorker)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := importWorker.processDocuments(ordered); err != nil {
+ errChan <- err
+ }
+ }()
+ }
+
+ // if ordered, we have to coordinate the sequence in which processed
+ // documents are passed to the main read channel
+ if ordered {
+ doSequentialStreaming(importWorkers, inputChan, outputChan)
+ }
+ wg.Wait()
+ close(outputChan)
+}
+
// tokensToBSON reads in slice of records - along with ordered fields names -
// and returns a BSON document for the record.
func tokensToBSON(fields, tokens []string, numProcessed uint64) (bson.D, error) {
@@ -274,92 +374,3 @@ func (importWorker *ImportWorker) processDocuments(ordered bool) error {
}
return nil
}
-
-// streamDocuments concurrently processes data gotten from the inputChan
-// channel in parallel and then sends over the processed data to the outputChan
-// channel - either in sequence or concurrently (depending on the value of
-// ordered) - in which the data was received
-func streamDocuments(ordered bool, inputChan chan ConvertibleDoc, outputChan chan bson.D, errChan chan error) {
- var importWorkers []*ImportWorker
- // initialize all our concurrent processing threads
- wg := &sync.WaitGroup{}
- inChan := inputChan
- outChan := outputChan
- for i := 0; i < numProcessingThreads; i++ {
- if ordered {
- // TODO: experiment with buffered channel size; the buffer size of
- // inChan should always be the same as that of outChan
- workerBufferSize := 100
- inChan = make(chan ConvertibleDoc, workerBufferSize)
- outChan = make(chan bson.D, workerBufferSize)
- }
- importWorker := &ImportWorker{
- unprocessedDataChan: inChan,
- processedDocumentChan: outChan,
- }
- importWorkers = append(importWorkers, importWorker)
- wg.Add(1)
- go func() {
- defer wg.Done()
- if err := importWorker.processDocuments(ordered); err != nil {
- errChan <- err
- }
- }()
- }
-
- // if ordered, we have to coordinate the sequence in which processed
- // documents are passed to the main read channel
- if ordered {
- doSequentialStreaming(importWorkers, inputChan, outputChan)
- }
- wg.Wait()
- close(outputChan)
-}
-
-// insertDocuments writes the given documents to the specified collection and
-// returns the number of documents inserted and an error. It can handle both
-// ordered and unordered writes. If both a write error and a write concern error
-// are encountered, the write error is returned. If the target server is not
-// capable of handling write commands, it returns an error.
-func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) {
- // mongod v2.6 requires you to explicitly pass an integer for numeric write
- // concerns
- var wc interface{}
- if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil {
- wc = writeConcern
- } else {
- wc = intWriteConcern
- }
-
- response := &db.WriteCommandResponse{}
- err := collection.Database.Run(
- bson.D{
- bson.DocElem{"insert", collection.Name},
- bson.DocElem{"ordered", ordered},
- bson.DocElem{"documents", documents},
- bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}},
- }, response)
- if err != nil {
- return 0, err
- }
- // if the write concern is 0, n is not present in the response document
- // so we return immediately with no errors
- if response.NumAffected == nil {
- return len(documents), nil
- }
- if response.Ok == 0 {
- // the command itself failed (authentication failed.., syntax error)
- return 0, fmt.Errorf("write command failed")
- } else if len(response.WriteErrors) != 0 {
- // happens if the server couldn't write the data; e.g. because of a
- // duplicate key, running out of disk space, etc
- for _, writeError := range response.WriteErrors {
- log.Logf(log.Always, writeError.Errmsg)
- }
- return *response.NumAffected, fmt.Errorf("encountered write errors")
- } else if response.WriteConcernError.Errmsg != "" {
- log.Logf(log.Always, response.WriteConcernError.Errmsg)
- return 0, fmt.Errorf("encountered write concern error")
- }
- return *response.NumAffected, nil
-}
diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go
index 25bb31beda6..5d232285017 100644
--- a/mongoimport/mongoimport.go
+++ b/mongoimport/mongoimport.go
@@ -1,7 +1,6 @@
package mongoimport
import (
- "errors"
"fmt"
"github.com/mongodb/mongo-tools/common/db"
"github.com/mongodb/mongo-tools/common/log"
@@ -27,16 +26,10 @@ const (
JSON = "json"
)
-var (
- errNsNotFound = errors.New("ns not found")
-)
-
// ingestion constants
const (
- maxBSONSize = 16 * (1024 * 1024)
- maxMessageSizeBytes = 2 * maxBSONSize
- maxWriteCommandSizeBytes = maxBSONSize / 2
- updateAfterNumInserts = 10000
+ maxBSONSize = 16 * (1024 * 1024)
+ maxMessageSizeBytes = 2 * maxBSONSize
)
// variables used by the input/ingestion goroutines
@@ -70,10 +63,6 @@ type MongoImport struct {
// the tomb is used to synchronize ingestion goroutines and causes
// other sibling goroutines to terminate immediately if one errors out
tomb *tomb.Tomb
-
- // indicates if the underlying connected server on the session suports
- // write commands
- supportsWriteCommands bool
}
// InputReader is an interface that specifies how an input source should be
@@ -350,13 +339,6 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp
// initialize insertion lock
mongoImport.insertionLock = &sync.Mutex{}
- // check if the server supports write commands
- mongoImport.supportsWriteCommands, err = mongoImport.SessionProvider.SupportsWriteCommands()
- if err != nil {
- return 0, fmt.Errorf("error checking if server supports write commands: %v", err)
- }
- log.Logf(log.Info, "using write commands: %v", mongoImport.supportsWriteCommands)
-
// return immediately on ingest errors - these will be triggered
// either by an issue ingesting data or if the read channel is
// closed so we can block here while reads happen in a goroutine
@@ -448,13 +430,6 @@ func (mongoImport *MongoImport) ingestDocs(readChan chan bson.D) (err error) {
documents := make([]bson.Raw, 0)
numMessageBytes := 0
- // set the max message size bytes based on whether or not we're using
- // write commands
- maxMessageSize := maxMessageSizeBytes
- if mongoImport.supportsWriteCommands {
- maxMessageSize = maxWriteCommandSizeBytes
- }
-
readLoop:
for {
select {
@@ -466,12 +441,12 @@ readLoop:
// limit so we self impose a limit by using maxMessageSizeBytes
// and send documents over the wire when we hit the batch size
// or when we're at/over the maximum message size threshold
- if len(documents) == batchSize || numMessageBytes >= maxMessageSize {
+ if len(documents) == batchSize || numMessageBytes >= maxMessageSizeBytes {
if err = mongoImport.ingester(documents, collection); err != nil {
return err
}
// TODO: TOOLS-313; better to use a progress bar here
- if mongoImport.insertionCount%updateAfterNumInserts == 0 {
+ if mongoImport.insertionCount%10000 == 0 {
log.Logf(log.Always, "Progress: %v documents inserted...", mongoImport.insertionCount)
}
documents = documents[:0]
@@ -535,7 +510,6 @@ func (mongoImport *MongoImport) handleUpsert(documents []bson.Raw, collection *m
func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.Collection) (err error) {
numInserted := 0
stopOnError := mongoImport.IngestOptions.StopOnError
- writeConcern := mongoImport.IngestOptions.WriteConcern
maintainInsertionOrder := mongoImport.IngestOptions.MaintainInsertionOrder
defer func() {
@@ -546,12 +520,9 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C
if mongoImport.IngestOptions.Upsert {
return mongoImport.handleUpsert(documents, collection, &numInserted)
- } else if mongoImport.supportsWriteCommands {
- // note that this count may not be entirely accurate if some
- // ingester threads insert when another errors out. however,
- // any/all errors will be logged if/when they are encountered
- numInserted, err = insertDocuments(documents, collection, stopOnError, writeConcern)
} else {
+ // note that this count may not be entirely accurate if some
+ // ingester threads insert when another errors out.
// without write commands, we can't say for sure how many documents were
// inserted when we use bulk inserts so we assume the entire batch
// succeeded - even if an error is returned. The result is that we may
diff --git a/mongoimport/mongoimport_test.go b/mongoimport/mongoimport_test.go
index a5d99f8cd76..242d0f1ce5c 100644
--- a/mongoimport/mongoimport_test.go
+++ b/mongoimport/mongoimport_test.go
@@ -630,6 +630,7 @@ func TestImportDocuments(t *testing.T) {
NumProcessingThreads: &numProcessingThreads,
NumIngestionThreads: &numIngestionThreads,
MaintainInsertionOrder: true,
+ WriteConcern: "majority",
}
sessionProvider, err = db.InitSessionProvider(*toolOptions)
So(err, ShouldBeNil)