diff options
author | Wisdom Omuya <deafgoat@gmail.com> | 2014-10-31 16:23:47 -0400 |
---|---|---|
committer | Wisdom Omuya <deafgoat@gmail.com> | 2014-10-31 17:10:13 -0400 |
commit | a9c5b69b6b8451e3539a7dee57cfa79e3606122e (patch) | |
tree | 3e9181ab7adf2c961840a14214d2264f8a84640b | |
parent | 156c0f141ecfa29ee674d6a713544f840f8acdaa (diff) | |
download | mongo-a9c5b69b6b8451e3539a7dee57cfa79e3606122e.tar.gz |
TOOLS-318: remove write commands
Former-commit-id: 5b45e96318e443ff8fdcfe45e032887668ad9dab
-rw-r--r-- | common/db/command.go | 2 | ||||
-rw-r--r-- | mongoimport/common.go | 205 | ||||
-rw-r--r-- | mongoimport/mongoimport.go | 41 | ||||
-rw-r--r-- | mongoimport/mongoimport_test.go | 1 |
4 files changed, 116 insertions, 133 deletions
diff --git a/common/db/command.go b/common/db/command.go index 5e5d1c745e6..4b8afec3fca 100644 --- a/common/db/command.go +++ b/common/db/command.go @@ -99,7 +99,7 @@ func (sp *SessionProvider) IsReplicaSet() (bool, error) { if hasSetName || hasHosts { return true, nil } - return false, err + return false, nil } func (sp *SessionProvider) SupportsWriteCommands() (bool, error) { diff --git a/mongoimport/common.go b/mongoimport/common.go index 8c8f5237049..0a34280dfa4 100644 --- a/mongoimport/common.go +++ b/mongoimport/common.go @@ -17,7 +17,10 @@ import ( ) var ( - errLostConnection = errors.New("lost connection to server") + errLostConnection = errors.New("lost connection to server") + errNoReachableServer = errors.New("no reachable servers") + errNsNotFound = errors.New("ns not found") + errNoReplSet = errors.New("norepl") ) // ConvertibleDoc is an interface implemented by special types which wrap data @@ -132,16 +135,72 @@ func getUpsertValue(field string, document bson.M) interface{} { // io.EOF - indicating a lost connection to the server, it sets the error as // such. In any case, it unconditionally returns an error which may/may not be nil func handleErr(stopOnError bool, err error) error { + if err == nil { + return nil + } + if err.Error() == errNoReachableServer.Error() { + return err + } + if err.Error() == errNoReplSet.Error() { + log.Logf(log.Info, "write concern 'majority' run against standalone") + return nil + } + if err == io.EOF { + err = errLostConnection + } + log.Logf(log.Always, "error inserting documents: %v", err) + if stopOnError || err == errLostConnection { + return err + } + return nil +} + +// insertDocuments writes the given documents to the specified collection and +// returns the number of documents inserted and an error. It can handle both +// ordered and unordered writes. If both a write error and a write concern error +// are encountered, the write error is returned. If the target server is not +// capable of handling write commands, it returns an error. +func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) { + // mongod v2.6 requires you to explicitly pass an integer for numeric write + // concerns + var wc interface{} + if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil { + wc = writeConcern + } else { + wc = intWriteConcern + } + + response := &db.WriteCommandResponse{} + err := collection.Database.Run( + bson.D{ + bson.DocElem{"insert", collection.Name}, + bson.DocElem{"ordered", ordered}, + bson.DocElem{"documents", documents}, + bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}}, + }, response) if err != nil { - if err == io.EOF { - err = errLostConnection - } - log.Logf(log.Always, "error inserting documents: %v", err) - if stopOnError || err == errLostConnection { - return err + return 0, err + } + // if the write concern is 0, n is not present in the response document + // so we return immediately with no errors + if response.NumAffected == nil { + return len(documents), nil + } + if response.Ok == 0 { + // the command itself failed (authentication failed.., syntax error) + return 0, fmt.Errorf("write command failed") + } else if len(response.WriteErrors) != 0 { + // happens if the server couldn't write the data; e.g. because of a + // duplicate key, running out of disk space, etc + for _, writeError := range response.WriteErrors { + log.Logf(log.Always, writeError.Errmsg) } + return *response.NumAffected, fmt.Errorf("encountered write errors") + } else if response.WriteConcernError.Errmsg != "" { + log.Logf(log.Always, response.WriteConcernError.Errmsg) + return 0, fmt.Errorf("encountered write concern error") } - return nil + return *response.NumAffected, nil } // removeBlankFields removes empty/blank fields in csv and tsv @@ -180,6 +239,47 @@ func setNestedValue(key string, value interface{}, document *bson.D) { } } +// streamDocuments concurrently processes data gotten from the inputChan +// channel in parallel and then sends over the processed data to the outputChan +// channel - either in sequence or concurrently (depending on the value of +// ordered) - in which the data was received +func streamDocuments(ordered bool, inputChan chan ConvertibleDoc, outputChan chan bson.D, errChan chan error) { + var importWorkers []*ImportWorker + // initialize all our concurrent processing threads + wg := &sync.WaitGroup{} + inChan := inputChan + outChan := outputChan + for i := 0; i < numProcessingThreads; i++ { + if ordered { + // TODO: experiment with buffered channel size; the buffer size of + // inChan should always be the same as that of outChan + workerBufferSize := 100 + inChan = make(chan ConvertibleDoc, workerBufferSize) + outChan = make(chan bson.D, workerBufferSize) + } + importWorker := &ImportWorker{ + unprocessedDataChan: inChan, + processedDocumentChan: outChan, + } + importWorkers = append(importWorkers, importWorker) + wg.Add(1) + go func() { + defer wg.Done() + if err := importWorker.processDocuments(ordered); err != nil { + errChan <- err + } + }() + } + + // if ordered, we have to coordinate the sequence in which processed + // documents are passed to the main read channel + if ordered { + doSequentialStreaming(importWorkers, inputChan, outputChan) + } + wg.Wait() + close(outputChan) +} + // tokensToBSON reads in slice of records - along with ordered fields names - // and returns a BSON document for the record. func tokensToBSON(fields, tokens []string, numProcessed uint64) (bson.D, error) { @@ -274,92 +374,3 @@ func (importWorker *ImportWorker) processDocuments(ordered bool) error { } return nil } - -// streamDocuments concurrently processes data gotten from the inputChan -// channel in parallel and then sends over the processed data to the outputChan -// channel - either in sequence or concurrently (depending on the value of -// ordered) - in which the data was received -func streamDocuments(ordered bool, inputChan chan ConvertibleDoc, outputChan chan bson.D, errChan chan error) { - var importWorkers []*ImportWorker - // initialize all our concurrent processing threads - wg := &sync.WaitGroup{} - inChan := inputChan - outChan := outputChan - for i := 0; i < numProcessingThreads; i++ { - if ordered { - // TODO: experiment with buffered channel size; the buffer size of - // inChan should always be the same as that of outChan - workerBufferSize := 100 - inChan = make(chan ConvertibleDoc, workerBufferSize) - outChan = make(chan bson.D, workerBufferSize) - } - importWorker := &ImportWorker{ - unprocessedDataChan: inChan, - processedDocumentChan: outChan, - } - importWorkers = append(importWorkers, importWorker) - wg.Add(1) - go func() { - defer wg.Done() - if err := importWorker.processDocuments(ordered); err != nil { - errChan <- err - } - }() - } - - // if ordered, we have to coordinate the sequence in which processed - // documents are passed to the main read channel - if ordered { - doSequentialStreaming(importWorkers, inputChan, outputChan) - } - wg.Wait() - close(outputChan) -} - -// insertDocuments writes the given documents to the specified collection and -// returns the number of documents inserted and an error. It can handle both -// ordered and unordered writes. If both a write error and a write concern error -// are encountered, the write error is returned. If the target server is not -// capable of handling write commands, it returns an error. -func insertDocuments(documents []bson.Raw, collection *mgo.Collection, ordered bool, writeConcern string) (int, error) { - // mongod v2.6 requires you to explicitly pass an integer for numeric write - // concerns - var wc interface{} - if intWriteConcern, err := strconv.Atoi(writeConcern); err != nil { - wc = writeConcern - } else { - wc = intWriteConcern - } - - response := &db.WriteCommandResponse{} - err := collection.Database.Run( - bson.D{ - bson.DocElem{"insert", collection.Name}, - bson.DocElem{"ordered", ordered}, - bson.DocElem{"documents", documents}, - bson.DocElem{"writeConcern", bson.D{bson.DocElem{"w", wc}}}, - }, response) - if err != nil { - return 0, err - } - // if the write concern is 0, n is not present in the response document - // so we return immediately with no errors - if response.NumAffected == nil { - return len(documents), nil - } - if response.Ok == 0 { - // the command itself failed (authentication failed.., syntax error) - return 0, fmt.Errorf("write command failed") - } else if len(response.WriteErrors) != 0 { - // happens if the server couldn't write the data; e.g. because of a - // duplicate key, running out of disk space, etc - for _, writeError := range response.WriteErrors { - log.Logf(log.Always, writeError.Errmsg) - } - return *response.NumAffected, fmt.Errorf("encountered write errors") - } else if response.WriteConcernError.Errmsg != "" { - log.Logf(log.Always, response.WriteConcernError.Errmsg) - return 0, fmt.Errorf("encountered write concern error") - } - return *response.NumAffected, nil -} diff --git a/mongoimport/mongoimport.go b/mongoimport/mongoimport.go index 25bb31beda6..5d232285017 100644 --- a/mongoimport/mongoimport.go +++ b/mongoimport/mongoimport.go @@ -1,7 +1,6 @@ package mongoimport import ( - "errors" "fmt" "github.com/mongodb/mongo-tools/common/db" "github.com/mongodb/mongo-tools/common/log" @@ -27,16 +26,10 @@ const ( JSON = "json" ) -var ( - errNsNotFound = errors.New("ns not found") -) - // ingestion constants const ( - maxBSONSize = 16 * (1024 * 1024) - maxMessageSizeBytes = 2 * maxBSONSize - maxWriteCommandSizeBytes = maxBSONSize / 2 - updateAfterNumInserts = 10000 + maxBSONSize = 16 * (1024 * 1024) + maxMessageSizeBytes = 2 * maxBSONSize ) // variables used by the input/ingestion goroutines @@ -70,10 +63,6 @@ type MongoImport struct { // the tomb is used to synchronize ingestion goroutines and causes // other sibling goroutines to terminate immediately if one errors out tomb *tomb.Tomb - - // indicates if the underlying connected server on the session suports - // write commands - supportsWriteCommands bool } // InputReader is an interface that specifies how an input source should be @@ -350,13 +339,6 @@ func (mongoImport *MongoImport) importDocuments(inputReader InputReader) (numImp // initialize insertion lock mongoImport.insertionLock = &sync.Mutex{} - // check if the server supports write commands - mongoImport.supportsWriteCommands, err = mongoImport.SessionProvider.SupportsWriteCommands() - if err != nil { - return 0, fmt.Errorf("error checking if server supports write commands: %v", err) - } - log.Logf(log.Info, "using write commands: %v", mongoImport.supportsWriteCommands) - // return immediately on ingest errors - these will be triggered // either by an issue ingesting data or if the read channel is // closed so we can block here while reads happen in a goroutine @@ -448,13 +430,6 @@ func (mongoImport *MongoImport) ingestDocs(readChan chan bson.D) (err error) { documents := make([]bson.Raw, 0) numMessageBytes := 0 - // set the max message size bytes based on whether or not we're using - // write commands - maxMessageSize := maxMessageSizeBytes - if mongoImport.supportsWriteCommands { - maxMessageSize = maxWriteCommandSizeBytes - } - readLoop: for { select { @@ -466,12 +441,12 @@ readLoop: // limit so we self impose a limit by using maxMessageSizeBytes // and send documents over the wire when we hit the batch size // or when we're at/over the maximum message size threshold - if len(documents) == batchSize || numMessageBytes >= maxMessageSize { + if len(documents) == batchSize || numMessageBytes >= maxMessageSizeBytes { if err = mongoImport.ingester(documents, collection); err != nil { return err } // TODO: TOOLS-313; better to use a progress bar here - if mongoImport.insertionCount%updateAfterNumInserts == 0 { + if mongoImport.insertionCount%10000 == 0 { log.Logf(log.Always, "Progress: %v documents inserted...", mongoImport.insertionCount) } documents = documents[:0] @@ -535,7 +510,6 @@ func (mongoImport *MongoImport) handleUpsert(documents []bson.Raw, collection *m func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.Collection) (err error) { numInserted := 0 stopOnError := mongoImport.IngestOptions.StopOnError - writeConcern := mongoImport.IngestOptions.WriteConcern maintainInsertionOrder := mongoImport.IngestOptions.MaintainInsertionOrder defer func() { @@ -546,12 +520,9 @@ func (mongoImport *MongoImport) ingester(documents []bson.Raw, collection *mgo.C if mongoImport.IngestOptions.Upsert { return mongoImport.handleUpsert(documents, collection, &numInserted) - } else if mongoImport.supportsWriteCommands { - // note that this count may not be entirely accurate if some - // ingester threads insert when another errors out. however, - // any/all errors will be logged if/when they are encountered - numInserted, err = insertDocuments(documents, collection, stopOnError, writeConcern) } else { + // note that this count may not be entirely accurate if some + // ingester threads insert when another errors out. // without write commands, we can't say for sure how many documents were // inserted when we use bulk inserts so we assume the entire batch // succeeded - even if an error is returned. The result is that we may diff --git a/mongoimport/mongoimport_test.go b/mongoimport/mongoimport_test.go index a5d99f8cd76..242d0f1ce5c 100644 --- a/mongoimport/mongoimport_test.go +++ b/mongoimport/mongoimport_test.go @@ -630,6 +630,7 @@ func TestImportDocuments(t *testing.T) { NumProcessingThreads: &numProcessingThreads, NumIngestionThreads: &numIngestionThreads, MaintainInsertionOrder: true, + WriteConcern: "majority", } sessionProvider, err = db.InitSessionProvider(*toolOptions) So(err, ShouldBeNil) |