author    | Ramon Fernandez <ramon@mongodb.com> | 2016-08-25 16:34:34 -0400
committer | Ramon Fernandez <ramon@mongodb.com> | 2016-08-25 16:54:18 -0400
commit    | c330c9991ab45e7d0685d53e699ef26dba065660 (patch)
tree      | 3dc5cd06b5f6c7eaaa4cb20cbe763504c14a772b /src/mongo/gotools/mongoimport
parent    | eb62b862d5ebf179a1bcd9f394070e69c30188ab (diff)
download  | mongo-c330c9991ab45e7d0685d53e699ef26dba065660.tar.gz
Import tools: 5b883d86fdb4df55036d5dba2ca6f9dfa0750b44 from branch v3.3
ref: 1ac1389bda..5b883d86fd
for: 3.3.12
SERVER-25814 Initial vendor import: tools
Diffstat (limited to 'src/mongo/gotools/mongoimport')
30 files changed, 5179 insertions, 0 deletions
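The code below hangs off one small abstraction: the Converter interface (defined in common.go), whose Convert method turns a single raw input record into a bson.D. A minimal sketch of the pattern - the lineConverter type here is hypothetical, not part of this import:

	package main

	import (
		"fmt"
		"strconv"
		"strings"

		"gopkg.in/mgo.v2/bson"
	)

	// lineConverter is a hypothetical Converter implementation: it splits a
	// comma-separated line and emits one generated "fieldN" key per token,
	// mirroring the fallback naming used by tokensToBSON below.
	type lineConverter struct{ line string }

	func (lc lineConverter) Convert() (bson.D, error) {
		doc := bson.D{}
		for i, tok := range strings.Split(lc.line, ",") {
			doc = append(doc, bson.DocElem{Name: "field" + strconv.Itoa(i), Value: tok})
		}
		return doc, nil
	}

	func main() {
		doc, _ := lineConverter{line: "a,b,c"}.Convert()
		fmt.Println(doc) // [{field0 a} {field1 b} {field2 c}]
	}

CSVConverter in csv.go is the real implementation; streamDocuments in common.go fans such records out to importWorker goroutines and collects the resulting bson.D values.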
diff --git a/src/mongo/gotools/mongoimport/common.go b/src/mongo/gotools/mongoimport/common.go new file mode 100644 index 00000000000..cd7c4d333e8 --- /dev/null +++ b/src/mongo/gotools/mongoimport/common.go @@ -0,0 +1,472 @@ +package mongoimport + +import ( + "bufio" + "bytes" + "fmt" + "io" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + + "github.com/mongodb/mongo-tools/common/bsonutil" + "github.com/mongodb/mongo-tools/common/db" + "github.com/mongodb/mongo-tools/common/log" + "github.com/mongodb/mongo-tools/common/util" + "gopkg.in/mgo.v2/bson" + "gopkg.in/tomb.v2" +) + +type ParseGrace int + +const ( + pgAutoCast ParseGrace = iota + pgSkipField + pgSkipRow + pgStop +) + +// ValidatePG ensures the user-provided parseGrace is one of the allowed +// values. +func ValidatePG(pg string) (ParseGrace, error) { + switch pg { + case "autoCast": + return pgAutoCast, nil + case "skipField": + return pgSkipField, nil + case "skipRow": + return pgSkipRow, nil + case "stop": + return pgStop, nil + default: + return pgAutoCast, fmt.Errorf("invalid parse grace: %s", pg) + } +} + +// ParsePG interprets the user-provided parseGrace, assuming it is valid. +func ParsePG(pg string) (res ParseGrace) { + res, _ = ValidatePG(pg) + return +} + +// Converter is an interface that adds the basic Convert method which returns a +// valid BSON document that has been converted by the underlying implementation. +// If conversion fails, err will be set. +type Converter interface { + Convert() (document bson.D, err error) +} + +// An importWorker reads Converter from the unprocessedDataChan channel and +// sends processed BSON documents on the processedDocumentChan channel +type importWorker struct { + // unprocessedDataChan is used to stream the input data for a worker to process + unprocessedDataChan chan Converter + + // used to stream the processed document back to the caller + processedDocumentChan chan bson.D + + // used to synchronise all worker goroutines + tomb *tomb.Tomb +} + +// an interface for tracking the number of bytes, which is used in mongoimport to feed +// the progress bar. +type sizeTracker interface { + Size() int64 +} + +// sizeTrackingReader implements Reader and sizeTracker by wrapping an io.Reader and keeping track +// of the total number of bytes read from each call to Read(). +type sizeTrackingReader struct { + bytesRead int64 + reader io.Reader +} + +func (str *sizeTrackingReader) Size() int64 { + bytes := atomic.LoadInt64(&str.bytesRead) + return bytes +} + +func (str *sizeTrackingReader) Read(p []byte) (n int, err error) { + n, err = str.reader.Read(p) + atomic.AddInt64(&str.bytesRead, int64(n)) + return +} + +func newSizeTrackingReader(reader io.Reader) *sizeTrackingReader { + return &sizeTrackingReader{ + reader: reader, + bytesRead: 0, + } +} + +var ( + UTF8_BOM = []byte{0xEF, 0xBB, 0xBF} +) + +// bomDiscardingReader implements and wraps io.Reader, discarding the UTF-8 BOM, if applicable +type bomDiscardingReader struct { + buf *bufio.Reader + didRead bool +} + +func (bd *bomDiscardingReader) Read(p []byte) (int, error) { + if !bd.didRead { + bom, err := bd.buf.Peek(3) + if err == nil && bytes.Equal(bom, UTF8_BOM) { + bd.buf.Read(make([]byte, 3)) // discard BOM + } + bd.didRead = true + } + return bd.buf.Read(p) +} + +func newBomDiscardingReader(r io.Reader) *bomDiscardingReader { + return &bomDiscardingReader{buf: bufio.NewReader(r)} +} + +// channelQuorumError takes a channel and a quorum - which specifies how many +// messages to receive on that channel before returning. 
It either returns the +// first non-nil error received on the channel or nil if up to `quorum` nil +// errors are received +func channelQuorumError(ch <-chan error, quorum int) (err error) { + for i := 0; i < quorum; i++ { + if err = <-ch; err != nil { + return + } + } + return +} + +// constructUpsertDocument constructs a BSON document to use for upserts +func constructUpsertDocument(upsertFields []string, document bson.D) bson.D { + upsertDocument := bson.D{} + var hasDocumentKey bool + for _, key := range upsertFields { + val := getUpsertValue(key, document) + if val != nil { + hasDocumentKey = true + } + upsertDocument = append(upsertDocument, bson.DocElem{Name: key, Value: val}) + } + if !hasDocumentKey { + return nil + } + return upsertDocument +} + +// doSequentialStreaming takes a slice of workers, a readDocs (input) channel and +// an outputChan (output) channel. It sequentially writes unprocessed data read from +// the input channel to each worker and then sequentially reads the processed data +// from each worker before passing it on to the output channel +func doSequentialStreaming(workers []*importWorker, readDocs chan Converter, outputChan chan bson.D) { + numWorkers := len(workers) + + // feed in the data to be processed and do round-robin + // reads from each worker once processing is completed + go func() { + i := 0 + for doc := range readDocs { + workers[i].unprocessedDataChan <- doc + i = (i + 1) % numWorkers + } + + // close the read channels of all the workers + for i := 0; i < numWorkers; i++ { + close(workers[i].unprocessedDataChan) + } + }() + + // coordinate the order in which the documents are sent over to the + // main output channel + numDoneWorkers := 0 + i := 0 + for { + processedDocument, open := <-workers[i].processedDocumentChan + if open { + outputChan <- processedDocument + } else { + numDoneWorkers++ + } + if numDoneWorkers == numWorkers { + break + } + i = (i + 1) % numWorkers + } +} + +// getUpsertValue takes a given BSON document and a given field, and returns the +// field's associated value in the document. The field is specified using dot +// notation for nested fields. e.g. "person.age" would return +// 34 in the document: bson.M{"person": bson.M{"age": 34}}, whereas +// "person.name" would return nil +func getUpsertValue(field string, document bson.D) interface{} { + index := strings.Index(field, ".") + if index == -1 { + // grab the value (ignoring errors because we are okay with nil) + val, _ := bsonutil.FindValueByKey(field, &document) + return val + } + // recurse into subdocuments + left := field[0:index] + subDoc, _ := bsonutil.FindValueByKey(left, &document) + if subDoc == nil { + return nil + } + subDocD, ok := subDoc.(bson.D) + if !ok { + return nil + } + return getUpsertValue(field[index+1:], subDocD) +} + +// filterIngestError accepts a boolean indicating if a non-nil error should be +// returned as an actual error. +// +// If the error indicates an unreachable server, it returns that immediately. +// +// If the error indicates an invalid write concern was passed, it returns nil +// +// If the error is not nil, it logs the error. If the error is an io.EOF error - +// indicating a lost connection to the server, it sets the error as such.
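+// For example (behavior read off the function body below, not documented
+// upstream): with stopOnError=false and a reachable server, an insert error
+// is logged and swallowed (nil is returned), while stopOnError=true or a
+// connection error propagates the error to the caller.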
+// +func filterIngestError(stopOnError bool, err error) error { + if err == nil { + return nil + } + if err.Error() == io.EOF.Error() { + return fmt.Errorf(db.ErrLostConnection) + } + if stopOnError || db.IsConnectionError(err) { + return err + } + log.Logvf(log.Always, "error inserting documents: %v", err) + return nil +} + +// removeBlankFields takes a document and returns a new copy in which +// fields with empty/blank values are removed +func removeBlankFields(document bson.D) (newDocument bson.D) { + for _, keyVal := range document { + if val, ok := keyVal.Value.(*bson.D); ok { + keyVal.Value = removeBlankFields(*val) + } + if val, ok := keyVal.Value.(string); ok && val == "" { + continue + } + if val, ok := keyVal.Value.(bson.D); ok && val == nil { + continue + } + newDocument = append(newDocument, keyVal) + } + return newDocument +} + +// setNestedValue takes a nested field - in the form "a.b.c" - +// its associated value, and a document. It then assigns that +// value to the appropriate nested field within the document +func setNestedValue(key string, value interface{}, document *bson.D) { + index := strings.Index(key, ".") + if index == -1 { + *document = append(*document, bson.DocElem{Name: key, Value: value}) + return + } + keyName := key[0:index] + subDocument := &bson.D{} + elem, err := bsonutil.FindValueByKey(keyName, document) + if err != nil { // no such key in the document + elem = nil + } + var existingKey bool + if elem != nil { + subDocument = elem.(*bson.D) + existingKey = true + } + setNestedValue(key[index+1:], value, subDocument) + if !existingKey { + *document = append(*document, bson.DocElem{Name: keyName, Value: subDocument}) + } +} + +// streamDocuments processes data from the readDocs channel in parallel and +// sends the processed data to the outputChan channel - either in the order in +// which the data was received (if ordered is true) or as soon as each document +// is ready +func streamDocuments(ordered bool, numDecoders int, readDocs chan Converter, outputChan chan bson.D) (retErr error) { + if numDecoders == 0 { + numDecoders = 1 + } + var importWorkers []*importWorker + wg := new(sync.WaitGroup) + importTomb := new(tomb.Tomb) + inChan := readDocs + outChan := outputChan + for i := 0; i < numDecoders; i++ { + if ordered { + inChan = make(chan Converter, workerBufferSize) + outChan = make(chan bson.D, workerBufferSize) + } + iw := &importWorker{ + unprocessedDataChan: inChan, + processedDocumentChan: outChan, + tomb: importTomb, + } + importWorkers = append(importWorkers, iw) + wg.Add(1) + go func(iw importWorker) { + defer wg.Done() + // only set the first worker error and cause sibling goroutines + // to terminate immediately + err := iw.processDocuments(ordered) + if err != nil && retErr == nil { + retErr = err + iw.tomb.Kill(err) + } + }(*iw) + } + + // if ordered, we have to coordinate the sequence in which processed + // documents are passed to the main read channel + if ordered { + doSequentialStreaming(importWorkers, readDocs, outputChan) + } + wg.Wait() + close(outputChan) + return +} + +// coercionError should only be used as a specific error type to check +// whether tokensToBSON wants the row to print +type coercionError struct{} + +func (coercionError) Error() string { return "coercionError" } + +// tokensToBSON reads in a slice of records - along with ordered column names - +// and returns a BSON document for the record.
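+// For example (mirroring TestTokensToBSON in common_test.go below): column
+// specs for "a", "b", "c" with tokens ["1", "2", "hello"] yield
+// bson.D{{"a", int32(1)}, {"b", int32(2)}, {"c", "hello"}}, and tokens beyond
+// the column specs fall back to generated "fieldN" keys.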
+func tokensToBSON(colSpecs []ColumnSpec, tokens []string, numProcessed uint64, ignoreBlanks bool) (bson.D, error) { + log.Logvf(log.DebugHigh, "got line: %v", tokens) + var parsedValue interface{} + document := bson.D{} + for index, token := range tokens { + if token == "" && ignoreBlanks { + continue + } + if index < len(colSpecs) { + parsedValue, err := colSpecs[index].Parser.Parse(token) + if err != nil { + log.Logvf(log.DebugHigh, "parse failure in document #%d for column '%s', "+ + "could not parse token '%s' to type %s", + numProcessed, colSpecs[index].Name, token, colSpecs[index].TypeName) + switch colSpecs[index].ParseGrace { + case pgAutoCast: + parsedValue = autoParse(token) + case pgSkipField: + continue + case pgSkipRow: + log.Logvf(log.Always, "skipping row #%d: %v", numProcessed, tokens) + return nil, coercionError{} + case pgStop: + return nil, fmt.Errorf("type coercion failure in document #%d for column '%s', "+ + "could not parse token '%s' to type %s", + numProcessed, colSpecs[index].Name, token, colSpecs[index].TypeName) + } + } + if strings.Index(colSpecs[index].Name, ".") != -1 { + setNestedValue(colSpecs[index].Name, parsedValue, &document) + } else { + document = append(document, bson.DocElem{Name: colSpecs[index].Name, Value: parsedValue}) + } + } else { + parsedValue = autoParse(token) + key := "field" + strconv.Itoa(index) + if util.StringSliceContains(ColumnNames(colSpecs), key) { + return nil, fmt.Errorf("duplicate field name - on %v - for token #%v ('%v') in document #%v", + key, index+1, parsedValue, numProcessed) + } + document = append(document, bson.DocElem{Name: key, Value: parsedValue}) + } + } + return document, nil +} + +// validateFields takes a slice of fields and returns an error if the fields +// are invalid, returns nil otherwise +func validateFields(fields []string) error { + fieldsCopy := make([]string, len(fields), len(fields)) + copy(fieldsCopy, fields) + sort.Sort(sort.StringSlice(fieldsCopy)) + + for index, field := range fieldsCopy { + if strings.HasSuffix(field, ".") { + return fmt.Errorf("field '%v' cannot end with a '.'", field) + } + if strings.HasPrefix(field, ".") { + return fmt.Errorf("field '%v' cannot start with a '.'", field) + } + if strings.HasPrefix(field, "$") { + return fmt.Errorf("field '%v' cannot start with a '$'", field) + } + if strings.Contains(field, "..") { + return fmt.Errorf("field '%v' cannot contain consecutive '.' characters", field) + } + // NOTE: since fields is sorted, this check ensures that no field + // is incompatible with another one that occurs further down the list. + // meant to prevent cases where we have fields like "a" and "a.c" + for _, latterField := range fieldsCopy[index+1:] { + // NOTE: this means we will not support imports that have fields that + // include e.g.
a, a.b + if strings.HasPrefix(latterField, field+".") { + return fmt.Errorf("fields '%v' and '%v' are incompatible", field, latterField) + } + // NOTE: this means we will not support imports that have fields like + // a, a - since this is invalid in MongoDB + if field == latterField { + return fmt.Errorf("fields cannot be identical: '%v' and '%v'", field, latterField) + } + } + } + return nil +} + +// validateReaderFields is a helper to validate fields for input readers +func validateReaderFields(fields []string) error { + if err := validateFields(fields); err != nil { + return err + } + if len(fields) == 1 { + log.Logvf(log.Info, "using field: %v", fields[0]) + } else { + log.Logvf(log.Info, "using fields: %v", strings.Join(fields, ",")) + } + return nil +} + +// processDocuments reads from the Converter channel and for each record, converts it +// to a bson.D document before sending it on the processedDocumentChan channel. Once the +// input channel is closed the processed channel is also closed if the worker streams its +// reads in order +func (iw *importWorker) processDocuments(ordered bool) error { + if ordered { + defer close(iw.processedDocumentChan) + } + for { + select { + case converter, alive := <-iw.unprocessedDataChan: + if !alive { + return nil + } + document, err := converter.Convert() + if err != nil { + return err + } + if document == nil { + continue + } + iw.processedDocumentChan <- document + case <-iw.tomb.Dying(): + return nil + } + } +} diff --git a/src/mongo/gotools/mongoimport/common_test.go b/src/mongo/gotools/mongoimport/common_test.go new file mode 100644 index 00000000000..4745333a927 --- /dev/null +++ b/src/mongo/gotools/mongoimport/common_test.go @@ -0,0 +1,600 @@ +package mongoimport + +import ( + "fmt" + "io" + "testing" + + "github.com/mongodb/mongo-tools/common/db" + "github.com/mongodb/mongo-tools/common/log" + "github.com/mongodb/mongo-tools/common/options" + "github.com/mongodb/mongo-tools/common/testutil" + . 
"github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" + "gopkg.in/tomb.v2" +) + +func init() { + log.SetVerbosity(&options.Verbosity{ + VLevel: 4, + }) +} + +var ( + index = uint64(0) + csvConverters = []CSVConverter{ + CSVConverter{ + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field2", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field3", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: []string{"a", "b", "c"}, + index: index, + }, + CSVConverter{ + colSpecs: []ColumnSpec{ + {"field4", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field5", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field6", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: []string{"d", "e", "f"}, + index: index, + }, + CSVConverter{ + colSpecs: []ColumnSpec{ + {"field7", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field8", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field9", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: []string{"d", "e", "f"}, + index: index, + }, + CSVConverter{ + colSpecs: []ColumnSpec{ + {"field10", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field11", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field12", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: []string{"d", "e", "f"}, + index: index, + }, + CSVConverter{ + colSpecs: []ColumnSpec{ + {"field13", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field14", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field15", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: []string{"d", "e", "f"}, + index: index, + }, + } + expectedDocuments = []bson.D{ + { + {"field1", "a"}, + {"field2", "b"}, + {"field3", "c"}, + }, { + {"field4", "d"}, + {"field5", "e"}, + {"field6", "f"}, + }, { + {"field7", "d"}, + {"field8", "e"}, + {"field9", "f"}, + }, { + {"field10", "d"}, + {"field11", "e"}, + {"field12", "f"}, + }, { + {"field13", "d"}, + {"field14", "e"}, + {"field15", "f"}, + }, + } +) + +func convertBSONDToRaw(documents []bson.D) []bson.Raw { + rawBSONDocuments := []bson.Raw{} + for _, document := range documents { + rawBytes, err := bson.Marshal(document) + So(err, ShouldBeNil) + rawBSONDocuments = append(rawBSONDocuments, bson.Raw{3, rawBytes}) + } + return rawBSONDocuments +} + +func TestValidateFields(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given an import input, in validating the headers", t, func() { + Convey("if the fields contain '..', an error should be thrown", func() { + So(validateFields([]string{"a..a"}), ShouldNotBeNil) + }) + Convey("if the fields start/end in a '.', an error should be thrown", func() { + So(validateFields([]string{".a"}), ShouldNotBeNil) + So(validateFields([]string{"a."}), ShouldNotBeNil) + }) + Convey("if the fields start in a '$', an error should be thrown", func() { + So(validateFields([]string{"$.a"}), ShouldNotBeNil) + So(validateFields([]string{"$"}), ShouldNotBeNil) + So(validateFields([]string{"$a"}), ShouldNotBeNil) + So(validateFields([]string{"a$a"}), ShouldBeNil) + }) + Convey("if the fields collide, an error should be thrown", func() { + So(validateFields([]string{"a", "a.a"}), ShouldNotBeNil) + So(validateFields([]string{"a", "a.ba", "b.a"}), ShouldNotBeNil) + So(validateFields([]string{"a", "a.ba", "b.a"}), ShouldNotBeNil) + So(validateFields([]string{"a", "a.b.c"}), ShouldNotBeNil) + }) + Convey("if the fields don't collide, no error should be thrown", func() { + So(validateFields([]string{"a", "aa"}), ShouldBeNil) + So(validateFields([]string{"a", "aa", "b.a", 
"b.c"}), ShouldBeNil) + So(validateFields([]string{"a", "ba", "ab", "b.a"}), ShouldBeNil) + So(validateFields([]string{"a", "ba", "ab", "b.a", "b.c.d"}), ShouldBeNil) + So(validateFields([]string{"a", "ab.c"}), ShouldBeNil) + }) + Convey("if the fields contain the same keys, an error should be thrown", func() { + So(validateFields([]string{"a", "ba", "a"}), ShouldNotBeNil) + }) + }) +} + +func TestGetUpsertValue(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given a field and a BSON document, on calling getUpsertValue", t, func() { + Convey("the value of the key should be correct for unnested documents", func() { + bsonDocument := bson.D{{"a", 3}} + So(getUpsertValue("a", bsonDocument), ShouldEqual, 3) + }) + Convey("the value of the key should be correct for nested document fields", func() { + inner := bson.D{{"b", 4}} + bsonDocument := bson.D{{"a", inner}} + So(getUpsertValue("a.b", bsonDocument), ShouldEqual, 4) + }) + Convey("the value of the key should be nil for unnested document "+ + "fields that do not exist", func() { + bsonDocument := bson.D{{"a", 4}} + So(getUpsertValue("c", bsonDocument), ShouldBeNil) + }) + Convey("the value of the key should be nil for nested document "+ + "fields that do not exist", func() { + inner := bson.D{{"b", 4}} + bsonDocument := bson.D{{"a", inner}} + So(getUpsertValue("a.c", bsonDocument), ShouldBeNil) + }) + Convey("the value of the key should be nil for nil document values", func() { + So(getUpsertValue("a", bson.D{{"a", nil}}), ShouldBeNil) + }) + }) +} + +func TestConstructUpsertDocument(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given a set of upsert fields and a BSON document, on calling "+ + "constructUpsertDocument", t, func() { + Convey("the key/value combination in the upsert document should be "+ + "correct for unnested documents with single fields", func() { + bsonDocument := bson.D{{"a", 3}} + upsertFields := []string{"a"} + upsertDocument := constructUpsertDocument(upsertFields, + bsonDocument) + So(upsertDocument, ShouldResemble, bsonDocument) + }) + Convey("the key/value combination in the upsert document should be "+ + "correct for unnested documents with several fields", func() { + bsonDocument := bson.D{{"a", 3}, {"b", "string value"}} + upsertFields := []string{"a"} + expectedDocument := bson.D{{"a", 3}} + upsertDocument := constructUpsertDocument(upsertFields, + bsonDocument) + So(upsertDocument, ShouldResemble, expectedDocument) + }) + Convey("the key/value combination in the upsert document should be "+ + "correct for nested documents with several fields", func() { + inner := bson.D{{testCollection, 4}} + bsonDocument := bson.D{{"a", inner}, {"b", "string value"}} + upsertFields := []string{"a.c"} + expectedDocument := bson.D{{"a.c", 4}} + upsertDocument := constructUpsertDocument(upsertFields, + bsonDocument) + So(upsertDocument, ShouldResemble, expectedDocument) + }) + Convey("the upsert document should be nil if the key does not exist "+ + "in the BSON document", func() { + bsonDocument := bson.D{{"a", 3}, {"b", "string value"}} + upsertFields := []string{testCollection} + upsertDocument := constructUpsertDocument(upsertFields, bsonDocument) + So(upsertDocument, ShouldBeNil) + }) + }) +} + +func TestSetNestedValue(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given a field, its value, and an existing BSON document...", t, func() { + b := bson.D{{"c", "d"}} + currentDocument := bson.D{ + {"a", 3}, + {"b", &b}, + } + 
Convey("ensure top level fields are set and others, unchanged", func() { + testDocument := ¤tDocument + expectedDocument := bson.DocElem{"c", 4} + setNestedValue("c", 4, testDocument) + newDocument := *testDocument + So(len(newDocument), ShouldEqual, 3) + So(newDocument[2], ShouldResemble, expectedDocument) + }) + Convey("ensure new nested top-level fields are set and others, unchanged", func() { + testDocument := ¤tDocument + expectedDocument := bson.D{{"b", "4"}} + setNestedValue("c.b", "4", testDocument) + newDocument := *testDocument + So(len(newDocument), ShouldEqual, 3) + So(newDocument[2].Name, ShouldResemble, "c") + So(*newDocument[2].Value.(*bson.D), ShouldResemble, expectedDocument) + }) + Convey("ensure existing nested level fields are set and others, unchanged", func() { + testDocument := ¤tDocument + expectedDocument := bson.D{{"c", "d"}, {"d", 9}} + setNestedValue("b.d", 9, testDocument) + newDocument := *testDocument + So(len(newDocument), ShouldEqual, 2) + So(newDocument[1].Name, ShouldResemble, "b") + So(*newDocument[1].Value.(*bson.D), ShouldResemble, expectedDocument) + }) + Convey("ensure subsequent calls update fields accordingly", func() { + testDocument := ¤tDocument + expectedDocumentOne := bson.D{{"c", "d"}, {"d", 9}} + expectedDocumentTwo := bson.DocElem{"f", 23} + setNestedValue("b.d", 9, testDocument) + newDocument := *testDocument + So(len(newDocument), ShouldEqual, 2) + So(newDocument[1].Name, ShouldResemble, "b") + So(*newDocument[1].Value.(*bson.D), ShouldResemble, expectedDocumentOne) + setNestedValue("f", 23, testDocument) + newDocument = *testDocument + So(len(newDocument), ShouldEqual, 3) + So(newDocument[2], ShouldResemble, expectedDocumentTwo) + }) + }) +} + +func TestRemoveBlankFields(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given an unordered BSON document", t, func() { + Convey("the same document should be returned if there are no blanks", func() { + bsonDocument := bson.D{{"a", 3}, {"b", "hello"}} + So(removeBlankFields(bsonDocument), ShouldResemble, bsonDocument) + }) + Convey("a new document without blanks should be returned if there are "+ + " blanks", func() { + d := bson.D{ + {"a", ""}, + {"b", ""}, + } + e := bson.D{ + {"a", ""}, + {"b", 1}, + } + bsonDocument := bson.D{ + {"a", 0}, + {"b", ""}, + {"c", ""}, + {"d", &d}, + {"e", &e}, + } + inner := bson.D{ + {"b", 1}, + } + expectedDocument := bson.D{ + {"a", 0}, + {"e", inner}, + } + So(removeBlankFields(bsonDocument), ShouldResemble, expectedDocument) + }) + }) +} + +func TestTokensToBSON(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given an slice of column specs and tokens to convert to BSON", t, func() { + Convey("the expected ordered BSON should be produced for the given"+ + "column specs and tokens", func() { + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + tokens := []string{"1", "2", "hello"} + expectedDocument := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", "hello"}, + } + bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0), false) + So(err, ShouldBeNil) + So(bsonD, ShouldResemble, expectedDocument) + }) + Convey("if there are more tokens than fields, additional fields should be prefixed"+ + " with 'fields' and an index indicating the header number", func() { + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", 
new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + tokens := []string{"1", "2", "hello", "mongodb", "user"} + expectedDocument := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", "hello"}, + {"field3", "mongodb"}, + {"field4", "user"}, + } + bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0), false) + So(err, ShouldBeNil) + So(bsonD, ShouldResemble, expectedDocument) + }) + Convey("an error should be thrown if duplicate headers are found", func() { + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field3", new(FieldAutoParser), pgAutoCast, "auto"}, + } + tokens := []string{"1", "2", "hello", "mongodb", "user"} + _, err := tokensToBSON(colSpecs, tokens, uint64(0), false) + So(err, ShouldNotBeNil) + }) + Convey("fields with nested values should be set appropriately", func() { + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c.a", new(FieldAutoParser), pgAutoCast, "auto"}, + } + tokens := []string{"1", "2", "hello"} + c := bson.D{ + {"a", "hello"}, + } + expectedDocument := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", c}, + } + bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0), false) + So(err, ShouldBeNil) + So(expectedDocument[0].Name, ShouldResemble, bsonD[0].Name) + So(expectedDocument[0].Value, ShouldResemble, bsonD[0].Value) + So(expectedDocument[1].Name, ShouldResemble, bsonD[1].Name) + So(expectedDocument[1].Value, ShouldResemble, bsonD[1].Value) + So(expectedDocument[2].Name, ShouldResemble, bsonD[2].Name) + So(expectedDocument[2].Value, ShouldResemble, *bsonD[2].Value.(*bson.D)) + }) + }) +} + +func TestProcessDocuments(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given an import worker", t, func() { + index := uint64(0) + csvConverters := []CSVConverter{ + CSVConverter{ + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field2", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field3", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: []string{"a", "b", "c"}, + index: index, + }, + CSVConverter{ + colSpecs: []ColumnSpec{ + {"field4", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field5", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field6", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: []string{"d", "e", "f"}, + index: index, + }, + } + expectedDocuments := []bson.D{ + { + {"field1", "a"}, + {"field2", "b"}, + {"field3", "c"}, + }, { + {"field4", "d"}, + {"field5", "e"}, + {"field6", "f"}, + }, + } + Convey("processDocuments should execute the expected conversion for documents, "+ + "pass them on the output channel, and close the output channel if ordered is true", func() { + inputChannel := make(chan Converter, 100) + outputChannel := make(chan bson.D, 100) + iw := &importWorker{ + unprocessedDataChan: inputChannel, + processedDocumentChan: outputChannel, + tomb: &tomb.Tomb{}, + } + inputChannel <- csvConverters[0] + inputChannel <- csvConverters[1] + close(inputChannel) + So(iw.processDocuments(true), ShouldBeNil) + doc1, open := <-outputChannel + So(doc1, ShouldResemble, expectedDocuments[0]) + So(open, ShouldEqual, true) + doc2, open := <-outputChannel + So(doc2, ShouldResemble, expectedDocuments[1]) + So(open, ShouldEqual, true) + _, open = <-outputChannel + So(open, ShouldEqual, false) + }) + Convey("processDocuments should execute the expected conversion
for documents, "+ "pass them on the output channel, and leave the output channel open if ordered is false", func() { + inputChannel := make(chan Converter, 100) + outputChannel := make(chan bson.D, 100) + iw := &importWorker{ + unprocessedDataChan: inputChannel, + processedDocumentChan: outputChannel, + tomb: &tomb.Tomb{}, + } + inputChannel <- csvConverters[0] + inputChannel <- csvConverters[1] + close(inputChannel) + So(iw.processDocuments(false), ShouldBeNil) + doc1, open := <-outputChannel + So(doc1, ShouldResemble, expectedDocuments[0]) + So(open, ShouldEqual, true) + doc2, open := <-outputChannel + So(doc2, ShouldResemble, expectedDocuments[1]) + So(open, ShouldEqual, true) + // close will throw a runtime error if outputChannel is already closed + close(outputChannel) + }) + }) +} + +func TestDoSequentialStreaming(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given some import workers, a Converter input channel and a bson.D output channel", t, func() { + inputChannel := make(chan Converter, 5) + outputChannel := make(chan bson.D, 5) + workerInputChannel := []chan Converter{ + make(chan Converter), + make(chan Converter), + } + workerOutputChannel := []chan bson.D{ + make(chan bson.D), + make(chan bson.D), + } + importWorkers := []*importWorker{ + &importWorker{ + unprocessedDataChan: workerInputChannel[0], + processedDocumentChan: workerOutputChannel[0], + tomb: &tomb.Tomb{}, + }, + &importWorker{ + unprocessedDataChan: workerInputChannel[1], + processedDocumentChan: workerOutputChannel[1], + tomb: &tomb.Tomb{}, + }, + } + Convey("documents moving through the input channel should be processed and returned in sequence", func() { + // start goroutines to do sequential processing + for _, iw := range importWorkers { + go iw.processDocuments(true) + } + // feed in a bunch of documents + for _, inputCSVDocument := range csvConverters { + inputChannel <- inputCSVDocument + } + close(inputChannel) + doSequentialStreaming(importWorkers, inputChannel, outputChannel) + for _, document := range expectedDocuments { + So(<-outputChannel, ShouldResemble, document) + } + }) + }) +} + +func TestStreamDocuments(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey(`Given: + 1. a boolean indicating streaming order + 2. an input channel where documents are streamed in + 3.
an output channel where processed documents are streamed out`, t, func() { + + inputChannel := make(chan Converter, 5) + outputChannel := make(chan bson.D, 5) + + Convey("the entire pipeline should complete without error under normal circumstances", func() { + // stream in some documents + for _, csvConverter := range csvConverters { + inputChannel <- csvConverter + } + close(inputChannel) + So(streamDocuments(true, 3, inputChannel, outputChannel), ShouldBeNil) + + // ensure documents are streamed out and processed in the correct manner + for _, expectedDocument := range expectedDocuments { + So(<-outputChannel, ShouldResemble, expectedDocument) + } + }) + Convey("the entire pipeline should complete with error if an error is encountered", func() { + // stream in some documents - create duplicate headers to simulate an error + csvConverter := CSVConverter{ + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field2", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: []string{"a", "b", "c"}, + index: uint64(0), + } + inputChannel <- csvConverter + close(inputChannel) + + // ensure that an error is returned on the error channel + So(streamDocuments(true, 3, inputChannel, outputChannel), ShouldNotBeNil) + }) + }) +} + +func TestChannelQuorumError(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("Given a channel and a quorum...", t, func() { + Convey("an error should be returned if one is received", func() { + ch := make(chan error, 2) + ch <- nil + ch <- io.EOF + So(channelQuorumError(ch, 2), ShouldNotBeNil) + }) + Convey("no error should be returned if none is received", func() { + ch := make(chan error, 2) + ch <- nil + ch <- nil + So(channelQuorumError(ch, 2), ShouldBeNil) + }) + Convey("no error should be returned if up to quorum nil errors are received", func() { + ch := make(chan error, 3) + ch <- nil + ch <- nil + ch <- io.EOF + So(channelQuorumError(ch, 2), ShouldBeNil) + }) + }) +} + +func TestFilterIngestError(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given a boolean 'stopOnError' and an error...", t, func() { + + Convey("an error should be returned if stopOnError is true and the err is not nil", func() { + So(filterIngestError(true, fmt.Errorf("")), ShouldNotBeNil) + }) + + Convey("ErrLostConnection should be returned if stopOnError is true and the err is io.EOF", func() { + So(filterIngestError(true, io.EOF).Error(), ShouldEqual, db.ErrLostConnection) + }) + + Convey("no error should be returned if stopOnError is false and the err is not nil", func() { + So(filterIngestError(false, fmt.Errorf("")), ShouldBeNil) + }) + + Convey("no error should be returned if stopOnError is false and the err is nil", func() { + So(filterIngestError(false, nil), ShouldBeNil) + }) + + Convey("no error should be returned if stopOnError is true and the err is nil", func() { + So(filterIngestError(true, nil), ShouldBeNil) + }) + }) +} diff --git a/src/mongo/gotools/mongoimport/csv.go b/src/mongo/gotools/mongoimport/csv.go new file mode 100644 index 00000000000..3e66d034ddc --- /dev/null +++ b/src/mongo/gotools/mongoimport/csv.go @@ -0,0 +1,151 @@ +package mongoimport + +import ( + gocsv "encoding/csv" + "fmt" + "io" + + "github.com/mongodb/mongo-tools/mongoimport/csv" + "gopkg.in/mgo.v2/bson" +) + +// CSVInputReader implements the InputReader interface for CSV input types.
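+// A typical construction (the argument values mirror the tests in
+// csv_test.go below; os.Stdout serves as the writer for rejected rows):
+//
+//	r := NewCSVInputReader(colSpecs, file, os.Stdout, numDecoders, ignoreBlanks)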
+type CSVInputReader struct { + // colSpecs is a list of column specifications in the BSON documents to be imported + colSpecs []ColumnSpec + + // csvReader is the underlying reader used to read data in from the CSV file + csvReader *csv.Reader + + // csvRejectWriter is where coercion-failed rows are written, if applicable + csvRejectWriter *gocsv.Writer + + // csvRecord stores each line of input we read from the underlying reader + csvRecord []string + + // numProcessed tracks the number of CSV records processed by the underlying reader + numProcessed uint64 + + // numDecoders is the number of concurrent goroutines to use for decoding + numDecoders int + + // embedded sizeTracker exposes the Size() method to check the number of bytes read so far + sizeTracker + + // ignoreBlanks is whether empty fields should be ignored + ignoreBlanks bool +} + +// CSVConverter implements the Converter interface for CSV input. +type CSVConverter struct { + colSpecs []ColumnSpec + data []string + index uint64 + ignoreBlanks bool + rejectWriter *gocsv.Writer +} + +// NewCSVInputReader returns a CSVInputReader configured to read data from the +// given io.Reader, extracting only the specified columns using exactly "numDecoders" +// goroutines. +func NewCSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, numDecoders int, ignoreBlanks bool) *CSVInputReader { + szCount := newSizeTrackingReader(newBomDiscardingReader(in)) + csvReader := csv.NewReader(szCount) + // allow variable number of colSpecs in document + csvReader.FieldsPerRecord = -1 + csvReader.TrimLeadingSpace = true + return &CSVInputReader{ + colSpecs: colSpecs, + csvReader: csvReader, + csvRejectWriter: gocsv.NewWriter(rejects), + numProcessed: uint64(0), + numDecoders: numDecoders, + sizeTracker: szCount, + ignoreBlanks: ignoreBlanks, + } +} + +// ReadAndValidateHeader reads the header from the underlying reader and validates +// the header fields. It sets err if the read/validation fails. +func (r *CSVInputReader) ReadAndValidateHeader() (err error) { + fields, err := r.csvReader.Read() + if err != nil { + return err + } + r.colSpecs = ParseAutoHeaders(fields) + return validateReaderFields(ColumnNames(r.colSpecs)) +} + +// ReadAndValidateTypedHeader reads the header from the underlying reader and +// validates the typed header fields. It sets err if the read/validation fails. +func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) { + fields, err := r.csvReader.Read() + if err != nil { + return err + } + r.colSpecs, err = ParseTypedHeaders(fields, parseGrace) + if err != nil { + return err + } + return validateReaderFields(ColumnNames(r.colSpecs)) +} + +// StreamDocument takes a boolean indicating if the documents should be streamed +// in read order and a channel on which to stream the documents processed from +// the underlying reader. Returns a non-nil error if streaming fails.
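+// Sketch of a call site (based on the pattern used throughout csv_test.go):
+//
+//	docChan := make(chan bson.D, 1)
+//	if err := r.StreamDocument(true, docChan); err != nil {
+//		// handle read/parse error
+//	}
+//	doc := <-docChan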
+func (r *CSVInputReader) StreamDocument(ordered bool, readDocs chan bson.D) (retErr error) { + csvRecordChan := make(chan Converter, r.numDecoders) + csvErrChan := make(chan error) + + // begin reading from source + go func() { + var err error + for { + r.csvRecord, err = r.csvReader.Read() + if err != nil { + close(csvRecordChan) + if err == io.EOF { + csvErrChan <- nil + } else { + r.numProcessed++ + csvErrChan <- fmt.Errorf("read error on entry #%v: %v", r.numProcessed, err) + } + return + } + csvRecordChan <- CSVConverter{ + colSpecs: r.colSpecs, + data: r.csvRecord, + index: r.numProcessed, + ignoreBlanks: r.ignoreBlanks, + rejectWriter: r.csvRejectWriter, + } + r.numProcessed++ + } + }() + + go func() { + csvErrChan <- streamDocuments(ordered, r.numDecoders, csvRecordChan, readDocs) + }() + + return channelQuorumError(csvErrChan, 2) +} + +// Convert implements the Converter interface for CSV input. It converts a +// CSVConverter struct to a BSON document. +func (c CSVConverter) Convert() (b bson.D, err error) { + b, err = tokensToBSON( + c.colSpecs, + c.data, + c.index, + c.ignoreBlanks, + ) + if _, ok := err.(coercionError); ok { + c.Print() + err = nil + } + return +} + +func (c CSVConverter) Print() { + c.rejectWriter.Write(c.data) +} diff --git a/src/mongo/gotools/mongoimport/csv/reader.go b/src/mongo/gotools/mongoimport/csv/reader.go new file mode 100644 index 00000000000..aad3c4a32c3 --- /dev/null +++ b/src/mongo/gotools/mongoimport/csv/reader.go @@ -0,0 +1,363 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package csv reads and writes comma-separated values (CSV) files. +// +// A csv file contains zero or more records of one or more fields per record. +// Each record is separated by the newline character. The final record may +// optionally be followed by a newline character. +// +// field1,field2,field3 +// +// White space is considered part of a field. +// +// Carriage returns before newline characters are silently removed. +// +// Blank lines are ignored. A line with only whitespace characters (excluding +// the ending newline character) is not considered a blank line. +// +// Fields which start and stop with the quote character " are called +// quoted-fields. The beginning and ending quote are not part of the +// field. +// +// The source: +// +// normal string,"quoted-field" +// +// results in the fields +// +// {`normal string`, `quoted-field`} +// +// Within a quoted-field a quote character followed by a second quote +// character is considered a single quote. +// +// "the ""word"" is true","a ""quoted-field""" +// +// results in +// +// {`the "word" is true`, `a "quoted-field"`} +// +// Newlines and commas may be included in a quoted-field +// +// "Multi-line +// field","comma is ," +// +// results in +// +// {`Multi-line +// field`, `comma is ,`} +package csv + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "unicode" +) + +// A ParseError is returned for parsing errors. +// The first line is 1. The first column is 0. 
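+// For example, a field-count mismatch on the third record surfaces as a
+// *ParseError whose Error() reads "line 3, column 0: wrong number of fields
+// in line" (column 0 because Read reports ErrFieldCount at the start of the
+// record).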
+type ParseError struct { + Line int // Line where the error occurred + Column int // Column (rune index) where the error occurred + Err error // The actual error +} + +func (e *ParseError) Error() string { + return fmt.Sprintf("line %d, column %d: %s", e.Line, e.Column, e.Err) +} + +// These are the errors that can be returned in ParseError.Error +var ( + ErrTrailingComma = errors.New("extra delimiter at end of line") // no longer used + ErrBareQuote = errors.New("bare \" in non-quoted-field") + ErrQuote = errors.New("extraneous \" in field") + ErrFieldCount = errors.New("wrong number of fields in line") +) + +// A Reader reads records from a CSV-encoded file. +// +// As returned by NewReader, a Reader expects input conforming to RFC 4180. +// The exported fields can be changed to customize the details before the +// first call to Read or ReadAll. +// +// Comma is the field delimiter. It defaults to ','. +// +// Comment, if not 0, is the comment character. Lines beginning with the +// Comment character are ignored. +// +// If FieldsPerRecord is positive, Read requires each record to +// have the given number of fields. If FieldsPerRecord is 0, Read sets it to +// the number of fields in the first record, so that future records must +// have the same field count. If FieldsPerRecord is negative, no check is +// made and records may have a variable number of fields. +// +// If LazyQuotes is true, a quote may appear in an unquoted field and a +// non-doubled quote may appear in a quoted field. +// +// If TrimLeadingSpace is true, leading white space in a field is ignored. +type Reader struct { + Comma rune // field delimiter (set to ',' by NewReader) + Comment rune // comment character for start of line + FieldsPerRecord int // number of expected fields per record + LazyQuotes bool // allow lazy quotes + TrailingComma bool // ignored; here for backwards compatibility + TrimLeadingSpace bool // trim leading space + line int + column int + r *bufio.Reader + field bytes.Buffer +} + +// NewReader returns a new Reader that reads from r. +func NewReader(r io.Reader) *Reader { + return &Reader{ + Comma: ',', + r: bufio.NewReader(r), + } +} + +// error creates a new ParseError based on err. +func (r *Reader) error(err error) error { + return &ParseError{ + Line: r.line, + Column: r.column, + Err: err, + } +} + +// Read reads one record from r. The record is a slice of strings with each +// string representing one field. +func (r *Reader) Read() (record []string, err error) { + for { + record, err = r.parseRecord() + if record != nil { + break + } + if err != nil { + return nil, err + } + } + + if r.FieldsPerRecord > 0 { + if len(record) != r.FieldsPerRecord { + r.column = 0 // report at start of record + return record, r.error(ErrFieldCount) + } + } else if r.FieldsPerRecord == 0 { + r.FieldsPerRecord = len(record) + } + return record, nil +} + +// ReadAll reads all the remaining records from r. +// Each record is a slice of fields. +// A successful call returns err == nil, not err == EOF. Because ReadAll is +// defined to read until EOF, it does not treat end of file as an error to be +// reported. +func (r *Reader) ReadAll() (records [][]string, err error) { + for { + record, err := r.Read() + if err == io.EOF { + return records, nil + } + if err != nil { + return nil, err + } + records = append(records, record) + } +} + +// readRune reads one rune from r, folding \r\n to \n and keeping track +// of how far into the line we have read. 
r.column will point to the start +// of this rune, not the end of this rune. +func (r *Reader) readRune() (rune, error) { + r1, _, err := r.r.ReadRune() + + // Handle \r\n here. We make the simplifying assumption that + // anytime \r is followed by \n that it can be folded to \n. + // We will not detect files which contain both \r\n and bare \n. + if r1 == '\r' { + r1, _, err = r.r.ReadRune() + if err == nil { + if r1 != '\n' { + r.r.UnreadRune() + r1 = '\r' + } + } + } + r.column++ + return r1, err +} + +// skip reads runes up to and including the rune delim or until error. +func (r *Reader) skip(delim rune) error { + for { + r1, err := r.readRune() + if err != nil { + return err + } + if r1 == delim { + return nil + } + } +} + +// parseRecord reads and parses a single csv record from r. +func (r *Reader) parseRecord() (fields []string, err error) { + // Each record starts on a new line. We increment our line + // number (lines start at 1, not 0) and set column to -1 + // so as we increment in readRune it points to the character we read. + r.line++ + r.column = -1 + + // Peek at the first rune. If it is an error we are done. + // If we support comments and it is the comment character + // then skip to the end of line. + + r1, _, err := r.r.ReadRune() + if err != nil { + return nil, err + } + + if r.Comment != 0 && r1 == r.Comment { + return nil, r.skip('\n') + } + r.r.UnreadRune() + + // At this point we have at least one field. + for { + haveField, delim, err := r.parseField() + if haveField { + fields = append(fields, r.field.String()) + } + if delim == '\n' || err == io.EOF { + return fields, err + } else if err != nil { + return nil, err + } + } +} + +// parseField parses the next field in the record. The read field is +// located in r.field. Delim is the first character not part of the field +// (r.Comma or '\n').
+func (r *Reader) parseField() (haveField bool, delim rune, err error) { + r.field.Reset() + + r1, err := r.readRune() + for err == nil && r.TrimLeadingSpace && r1 != '\n' && unicode.IsSpace(r1) { + r1, err = r.readRune() + } + + if err == io.EOF && r.column != 0 { + return true, 0, err + } + if err != nil { + return false, 0, err + } + + var ws bytes.Buffer + + switch r1 { + case r.Comma: + // will check below + + case '\n': + // We are a trailing empty field or a blank line + if r.column == 0 { + return false, r1, nil + } + return true, r1, nil + + case '"': + // quoted field + Quoted: + for { + r1, err = r.readRune() + if err != nil { + if err == io.EOF { + if r.LazyQuotes { + return true, 0, err + } + return false, 0, r.error(ErrQuote) + } + return false, 0, err + } + switch r1 { + case '"': + r1, err = r.readRune() + if err == nil && r.TrimLeadingSpace && r1 != '\n' && unicode.IsSpace(r1) { + for err == nil && r.TrimLeadingSpace && r1 != '\n' && unicode.IsSpace(r1) { + r1, err = r.readRune() + } + // we don't want '"foo" "bar",' to look like '"foo""bar"' + // which evaluates to 'foo"bar' + // so we explicitly test for the case that the trimmed whitespace isn't + // followed by a '"' + if err == nil && r1 == '"' { + r.column-- + return false, 0, r.error(ErrQuote) + } + } + if err != nil || r1 == r.Comma { + break Quoted + } + if r1 == '\n' { + return true, r1, nil + } + if r1 != '"' { + if !r.LazyQuotes { + r.column-- + return false, 0, r.error(ErrQuote) + } + // accept the bare quote + r.field.WriteRune('"') + } + case '\n': + r.line++ + r.column = -1 + } + r.field.WriteRune(r1) + } + + default: + // unquoted field + for { + // only write sections of whitespace if it's followed by non-whitespace + if unicode.IsSpace(r1) { + ws.WriteRune(r1) + } else { + r.field.WriteString(ws.String()) + ws.Reset() + r.field.WriteRune(r1) + } + r1, err = r.readRune() + if err != nil || r1 == r.Comma { + break + } + if r1 == '\n' { + return true, r1, nil + } + if !r.LazyQuotes && r1 == '"' { + return false, 0, r.error(ErrBareQuote) + } + } + } + // write any remaining section of whitespace unless TrimLeadingSpace is on + if !r.TrimLeadingSpace { + r.field.WriteString(ws.String()) + } + + if err != nil { + if err == io.EOF { + return true, 0, err + } + return false, 0, err + } + + return true, r1, nil +} diff --git a/src/mongo/gotools/mongoimport/csv_test.go b/src/mongo/gotools/mongoimport/csv_test.go new file mode 100644 index 00000000000..0db342d9f12 --- /dev/null +++ b/src/mongo/gotools/mongoimport/csv_test.go @@ -0,0 +1,354 @@ +package mongoimport + +import ( + "bytes" + "io" + "os" + "strings" + "testing" + + "github.com/mongodb/mongo-tools/common/log" + "github.com/mongodb/mongo-tools/common/options" + "github.com/mongodb/mongo-tools/common/testutil" + .
"github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" +) + +func init() { + log.SetVerbosity(&options.Verbosity{ + VLevel: 4, + }) +} + +func TestCSVStreamDocument(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("With a CSV input reader", t, func() { + Convey("badly encoded CSV should result in a parsing error", func() { + contents := `1, 2, foo"bar` + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldNotBeNil) + }) + Convey("escaped quotes are parsed correctly", func() { + contents := `1, 2, "foo""bar"` + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + }) + Convey("multiple escaped quotes separated by whitespace parsed correctly", func() { + contents := `1, 2, "foo"" ""bar"` + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedRead := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", `foo" "bar`}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedRead) + }) + Convey("integer valued strings should be converted", func() { + contents := `1, 2, " 3e"` + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedRead := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", " 3e"}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedRead) + }) + Convey("extra fields should be prefixed with 'field'", func() { + contents := `1, 2f , " 3e" , " may"` + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedRead := bson.D{ + {"a", int32(1)}, + {"b", "2f"}, + {"c", " 3e"}, + {"field3", " may"}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedRead) + }) + Convey("nested CSV fields should be imported properly", func() { + contents := `1, 2f , " 3e" , " may"` + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b.c", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + b := bson.D{{"c", "2f"}} + expectedRead := bson.D{ + {"a", int32(1)}, + {"b", b}, + {"c", " 3e"}, + {"field3", " may"}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, 
false) + docChan := make(chan bson.D, 4) + So(r.StreamDocument(true, docChan), ShouldBeNil) + + readDocument := <-docChan + So(readDocument[0], ShouldResemble, expectedRead[0]) + So(readDocument[1].Name, ShouldResemble, expectedRead[1].Name) + So(*readDocument[1].Value.(*bson.D), ShouldResemble, expectedRead[1].Value) + So(readDocument[2], ShouldResemble, expectedRead[2]) + So(readDocument[3], ShouldResemble, expectedRead[3]) + }) + Convey("whitespace separated quoted strings are still an error", func() { + contents := `1, 2, "foo" "bar"` + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldNotBeNil) + }) + Convey("nested CSV fields causing header collisions should error", func() { + contents := `1, 2f , " 3e" , " may", june` + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b.c", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field3", new(FieldAutoParser), pgAutoCast, "auto"}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldNotBeNil) + }) + Convey("calling StreamDocument() for CSVs should return next set of "+ + "values", func() { + contents := "1, 2, 3\n4, 5, 6" + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedReadOne := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", int32(3)}, + } + expectedReadTwo := bson.D{ + {"a", int32(4)}, + {"b", int32(5)}, + {"c", int32(6)}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 2) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedReadOne) + So(<-docChan, ShouldResemble, expectedReadTwo) + }) + Convey("valid CSV input file that starts with the UTF-8 BOM should "+ + "not raise an error", func() { + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedReads := []bson.D{ + { + {"a", int32(1)}, + {"b", int32(2)}, + {"c", int32(3)}, + }, { + {"a", int32(4)}, + {"b", int32(5)}, + {"c", int32(6)}, + }, + } + fileHandle, err := os.Open("testdata/test_bom.csv") + So(err, ShouldBeNil) + r := NewCSVInputReader(colSpecs, fileHandle, os.Stdout, 1, false) + docChan := make(chan bson.D, len(expectedReads)) + So(r.StreamDocument(true, docChan), ShouldBeNil) + for _, expectedRead := range expectedReads { + for i, readDocument := range <-docChan { + So(readDocument.Name, ShouldResemble, expectedRead[i].Name) + So(readDocument.Value, ShouldResemble, expectedRead[i].Value) + } + } + }) + }) +} + +func TestCSVReadAndValidateHeader(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + var err error + Convey("With a CSV input reader", t, func() { + Convey("setting the header should read the first line of the CSV", func() { + contents := "extraHeader1, extraHeader2, extraHeader3" + colSpecs := []ColumnSpec{} + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + 
So(r.ReadAndValidateHeader(), ShouldBeNil) + So(len(r.colSpecs), ShouldEqual, 3) + }) + + Convey("setting non-colliding nested CSV headers should not raise an error", func() { + contents := "a, b, c" + colSpecs := []ColumnSpec{} + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldBeNil) + So(len(r.colSpecs), ShouldEqual, 3) + contents = "a.b.c, a.b.d, c" + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldBeNil) + So(len(r.colSpecs), ShouldEqual, 3) + + contents = "a.b, ab, a.c" + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldBeNil) + So(len(r.colSpecs), ShouldEqual, 3) + + contents = "a, ab, ac, dd" + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldBeNil) + So(len(r.colSpecs), ShouldEqual, 4) + }) + + Convey("setting colliding nested CSV headers should raise an error", func() { + contents := "a, a.b, c" + colSpecs := []ColumnSpec{} + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldNotBeNil) + + contents = "a.b.c, a.b.d.c, a.b.d" + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldNotBeNil) + + contents = "a, a, a" + colSpecs = []ColumnSpec{} + r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldNotBeNil) + }) + + Convey("setting the header that ends in a dot should error", func() { + contents := "c, a., b" + colSpecs := []ColumnSpec{} + So(err, ShouldBeNil) + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false).ReadAndValidateHeader(), ShouldNotBeNil) + }) + + Convey("setting the header that starts in a dot should error", func() { + contents := "c, .a, b" + colSpecs := []ColumnSpec{} + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false).ReadAndValidateHeader(), ShouldNotBeNil) + }) + + Convey("setting the header that contains multiple consecutive dots should error", func() { + contents := "c, a..a, b" + colSpecs := []ColumnSpec{} + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false).ReadAndValidateHeader(), ShouldNotBeNil) + + contents = "c, a.a, b.b...b" + colSpecs = []ColumnSpec{} + So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false).ReadAndValidateHeader(), ShouldNotBeNil) + }) + + Convey("setting the header using an empty file should return EOF", func() { + contents := "" + colSpecs := []ColumnSpec{} + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldEqual, io.EOF) + So(len(r.colSpecs), ShouldEqual, 0) + }) + Convey("setting the header with column specs already set should replace "+ + "the existing column specs", func() { + contents := "extraHeader1,extraHeader2,extraHeader3" + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), 
os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldBeNil) + // if ReadAndValidateHeader() is called with column specs already passed + // in, the header should be replaced with the read header line + So(len(r.colSpecs), ShouldEqual, 3) + So(ColumnNames(r.colSpecs), ShouldResemble, strings.Split(contents, ",")) + }) + Convey("plain CSV input file sources should be parsed correctly and "+ + "subsequent imports should parse correctly", func() { + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedReadOne := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", int32(3)}, + } + expectedReadTwo := bson.D{ + {"a", int32(3)}, + {"b", 5.4}, + {"c", "string"}, + } + fileHandle, err := os.Open("testdata/test.csv") + So(err, ShouldBeNil) + r := NewCSVInputReader(colSpecs, fileHandle, os.Stdout, 1, false) + docChan := make(chan bson.D, 50) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedReadOne) + So(<-docChan, ShouldResemble, expectedReadTwo) + }) + }) +} + +func TestCSVConvert(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("With a CSV input reader", t, func() { + Convey("calling convert on a CSVConverter should return the expected BSON document", func() { + csvConverter := CSVConverter{ + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field2", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field3", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: []string{"a", "b", "c"}, + index: uint64(0), + } + expectedDocument := bson.D{ + {"field1", "a"}, + {"field2", "b"}, + {"field3", "c"}, + } + document, err := csvConverter.Convert() + So(err, ShouldBeNil) + So(document, ShouldResemble, expectedDocument) + }) + }) +} diff --git a/src/mongo/gotools/mongoimport/dateconv/dateconv.go b/src/mongo/gotools/mongoimport/dateconv/dateconv.go new file mode 100644 index 00000000000..91fecf51ad7 --- /dev/null +++ b/src/mongo/gotools/mongoimport/dateconv/dateconv.go @@ -0,0 +1,77 @@ +package dateconv + +import ( + "strings" +) + +var ( + // msReplacers based on: + // https://msdn.microsoft.com/en-us/library/ee634398(v=sql.130).aspx + msReplacers = []string{ + "dddd", "Monday", + "ddd", "Mon", + "dd", "02", + "d", "2", + "MMMM", "January", + "MMM", "Jan", + "MM", "01", + "M", "1", + // "gg", "?", + "hh", "03", + "h", "3", + "HH", "15", + "H", "15", + "mm", "04", + "m", "4", + "ss", "05", + "s", "5", + // "f", "?", + "tt", "PM", + // "t", "?", + "yyyy", "2006", + "yyy", "2006", + "yy", "06", + // "y", "?", + "zzz", "-07:00", + "zz", "-07", + // "z", "?", + } + msStringReplacer = strings.NewReplacer(msReplacers...) +) + +// FromMS reformats a datetime layout string from the Microsoft SQL Server +// FORMAT function into go's parse format. +func FromMS(layout string) string { + return msStringReplacer.Replace(layout) +} + +var ( + // oracleReplacers based on: + // http://docs.oracle.com/cd/B19306_01/server.102/b14200/sql_elements004.htm#i34924 + oracleReplacers = []string{ + "AM", "PM", + "DAY", "Monday", + "DY", "Mon", + "DD", "02", + "HH12", "03", + "HH24", "15", + "HH", "03", + "MI", "04", + "MONTH", "January", + "MON", "Jan", + "MM", "01", + "SS", "05", + "TZD", "MST", + "TZH:TZM", "-07:00", + "TZH", "-07", + "YYYY", "2006", + "YY", "06", + } + oracleStringReplacer = strings.NewReplacer(oracleReplacers...) 
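+ // Example (illustrative): both converters map SQL-style layouts onto + // Go's reference time, so FromMS("yyyy-MM-dd HH:mm:ss") and + // FromOracle("YYYY-MM-DD HH24:MI:SS") each yield "2006-01-02 15:04:05", + // ready for time.Parse; FromOracle upper-cases its input first, so + // "yyyy-mm-dd hh24:mi:ss" converts identically.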
+) + +// FromOracle reformats a datetime layout string from the Oracle Database +// TO_DATE function into go's parse format. +func FromOracle(layout string) string { + return oracleStringReplacer.Replace(strings.ToUpper(layout)) +} diff --git a/src/mongo/gotools/mongoimport/json.go b/src/mongo/gotools/mongoimport/json.go new file mode 100644 index 00000000000..caa0dce3121 --- /dev/null +++ b/src/mongo/gotools/mongoimport/json.go @@ -0,0 +1,239 @@ +package mongoimport + +import ( + "errors" + "fmt" + "io" + "strings" + + "github.com/mongodb/mongo-tools/common/bsonutil" + "github.com/mongodb/mongo-tools/common/json" + "github.com/mongodb/mongo-tools/common/log" + "gopkg.in/mgo.v2/bson" +) + +// JSONInputReader is an implementation of InputReader that reads documents +// in JSON format. +type JSONInputReader struct { + // isArray indicates if the JSON import is an array of JSON documents + // or not + isArray bool + + // decoder is used to read the next valid JSON documents from the input source + decoder *json.Decoder + + // numProcessed indicates the number of JSON documents processed + numProcessed uint64 + + // readOpeningBracket indicates if the underlying io.Reader has consumed + // an opening bracket from the input source. Used to prevent errors when + // a JSON input source contains just '[]' + readOpeningBracket bool + + // expectedByte is used to store the next expected valid character for JSON + // array imports + expectedByte byte + + // bytesFromReader is used to store the next byte read from the Reader for + // JSON array imports + bytesFromReader []byte + + // separatorReader is used for JSON arrays to look for a valid array + // separator. It is a reader consisting of the decoder's buffer and the + // underlying reader + separatorReader io.Reader + + // embedded sizeTracker exposes the Size() method to check the number of bytes read so far + sizeTracker + + // numDecoders is the number of concurrent goroutines to use for decoding + numDecoders int +} + +// JSONConverter implements the Converter interface for JSON input. +type JSONConverter struct { + data []byte + index uint64 +} + +var ( + // ErrNoOpeningBracket means that the input source did not contain any + // opening bracket - returned only if --jsonArray is passed in. + ErrNoOpeningBracket = errors.New("bad JSON array format - found no " + + "opening bracket '[' in input source") + + // ErrNoClosingBracket means that the input source did not contain any + // closing bracket - returned only if --jsonArray is passed in. + ErrNoClosingBracket = errors.New("bad JSON array format - found no " + + "closing bracket ']' in input source") +) + +// NewJSONInputReader creates a new JSONInputReader in array mode if specified, +// configured to read data from the given io.Reader. +func NewJSONInputReader(isArray bool, in io.Reader, numDecoders int) *JSONInputReader { + szCount := newSizeTrackingReader(newBomDiscardingReader(in)) + return &JSONInputReader{ + isArray: isArray, + sizeTracker: szCount, + decoder: json.NewDecoder(szCount), + readOpeningBracket: false, + bytesFromReader: make([]byte, 1), + numDecoders: numDecoders, + } +} + +// ReadAndValidateHeader is a no-op for JSON imports; always returns nil. +func (r *JSONInputReader) ReadAndValidateHeader() error { + return nil +} + +// ReadAndValidateTypedHeader is a no-op for JSON imports; always returns nil. 
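+// JSON documents carry their own field names, so there is no header line to +// read or type-check; typed headers apply only to CSV/TSV input. A minimal +// usage sketch (illustrative): +// +// r := NewJSONInputReader(false, strings.NewReader(`{"a": 1}`), 1) +// docChan := make(chan bson.D, 1) +// err := r.StreamDocument(true, docChan) // err == nil; one document arrives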
+func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) error { + return nil +} + +// StreamDocument takes a boolean indicating if the documents should be streamed +// in read order and a channel on which to stream the documents processed from +// the underlying reader. Returns a non-nil error if encountered +func (r *JSONInputReader) StreamDocument(ordered bool, readChan chan bson.D) (retErr error) { + rawChan := make(chan Converter, r.numDecoders) + jsonErrChan := make(chan error) + + // begin reading from source + go func() { + var err error + for { + if r.isArray { + if err = r.readJSONArraySeparator(); err != nil { + close(rawChan) + if err == io.EOF { + jsonErrChan <- nil + } else { + r.numProcessed++ + jsonErrChan <- fmt.Errorf("error reading separator after document #%v: %v", r.numProcessed, err) + } + return + } + } + rawBytes, err := r.decoder.ScanObject() + if err != nil { + close(rawChan) + if err == io.EOF { + jsonErrChan <- nil + } else { + r.numProcessed++ + jsonErrChan <- fmt.Errorf("error processing document #%v: %v", r.numProcessed, err) + } + return + } + rawChan <- JSONConverter{ + data: rawBytes, + index: r.numProcessed, + } + r.numProcessed++ + } + }() + + // begin processing read bytes + go func() { + jsonErrChan <- streamDocuments(ordered, r.numDecoders, rawChan, readChan) + }() + + return channelQuorumError(jsonErrChan, 2) +} + +// Convert implements the Converter interface for JSON input. It converts a +// JSONConverter struct to a BSON document. +func (c JSONConverter) Convert() (bson.D, error) { + document, err := json.UnmarshalBsonD(c.data) + if err != nil { + return nil, fmt.Errorf("error unmarshaling bytes on document #%v: %v", c.index, err) + } + log.Logvf(log.DebugHigh, "got line: %v", document) + + bsonD, err := bsonutil.GetExtendedBsonD(document) + if err != nil { + return nil, fmt.Errorf("error getting extended BSON for document #%v: %v", c.index, err) + } + log.Logvf(log.DebugHigh, "got extended line: %#v", bsonD) + return bsonD, nil +} + +// readJSONArraySeparator is a helper method used to process JSON arrays. It is +// used to read any of the valid separators for a JSON array and flag invalid +// characters. +// +// It will read a byte at a time until it finds an expected character after +// which it returns control to the caller. +// +// It will also return immediately if it finds any error (including EOF). 
If it +// reads a JSON_ARRAY_END byte, as a validity check it will continue to scan the +// input source until it hits an error (including EOF) to ensure the entire +// input source content is a valid JSON array +func (r *JSONInputReader) readJSONArraySeparator() error { + r.expectedByte = json.ArraySep + if r.numProcessed == 0 { + r.expectedByte = json.ArrayStart + } + + var readByte byte + scanp := 0 + + separatorReader := io.MultiReader( + r.decoder.Buffered(), + r.decoder.R, + ) + for readByte != r.expectedByte { + n, err := separatorReader.Read(r.bytesFromReader) + scanp += n + if n == 0 || err != nil { + if err == io.EOF { + return ErrNoClosingBracket + } + return err + } + readByte = r.bytesFromReader[0] + + if readByte == json.ArrayEnd { + // if we read the end of the JSON array, ensure we have no other + // non-whitespace characters at the end of the array + for { + _, err = separatorReader.Read(r.bytesFromReader) + if err != nil { + // takes care of the '[]' case + if !r.readOpeningBracket { + return ErrNoOpeningBracket + } + return err + } + readString := string(r.bytesFromReader[0]) + if strings.TrimSpace(readString) != "" { + return fmt.Errorf("bad JSON array format - found '%v' "+ + "after '%v' in input source", readString, + string(json.ArrayEnd)) + } + } + } + + // this will catch any invalid inter JSON object byte that occurs in the + // input source + if !(readByte == json.ArraySep || + strings.TrimSpace(string(readByte)) == "" || + readByte == json.ArrayStart || + readByte == json.ArrayEnd) { + if r.expectedByte == json.ArrayStart { + return ErrNoOpeningBracket + } + return fmt.Errorf("bad JSON array format - found '%v' outside "+ + "JSON object/array in input source", string(readByte)) + } + } + // adjust the buffer to account for read bytes + if scanp < len(r.decoder.Buf) { + r.decoder.Buf = r.decoder.Buf[scanp:] + } else { + r.decoder.Buf = []byte{} + } + r.readOpeningBracket = true + return nil +} diff --git a/src/mongo/gotools/mongoimport/json_test.go b/src/mongo/gotools/mongoimport/json_test.go new file mode 100644 index 00000000000..597b282856b --- /dev/null +++ b/src/mongo/gotools/mongoimport/json_test.go @@ -0,0 +1,264 @@ +package mongoimport + +import ( + "bytes" + "io" + "os" + "testing" + + "github.com/mongodb/mongo-tools/common/testutil" + . 
"github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" +) + +func TestJSONArrayStreamDocument(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("With a JSON array input reader", t, func() { + var jsonFile, fileHandle *os.File + Convey("an error should be thrown if a plain JSON document is supplied", func() { + contents := `{"a": "ae"}` + r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + So(r.StreamDocument(true, make(chan bson.D, 1)), ShouldNotBeNil) + }) + + Convey("reading a JSON object that has no opening bracket should "+ + "error out", func() { + contents := `{"a":3},{"b":4}]` + r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + So(r.StreamDocument(true, make(chan bson.D, 1)), ShouldNotBeNil) + }) + + Convey("JSON arrays that do not end with a closing bracket should "+ + "error out", func() { + contents := `[{"a": "ae"}` + r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldNotBeNil) + // though first read should be fine + So(<-docChan, ShouldResemble, bson.D{{"a", "ae"}}) + }) + + Convey("an error should be thrown if a plain JSON file is supplied", func() { + fileHandle, err := os.Open("testdata/test_plain.json") + So(err, ShouldBeNil) + r := NewJSONInputReader(true, fileHandle, 1) + So(r.StreamDocument(true, make(chan bson.D, 50)), ShouldNotBeNil) + }) + + Convey("array JSON input file sources should be parsed correctly and "+ + "subsequent imports should parse correctly", func() { + // TODO: currently parses JSON as floats and not ints + expectedReadOne := bson.D{ + {"a", 1.2}, + {"b", "a"}, + {"c", 0.4}, + } + expectedReadTwo := bson.D{ + {"a", 2.4}, + {"b", "string"}, + {"c", 52.9}, + } + fileHandle, err := os.Open("testdata/test_array.json") + So(err, ShouldBeNil) + r := NewJSONInputReader(true, fileHandle, 1) + docChan := make(chan bson.D, 50) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedReadOne) + So(<-docChan, ShouldResemble, expectedReadTwo) + }) + + Reset(func() { + jsonFile.Close() + fileHandle.Close() + }) + }) +} + +func TestJSONPlainStreamDocument(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("With a plain JSON input reader", t, func() { + var jsonFile, fileHandle *os.File + Convey("string valued JSON documents should be imported properly", func() { + contents := `{"a": "ae"}` + expectedRead := bson.D{{"a", "ae"}} + r := NewJSONInputReader(false, bytes.NewReader([]byte(contents)), 1) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedRead) + }) + + Convey("several string valued JSON documents should be imported "+ + "properly", func() { + contents := `{"a": "ae"}{"b": "dc"}` + expectedReadOne := bson.D{{"a", "ae"}} + expectedReadTwo := bson.D{{"b", "dc"}} + r := NewJSONInputReader(false, bytes.NewReader([]byte(contents)), 1) + docChan := make(chan bson.D, 2) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedReadOne) + So(<-docChan, ShouldResemble, expectedReadTwo) + }) + + Convey("number valued JSON documents should be imported properly", func() { + contents := `{"a": "ae", "b": 2.0}` + expectedRead := bson.D{{"a", "ae"}, {"b", 2.0}} + r := NewJSONInputReader(false, bytes.NewReader([]byte(contents)), 1) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, 
ShouldResemble, expectedRead) + }) + + Convey("JSON arrays should return an error", func() { + contents := `[{"a": "ae", "b": 2.0}]` + r := NewJSONInputReader(false, bytes.NewReader([]byte(contents)), 1) + So(r.StreamDocument(true, make(chan bson.D, 50)), ShouldNotBeNil) + }) + + Convey("plain JSON input file sources should be parsed correctly and "+ + "subsequent imports should parse correctly", func() { + expectedReads := []bson.D{ + { + {"a", 4}, + {"b", "string value"}, + {"c", 1}, + }, { + {"a", 5}, + {"b", "string value"}, + {"c", 2}, + }, { + {"a", 6}, + {"b", "string value"}, + {"c", 3}, + }, + } + fileHandle, err := os.Open("testdata/test_plain.json") + So(err, ShouldBeNil) + r := NewJSONInputReader(false, fileHandle, 1) + docChan := make(chan bson.D, len(expectedReads)) + So(r.StreamDocument(true, docChan), ShouldBeNil) + for i := 0; i < len(expectedReads); i++ { + for j, readDocument := range <-docChan { + So(readDocument.Name, ShouldEqual, expectedReads[i][j].Name) + So(readDocument.Value, ShouldEqual, expectedReads[i][j].Value) + } + } + }) + + Convey("reading JSON that starts with a UTF-8 BOM should not error", + func() { + expectedReads := []bson.D{ + { + {"a", 1}, + {"b", 2}, + {"c", 3}, + }, { + {"a", 4}, + {"b", 5}, + {"c", 6}, + }, + } + fileHandle, err := os.Open("testdata/test_bom.json") + So(err, ShouldBeNil) + r := NewJSONInputReader(false, fileHandle, 1) + docChan := make(chan bson.D, 2) + So(r.StreamDocument(true, docChan), ShouldBeNil) + for _, expectedRead := range expectedReads { + for i, readDocument := range <-docChan { + So(readDocument.Name, ShouldEqual, expectedRead[i].Name) + So(readDocument.Value, ShouldEqual, expectedRead[i].Value) + } + } + }) + + Reset(func() { + jsonFile.Close() + fileHandle.Close() + }) + }) +} + +func TestReadJSONArraySeparator(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("With an array JSON input reader", t, func() { + Convey("reading a JSON array separator should consume [", + func() { + contents := `[{"a": "ae"}` + jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + So(jsonImporter.readJSONArraySeparator(), ShouldBeNil) + // at this point it should have consumed all bytes up to `{` + So(jsonImporter.readJSONArraySeparator(), ShouldNotBeNil) + }) + Convey("reading a closing JSON array separator without a "+ + "corresponding opening bracket should error out ", + func() { + contents := `]` + jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + So(jsonImporter.readJSONArraySeparator(), ShouldNotBeNil) + }) + Convey("reading an opening JSON array separator without a "+ + "corresponding closing bracket should error out ", + func() { + contents := `[` + jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + So(jsonImporter.readJSONArraySeparator(), ShouldBeNil) + So(jsonImporter.readJSONArraySeparator(), ShouldNotBeNil) + }) + Convey("reading an opening JSON array separator with an ending "+ + "closing bracket should return EOF", + func() { + contents := `[]` + jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + So(jsonImporter.readJSONArraySeparator(), ShouldBeNil) + So(jsonImporter.readJSONArraySeparator(), ShouldEqual, io.EOF) + }) + Convey("reading an opening JSON array separator, an ending closing "+ + "bracket but then additional characters after that, should error", + func() { + contents := `[]a` + jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + 
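// first call: numProcessed is 0, so the expected byte is ArrayStart '[', + // which is consumed; second call: numProcessed is still 0 (only + // StreamDocument advances it), so '[' is expected again and the '{' that + // follows yields ErrNoOpeningBracket + 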
So(jsonImporter.readJSONArraySeparator(), ShouldBeNil) + So(jsonImporter.readJSONArraySeparator(), ShouldNotBeNil) + }) + Convey("reading invalid JSON objects between valid objects should "+ + "error out", + func() { + contents := `[{"a":3}x{"b":4}]` + r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldNotBeNil) + // read first valid document + <-docChan + So(r.readJSONArraySeparator(), ShouldNotBeNil) + }) + Convey("reading invalid JSON objects after valid objects but between "+ + "valid objects should error out", + func() { + contents := `[{"a":3},b{"b":4}]` + r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + So(r.StreamDocument(true, make(chan bson.D, 1)), ShouldNotBeNil) + contents = `[{"a":3},,{"b":4}]` + r = NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1) + So(r.StreamDocument(true, make(chan bson.D, 1)), ShouldNotBeNil) + }) + }) +} + +func TestJSONConvert(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("With a JSON input reader", t, func() { + Convey("calling convert on a JSONConverter should return the expected BSON document", func() { + jsonConverter := JSONConverter{ + data: []byte(`{field1:"a",field2:"b",field3:"c"}`), + index: uint64(0), + } + expectedDocument := bson.D{ + {"field1", "a"}, + {"field2", "b"}, + {"field3", "c"}, + } + document, err := jsonConverter.Convert() + So(err, ShouldBeNil) + So(document, ShouldResemble, expectedDocument) + }) + }) +} diff --git a/src/mongo/gotools/mongoimport/main/mongoimport.go b/src/mongo/gotools/mongoimport/main/mongoimport.go new file mode 100644 index 00000000000..50002bc5442 --- /dev/null +++ b/src/mongo/gotools/mongoimport/main/mongoimport.go @@ -0,0 +1,86 @@ +// Main package for the mongoimport tool. 
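+// +// Typical invocations (illustrative; database, collection, and file names +// are placeholders): +// +// mongoimport --db test --collection users --file users.json +// mongoimport --db test --collection users --type csv --headerline --file users.csv +// mongoimport --db test --collection users --jsonArray --file users_array.json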
+package main + +import ( + "fmt" + "os" + + "github.com/mongodb/mongo-tools/common/db" + "github.com/mongodb/mongo-tools/common/log" + "github.com/mongodb/mongo-tools/common/options" + "github.com/mongodb/mongo-tools/common/signals" + "github.com/mongodb/mongo-tools/common/util" + "github.com/mongodb/mongo-tools/mongoimport" +) + +func main() { + // initialize command-line opts + opts := options.New("mongoimport", mongoimport.Usage, + options.EnabledOptions{Auth: true, Connection: true, Namespace: true}) + + inputOpts := &mongoimport.InputOptions{} + opts.AddOptions(inputOpts) + ingestOpts := &mongoimport.IngestOptions{} + opts.AddOptions(ingestOpts) + + args, err := opts.Parse() + if err != nil { + log.Logvf(log.Always, "error parsing command line options: %v", err) + log.Logvf(log.Always, "try 'mongoimport --help' for more information") + os.Exit(util.ExitBadOptions) + } + + log.SetVerbosity(opts.Verbosity) + signals.Handle() + + // print help, if specified + if opts.PrintHelp(false) { + return + } + + // print version, if specified + if opts.PrintVersion() { + return + } + + // connect directly, unless a replica set name is explicitly specified + _, setName := util.ParseConnectionString(opts.Host) + opts.Direct = (setName == "") + opts.ReplicaSetName = setName + + // create a session provider to connect to the db + sessionProvider, err := db.NewSessionProvider(*opts) + if err != nil { + log.Logvf(log.Always, "error connecting to host: %v", err) + os.Exit(util.ExitError) + } + sessionProvider.SetBypassDocumentValidation(ingestOpts.BypassDocumentValidation) + + m := mongoimport.MongoImport{ + ToolOptions: opts, + InputOptions: inputOpts, + IngestOptions: ingestOpts, + SessionProvider: sessionProvider, + } + + if err = m.ValidateSettings(args); err != nil { + log.Logvf(log.Always, "error validating settings: %v", err) + log.Logvf(log.Always, "try 'mongoimport --help' for more information") + os.Exit(util.ExitError) + } + + numDocs, err := m.ImportDocuments() + if !opts.Quiet { + if err != nil { + log.Logvf(log.Always, "Failed: %v", err) + } + message := fmt.Sprintf("imported 1 document") + if numDocs != 1 { + message = fmt.Sprintf("imported %v documents", numDocs) + } + log.Logvf(log.Always, message) + } + if err != nil { + os.Exit(util.ExitError) + } +} diff --git a/src/mongo/gotools/mongoimport/mongoimport.go b/src/mongo/gotools/mongoimport/mongoimport.go new file mode 100644 index 00000000000..599760cb053 --- /dev/null +++ b/src/mongo/gotools/mongoimport/mongoimport.go @@ -0,0 +1,575 @@ +// Package mongoimport allows importing content from a JSON, CSV, or TSV into a MongoDB instance. +package mongoimport + +import ( + "github.com/mongodb/mongo-tools/common/db" + "github.com/mongodb/mongo-tools/common/log" + "github.com/mongodb/mongo-tools/common/options" + "github.com/mongodb/mongo-tools/common/progress" + "github.com/mongodb/mongo-tools/common/util" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" + "gopkg.in/tomb.v2" + + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" +) + +// Input format types accepted by mongoimport. +const ( + CSV = "csv" + TSV = "tsv" + JSON = "json" +) + +const ( + workerBufferSize = 16 + progressBarLength = 24 +) + +// MongoImport is a container for the user-specified options and +// internal state used for running mongoimport. 
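+// +// A minimal wiring sketch (illustrative, mirroring main/mongoimport.go): +// +// m := mongoimport.MongoImport{ +// ToolOptions: opts, +// InputOptions: inputOpts, +// IngestOptions: ingestOpts, +// SessionProvider: sessionProvider, +// } +// if err := m.ValidateSettings(args); err != nil { /* bad options */ } +// numDocs, err := m.ImportDocuments()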
+type MongoImport struct { + // insertionCount keeps track of how many documents have successfully + // been inserted into the database + // updated atomically, aligned at the beginning of the struct + insertionCount uint64 + + // generic mongo tool options + ToolOptions *options.ToolOptions + + // InputOptions defines options used to read data to be ingested + InputOptions *InputOptions + + // IngestOptions defines options used to ingest data into MongoDB + IngestOptions *IngestOptions + + // SessionProvider is used for connecting to the database + SessionProvider *db.SessionProvider + + // the tomb is used to synchronize ingestion goroutines and causes + // other sibling goroutines to terminate immediately if one errors out + tomb.Tomb + + // fields to use for upsert operations + upsertFields []string + + // type of node the SessionProvider is connected to + nodeType db.NodeType +} + +type InputReader interface { + // StreamDocument takes a boolean indicating if the documents should be streamed + // in read order and a channel on which to stream the documents processed from + // the underlying reader. Returns a non-nil error if encountered. + StreamDocument(ordered bool, read chan bson.D) error + + // ReadAndValidateHeader reads the header line from the InputReader and returns + // a non-nil error if the fields from the header line are invalid; returns + // nil otherwise. No-op for JSON input readers. + ReadAndValidateHeader() error + + // ReadAndValidateTypedHeader is the same as ReadAndValidateHeader, + // except it also parses types from the fields of the header. Parse errors + // will be handled according parseGrace. + ReadAndValidateTypedHeader(parseGrace ParseGrace) error + + // embedded io.Reader that tracks number of bytes read, to allow feeding into progress bar. + sizeTracker +} + +// ValidateSettings ensures that the tool specific options supplied for +// MongoImport are valid. 
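+// +// Illustrative examples of the rules enforced below: +// +// --type csv --headerline ok: the header line supplies the fields +// --type csv --fields a,b,c ok: inline field list +// --type csv error: --fields, --fieldFile or --headerline required +// --type json --headerline error: header lines apply only to CSV/TSV +// --upsertFields a,b implies upsert mode; upserts force ordered insertion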
+func (imp *MongoImport) ValidateSettings(args []string) error { + // namespace must have a valid database; if none is specified, use 'test' + if imp.ToolOptions.DB == "" { + imp.ToolOptions.DB = "test" + } + err := util.ValidateDBName(imp.ToolOptions.DB) + if err != nil { + return fmt.Errorf("invalid database name: %v", err) + } + + imp.InputOptions.Type = strings.ToLower(imp.InputOptions.Type) + // use JSON as default input type + if imp.InputOptions.Type == "" { + imp.InputOptions.Type = JSON + } else { + if !(imp.InputOptions.Type == TSV || + imp.InputOptions.Type == JSON || + imp.InputOptions.Type == CSV) { + return fmt.Errorf("unknown type %v", imp.InputOptions.Type) + } + } + + // ensure headers are supplied for CSV/TSV + if imp.InputOptions.Type == CSV || + imp.InputOptions.Type == TSV { + if !imp.InputOptions.HeaderLine { + if imp.InputOptions.Fields == nil && + imp.InputOptions.FieldFile == nil { + return fmt.Errorf("must specify --fields, --fieldFile or --headerline to import this file type") + } + if imp.InputOptions.FieldFile != nil && + *imp.InputOptions.FieldFile == "" { + return fmt.Errorf("--fieldFile can not be empty string") + } + if imp.InputOptions.Fields != nil && + imp.InputOptions.FieldFile != nil { + return fmt.Errorf("incompatible options: --fields and --fieldFile") + } + } else { + if imp.InputOptions.Fields != nil { + return fmt.Errorf("incompatible options: --fields and --headerline") + } + if imp.InputOptions.FieldFile != nil { + return fmt.Errorf("incompatible options: --fieldFile and --headerline") + } + } + + if _, err := ValidatePG(imp.InputOptions.ParseGrace); err != nil { + return err + } + } else { + // input type is JSON + if imp.InputOptions.HeaderLine { + return fmt.Errorf("can not use --headerline when input type is JSON") + } + if imp.InputOptions.Fields != nil { + return fmt.Errorf("can not use --fields when input type is JSON") + } + if imp.InputOptions.FieldFile != nil { + return fmt.Errorf("can not use --fieldFile when input type is JSON") + } + if imp.IngestOptions.IgnoreBlanks { + return fmt.Errorf("can not use --ignoreBlanks when input type is JSON") + } + if imp.InputOptions.ColumnsHaveTypes { + return fmt.Errorf("can not use --columnsHaveTypes when input type is JSON") + } + } + + if imp.IngestOptions.UpsertFields != "" { + imp.IngestOptions.Upsert = true + imp.upsertFields = strings.Split(imp.IngestOptions.UpsertFields, ",") + if err := validateFields(imp.upsertFields); err != nil { + return fmt.Errorf("invalid --upsertFields argument: %v", err) + } + } else if imp.IngestOptions.Upsert { + imp.upsertFields = []string{"_id"} + } + + if imp.IngestOptions.Upsert { + imp.IngestOptions.MaintainInsertionOrder = true + log.Logvf(log.Info, "using upsert fields: %v", imp.upsertFields) + } + + // set the number of decoding workers to use for imports + if imp.IngestOptions.NumDecodingWorkers <= 0 { + imp.IngestOptions.NumDecodingWorkers = imp.ToolOptions.MaxProcs + } + log.Logvf(log.DebugLow, "using %v decoding workers", imp.IngestOptions.NumDecodingWorkers) + + // set the number of insertion workers to use for imports + if imp.IngestOptions.NumInsertionWorkers <= 0 { + imp.IngestOptions.NumInsertionWorkers = 1 + } + + log.Logvf(log.DebugLow, "using %v insert workers", imp.IngestOptions.NumInsertionWorkers) + + // if --maintainInsertionOrder is set, we can only allow 1 insertion worker + if imp.IngestOptions.MaintainInsertionOrder { + imp.IngestOptions.NumInsertionWorkers = 1 + } + + // get the number of documents per batch + if 
imp.IngestOptions.BulkBufferSize <= 0 || imp.IngestOptions.BulkBufferSize > 1000 { + imp.IngestOptions.BulkBufferSize = 1000 + } + + // ensure no more than one positional argument is supplied + if len(args) > 1 { + return fmt.Errorf("only one positional argument is allowed") + } + + // ensure either a positional argument is supplied or an argument is passed + // to the --file flag - and not both + if imp.InputOptions.File != "" && len(args) != 0 { + return fmt.Errorf("incompatible options: --file and positional argument(s)") + } + + if imp.InputOptions.File == "" { + if len(args) != 0 { + // if --file is not supplied, use the positional argument supplied + imp.InputOptions.File = args[0] + } + } + + // ensure we have a valid string to use for the collection + if imp.ToolOptions.Collection == "" { + log.Logvf(log.Always, "no collection specified") + fileBaseName := filepath.Base(imp.InputOptions.File) + lastDotIndex := strings.LastIndex(fileBaseName, ".") + if lastDotIndex != -1 { + fileBaseName = fileBaseName[0:lastDotIndex] + } + log.Logvf(log.Always, "using filename '%v' as collection", fileBaseName) + imp.ToolOptions.Collection = fileBaseName + } + err = util.ValidateCollectionName(imp.ToolOptions.Collection) + if err != nil { + return fmt.Errorf("invalid collection name: %v", err) + } + return nil +} + +// getSourceReader returns an io.ReadCloser for the input source, along with +// the input's size in bytes (0 when unknown, e.g. stdin), which is used to +// drive the progress bar. +func (imp *MongoImport) getSourceReader() (io.ReadCloser, int64, error) { + if imp.InputOptions.File != "" { + file, err := os.Open(util.ToUniversalPath(imp.InputOptions.File)) + if err != nil { + return nil, -1, err + } + fileStat, err := file.Stat() + if err != nil { + return nil, -1, err + } + log.Logvf(log.Info, "filesize: %v bytes", fileStat.Size()) + return file, fileStat.Size(), nil + } + + log.Logvf(log.Info, "reading from stdin") + + // Stdin has undefined max size, so return 0 + return os.Stdin, 0, nil +} + +// fileSizeProgressor implements Progressor to allow a sizeTracker to hook up with a +// progress.Bar instance, so that the progress bar can report the percentage of the file read. +type fileSizeProgressor struct { + max int64 + sizeTracker +} + +func (fsp *fileSizeProgressor) Progress() (int64, int64) { + return fsp.max, fsp.sizeTracker.Size() +} + +// ImportDocuments is used to write input data to the database. 
It returns the +// number of documents successfully imported to the appropriate namespace and +// any error encountered in doing this +func (imp *MongoImport) ImportDocuments() (uint64, error) { + source, fileSize, err := imp.getSourceReader() + if err != nil { + return 0, err + } + defer source.Close() + + inputReader, err := imp.getInputReader(source) + if err != nil { + return 0, err + } + + if imp.InputOptions.HeaderLine { + if imp.InputOptions.ColumnsHaveTypes { + err = inputReader.ReadAndValidateTypedHeader(ParsePG(imp.InputOptions.ParseGrace)) + } else { + err = inputReader.ReadAndValidateHeader() + } + if err != nil { + return 0, err + } + } + + bar := &progress.Bar{ + Name: fmt.Sprintf("%v.%v", imp.ToolOptions.DB, imp.ToolOptions.Collection), + Watching: &fileSizeProgressor{fileSize, inputReader}, + Writer: log.Writer(0), + BarLength: progressBarLength, + IsBytes: true, + } + bar.Start() + defer bar.Stop() + return imp.importDocuments(inputReader) +} + +// importDocuments is a helper to ImportDocuments and does all the ingestion +// work by taking data from the inputReader source and writing it to the +// appropriate namespace +func (imp *MongoImport) importDocuments(inputReader InputReader) (numImported uint64, retErr error) { + session, err := imp.SessionProvider.GetSession() + if err != nil { + return 0, err + } + defer session.Close() + + connURL := imp.ToolOptions.Host + if connURL == "" { + connURL = util.DefaultHost + } + if imp.ToolOptions.Port != "" { + connURL = connURL + ":" + imp.ToolOptions.Port + } + log.Logvf(log.Always, "connected to: %v", connURL) + + log.Logvf(log.Info, "ns: %v.%v", + imp.ToolOptions.Namespace.DB, + imp.ToolOptions.Namespace.Collection) + + // check if the server is a replica set, mongos, or standalone + imp.nodeType, err = imp.SessionProvider.GetNodeType() + if err != nil { + return 0, fmt.Errorf("error checking connected node type: %v", err) + } + log.Logvf(log.Info, "connected to node type: %v", imp.nodeType) + + if err = imp.configureSession(session); err != nil { + return 0, fmt.Errorf("error configuring session: %v", err) + } + + // drop the database if necessary + if imp.IngestOptions.Drop { + log.Logvf(log.Always, "dropping: %v.%v", + imp.ToolOptions.DB, + imp.ToolOptions.Collection) + collection := session.DB(imp.ToolOptions.DB). + C(imp.ToolOptions.Collection) + if err := collection.DropCollection(); err != nil { + if err.Error() != db.ErrNsNotFound { + return 0, err + } + } + } + + readDocs := make(chan bson.D, workerBufferSize) + processingErrChan := make(chan error) + ordered := imp.IngestOptions.MaintainInsertionOrder + + // read and process from the input reader + go func() { + processingErrChan <- inputReader.StreamDocument(ordered, readDocs) + }() + + // insert documents into the target database + go func() { + processingErrChan <- imp.ingestDocuments(readDocs) + }() + + e1 := channelQuorumError(processingErrChan, 2) + insertionCount := atomic.LoadUint64(&imp.insertionCount) + return insertionCount, e1 +} + +// ingestDocuments accepts a channel from which it reads documents to be inserted +// into the target collection. It spreads the insert/upsert workload across one +// or more workers. +func (imp *MongoImport) ingestDocuments(readDocs chan bson.D) (retErr error) { + numInsertionWorkers := imp.IngestOptions.NumInsertionWorkers + if numInsertionWorkers <= 0 { + numInsertionWorkers = 1 + } + + // Each ingest worker will return an error which will + // be set in the following cases: + // + // 1. 
There is a problem connecting with the server + // 2. The server becomes unreachable + // 3. There is an insertion/update error - e.g. duplicate key + // error - and stopOnError is set to true + + wg := new(sync.WaitGroup) + for i := 0; i < numInsertionWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + // only set the first insertion error and cause sibling goroutines to terminate immediately + err := imp.runInsertionWorker(readDocs) + if err != nil && retErr == nil { + retErr = err + imp.Kill(err) + } + }() + } + wg.Wait() + return +} + +// configureSession takes in a session and modifies it with properly configured +// settings. It does the following configurations: +// +// 1. Sets the session to not timeout +// 2. Sets the write concern on the session +// 3. Sets the session safety +// +// returns an error if it's unable to set the write concern +func (imp *MongoImport) configureSession(session *mgo.Session) error { + // sockets to the database will never be forcibly closed + session.SetSocketTimeout(0) + sessionSafety, err := db.BuildWriteConcern(imp.IngestOptions.WriteConcern, imp.nodeType) + if err != nil { + return fmt.Errorf("write concern error: %v", err) + } + session.SetSafe(sessionSafety) + + return nil +} + +type flushInserter interface { + Insert(doc interface{}) error + Flush() error +} + +// runInsertionWorker is a helper to ingestDocuments - it reads documents off +// the read channel and prepares them in batches for insertion into the database +func (imp *MongoImport) runInsertionWorker(readDocs chan bson.D) (err error) { + session, err := imp.SessionProvider.GetSession() + if err != nil { + return fmt.Errorf("error connecting to mongod: %v", err) + } + defer session.Close() + if err = imp.configureSession(session); err != nil { + return fmt.Errorf("error configuring session: %v", err) + } + collection := session.DB(imp.ToolOptions.DB).C(imp.ToolOptions.Collection) + + var inserter flushInserter + if imp.IngestOptions.Upsert { + inserter = imp.newUpserter(collection) + } else { + inserter = db.NewBufferedBulkInserter(collection, imp.IngestOptions.BulkBufferSize, !imp.IngestOptions.StopOnError) + if !imp.IngestOptions.MaintainInsertionOrder { + inserter.(*db.BufferedBulkInserter).Unordered() + } + } + +readLoop: + for { + select { + case document, alive := <-readDocs: + if !alive { + break readLoop + } + err = filterIngestError(imp.IngestOptions.StopOnError, inserter.Insert(document)) + if err != nil { + return err + } + atomic.AddUint64(&imp.insertionCount, 1) + case <-imp.Dying(): + return nil + } + } + + err = inserter.Flush() + // TOOLS-349 correct import count for bulk operations + if bulkError, ok := err.(*mgo.BulkError); ok { + failedDocs := make(map[int]bool) // index of failures + for _, failure := range bulkError.Cases() { + failedDocs[failure.Index] = true + } + numFailures := len(failedDocs) + if numFailures > 0 { + log.Logvf(log.Always, "num failures: %d", numFailures) + atomic.AddUint64(&imp.insertionCount, ^uint64(numFailures-1)) + } + } + return filterIngestError(imp.IngestOptions.StopOnError, err) +} + +type upserter struct { + imp *MongoImport + collection *mgo.Collection +} + +func (imp *MongoImport) newUpserter(collection *mgo.Collection) *upserter { + return &upserter{ + imp: imp, + collection: collection, + } +} + +// Insert is part of the flushInserter interface and performs +// upserts or inserts. 
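+// +// A sketch of the flow (assuming constructUpsertDocument builds the selector +// from the configured upsertFields): with upsertFields == []string{"_id"} and +// doc == bson.D{{"_id", 1}, {"a", 2}} the selector is {"_id": 1}, so the call +// becomes collection.Upsert(selector, doc); if the document contains none of +// the upsert fields the selector is nil and a plain Insert is performed.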
+func (up *upserter) Insert(doc interface{}) error { + document := doc.(bson.D) + selector := constructUpsertDocument(up.imp.upsertFields, document) + var err error + if selector == nil { + err = up.collection.Insert(document) + } else { + _, err = up.collection.Upsert(selector, document) + } + return err +} + +// Flush is needed so that upserter implements flushInserter, but upserter +// doesn't buffer anything so we don't need to do anything in Flush. +func (up *upserter) Flush() error { + return nil +} + +func splitInlineHeader(header string) (headers []string) { + var level uint8 + var currentField string + for _, c := range header { + if c == '(' { + level++ + } else if c == ')' && level > 0 { + level-- + } + if c == ',' && level == 0 { + headers = append(headers, currentField) + currentField = "" + } else { + currentField = currentField + string(c) + } + } + headers = append(headers, currentField) // add last field + return +} + +// getInputReader returns an implementation of InputReader based on the input type +func (imp *MongoImport) getInputReader(in io.Reader) (InputReader, error) { + var colSpecs []ColumnSpec + var headers []string + var err error + if imp.InputOptions.Fields != nil { + headers = splitInlineHeader(*imp.InputOptions.Fields) + } else if imp.InputOptions.FieldFile != nil { + headers, err = util.GetFieldsFromFile(*imp.InputOptions.FieldFile) + if err != nil { + return nil, err + } + } + if imp.InputOptions.ColumnsHaveTypes { + colSpecs, err = ParseTypedHeaders(headers, ParsePG(imp.InputOptions.ParseGrace)) + if err != nil { + return nil, err + } + } else { + colSpecs = ParseAutoHeaders(headers) + } + + // header fields validation can only happen once we have an input reader + if !imp.InputOptions.HeaderLine { + if err = validateReaderFields(ColumnNames(colSpecs)); err != nil { + return nil, err + } + } + + out := os.Stdout + + ignoreBlanks := imp.IngestOptions.IgnoreBlanks && imp.InputOptions.Type != JSON + if imp.InputOptions.Type == CSV { + return NewCSVInputReader(colSpecs, in, out, imp.IngestOptions.NumDecodingWorkers, ignoreBlanks), nil + } else if imp.InputOptions.Type == TSV { + return NewTSVInputReader(colSpecs, in, out, imp.IngestOptions.NumDecodingWorkers, ignoreBlanks), nil + } + return NewJSONInputReader(imp.InputOptions.JSONArray, in, imp.IngestOptions.NumDecodingWorkers), nil +} diff --git a/src/mongo/gotools/mongoimport/mongoimport_test.go b/src/mongo/gotools/mongoimport/mongoimport_test.go new file mode 100644 index 00000000000..ff10ec5525c --- /dev/null +++ b/src/mongo/gotools/mongoimport/mongoimport_test.go @@ -0,0 +1,757 @@ +package mongoimport + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "reflect" + "strings" + "testing" + + "github.com/mongodb/mongo-tools/common/db" + "github.com/mongodb/mongo-tools/common/options" + "github.com/mongodb/mongo-tools/common/testutil" + . 
"github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" +) + +const ( + testDb = "db" + testCollection = "c" +) + +// checkOnlyHasDocuments returns an error if the documents in the test +// collection don't exactly match those that are passed in +func checkOnlyHasDocuments(sessionProvider db.SessionProvider, expectedDocuments []bson.M) error { + session, err := sessionProvider.GetSession() + if err != nil { + return err + } + defer session.Close() + + collection := session.DB(testDb).C(testCollection) + dbDocuments := []bson.M{} + err = collection.Find(nil).Sort("_id").All(&dbDocuments) + if err != nil { + return err + } + if len(dbDocuments) != len(expectedDocuments) { + return fmt.Errorf("document count mismatch: expected %#v, got %#v", + len(expectedDocuments), len(dbDocuments)) + } + for index := range dbDocuments { + if !reflect.DeepEqual(dbDocuments[index], expectedDocuments[index]) { + return fmt.Errorf("document mismatch: expected %#v, got %#v", + expectedDocuments[index], dbDocuments[index]) + } + } + return nil +} + +// getBasicToolOptions returns a test helper to instantiate the session provider +// for calls to StreamDocument +func getBasicToolOptions() *options.ToolOptions { + general := &options.General{} + ssl := testutil.GetSSLOptions() + auth := testutil.GetAuthOptions() + namespace := &options.Namespace{ + DB: testDb, + Collection: testCollection, + } + connection := &options.Connection{ + Host: "localhost", + Port: db.DefaultTestPort, + } + return &options.ToolOptions{ + General: general, + SSL: &ssl, + Namespace: namespace, + Connection: connection, + Auth: &auth, + } +} + +func NewMongoImport() (*MongoImport, error) { + toolOptions := getBasicToolOptions() + inputOptions := &InputOptions{ + ParseGrace: "stop", + } + ingestOptions := &IngestOptions{} + provider, err := db.NewSessionProvider(*toolOptions) + if err != nil { + return nil, err + } + return &MongoImport{ + ToolOptions: toolOptions, + InputOptions: inputOptions, + IngestOptions: ingestOptions, + SessionProvider: provider, + }, nil +} + +func TestSplitInlineHeader(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("handle normal, untyped headers", t, func() { + fields := []string{"foo.bar", "baz", "boo"} + header := strings.Join(fields, ",") + Convey("with '"+header+"'", func() { + So(splitInlineHeader(header), ShouldResemble, fields) + }) + }) + Convey("handle typed headers", t, func() { + fields := []string{"foo.bar.string()", "baz.date(January 2 2006)", "boo.binary(hex)"} + header := strings.Join(fields, ",") + Convey("with '"+header+"'", func() { + So(splitInlineHeader(header), ShouldResemble, fields) + }) + }) + Convey("handle typed headers that include commas", t, func() { + fields := []string{"foo.bar.date(,,,,)", "baz.date(January 2, 2006)", "boo.binary(hex)"} + header := strings.Join(fields, ",") + Convey("with '"+header+"'", func() { + So(splitInlineHeader(header), ShouldResemble, fields) + }) + }) +} + +func TestMongoImportValidateSettings(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Given a mongoimport instance for validation, ", t, func() { + Convey("an error should be thrown if no collection is given", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.ToolOptions.Namespace.DB = "" + imp.ToolOptions.Namespace.Collection = "" + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + }) + + Convey("an error should be thrown if an invalid type is given", func() { + imp, err := NewMongoImport() + So(err, 
ShouldBeNil) + imp.InputOptions.Type = "invalid" + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + }) + + Convey("an error should be thrown if neither --headerline is supplied "+ + "nor --fields/--fieldFile", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + }) + + Convey("no error should be thrown if --headerline is not supplied "+ + "but --fields is supplied", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fields := "a,b,c" + imp.InputOptions.Fields = &fields + imp.InputOptions.Type = CSV + So(imp.ValidateSettings([]string{}), ShouldBeNil) + }) + + Convey("no error should be thrown if no input type is supplied", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + So(imp.ValidateSettings([]string{}), ShouldBeNil) + }) + + Convey("no error should be thrown if there's just one positional argument", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + So(imp.ValidateSettings([]string{"a"}), ShouldBeNil) + }) + + Convey("an error should be thrown if --file is used with one positional argument", func() { + imp, err := NewMongoImport() + imp.InputOptions.File = "abc" + So(err, ShouldBeNil) + So(imp.ValidateSettings([]string{"a"}), ShouldNotBeNil) + }) + + Convey("an error should be thrown if there's more than one positional argument", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + So(imp.ValidateSettings([]string{"a", "b"}), ShouldNotBeNil) + }) + + Convey("an error should be thrown if --headerline is used with JSON input", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.HeaderLine = true + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + }) + + Convey("an error should be thrown if --fields is used with JSON input", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fields := "" + imp.InputOptions.Fields = &fields + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + fields = "a,b,c" + imp.InputOptions.Fields = &fields + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + }) + + Convey("an error should be thrown if --fieldFile is used with JSON input", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fieldFile := "" + imp.InputOptions.FieldFile = &fieldFile + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + fieldFile = "test.csv" + imp.InputOptions.FieldFile = &fieldFile + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + }) + + Convey("an error should be thrown if --ignoreBlanks is used with JSON input", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.IngestOptions.IgnoreBlanks = true + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + }) + + Convey("no error should be thrown if --headerline is not supplied "+ + "but --fieldFile is supplied", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fieldFile := "test.csv" + imp.InputOptions.FieldFile = &fieldFile + imp.InputOptions.Type = CSV + So(imp.ValidateSettings([]string{}), ShouldBeNil) + }) + + Convey("an error should be thrown if a field in the --upsertFields "+ + "argument starts with a dollar sign", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.HeaderLine = true + imp.InputOptions.Type = CSV + imp.IngestOptions.Upsert = true + imp.IngestOptions.UpsertFields = "a,$b,c" + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + imp.IngestOptions.UpsertFields = "a,.b,c" + So(imp.ValidateSettings([]string{}), 
ShouldNotBeNil) + }) + + Convey("no error should be thrown if --upsertFields is supplied without "+ "--upsert", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.HeaderLine = true + imp.InputOptions.Type = CSV + imp.IngestOptions.UpsertFields = "a,b,c" + So(imp.ValidateSettings([]string{}), ShouldBeNil) + }) + + Convey("if --upsert is used without --upsertFields, _id should be set as "+ "the upsert field", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.HeaderLine = true + imp.InputOptions.Type = CSV + imp.IngestOptions.Upsert = true + imp.IngestOptions.UpsertFields = "" + So(imp.ValidateSettings([]string{}), ShouldBeNil) + So(imp.upsertFields, ShouldResemble, []string{"_id"}) + }) + + Convey("no error should be thrown if all fields in the --upsertFields "+ "argument are valid", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.HeaderLine = true + imp.InputOptions.Type = CSV + imp.IngestOptions.Upsert = true + imp.IngestOptions.UpsertFields = "a,b,c" + So(imp.ValidateSettings([]string{}), ShouldBeNil) + }) + + Convey("no error should be thrown if --fields is supplied with CSV import", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fields := "a,b,c" + imp.InputOptions.Fields = &fields + imp.InputOptions.Type = CSV + So(imp.ValidateSettings([]string{}), ShouldBeNil) + }) + + Convey("no error should be thrown by ValidateSettings if an empty --fields is supplied with CSV import", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fields := "" + imp.InputOptions.Fields = &fields + imp.InputOptions.Type = CSV + So(imp.ValidateSettings([]string{}), ShouldBeNil) + }) + + Convey("no error should be thrown if --fieldFile is supplied with CSV import", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fieldFile := "test.csv" + imp.InputOptions.FieldFile = &fieldFile + imp.InputOptions.Type = CSV + So(imp.ValidateSettings([]string{}), ShouldBeNil) + }) + + Convey("an error should be thrown if no collection and no file is supplied", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fieldFile := "test.csv" + imp.InputOptions.FieldFile = &fieldFile + imp.InputOptions.Type = CSV + imp.ToolOptions.Namespace.Collection = "" + So(imp.ValidateSettings([]string{}), ShouldNotBeNil) + }) + + Convey("no error should be thrown if --file is used without -c "+ "- the file name should be used as the collection name", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.File = "input" + imp.InputOptions.HeaderLine = true + imp.InputOptions.Type = CSV + imp.ToolOptions.Namespace.Collection = "" + So(imp.ValidateSettings([]string{}), ShouldBeNil) + So(imp.ToolOptions.Namespace.Collection, ShouldEqual, + imp.InputOptions.File) + }) + + Convey("with no collection name and a file name the base name of the "+ "file (without the extension) should be used as the collection name", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.File = "/path/to/input/file/dot/input.txt" + imp.InputOptions.HeaderLine = true + imp.InputOptions.Type = CSV + imp.ToolOptions.Namespace.Collection = "" + So(imp.ValidateSettings([]string{}), ShouldBeNil) + So(imp.ToolOptions.Namespace.Collection, ShouldEqual, "input") + }) + }) +} + +func TestGetSourceReader(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("Given a mongoimport instance, on calling getSourceReader", t, + func() { + Convey("an error should 
be thrown if the given file referenced by "+ + "the reader does not exist", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.File = "/path/to/input/file/dot/input.txt" + imp.InputOptions.Type = CSV + imp.ToolOptions.Namespace.Collection = "" + _, _, err = imp.getSourceReader() + So(err, ShouldNotBeNil) + }) + + Convey("no error should be thrown if the file exists", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.File = "testdata/test_array.json" + imp.InputOptions.Type = JSON + _, _, err = imp.getSourceReader() + So(err, ShouldBeNil) + }) + + Convey("no error should be thrown if stdin is used", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.File = "" + _, _, err = imp.getSourceReader() + So(err, ShouldBeNil) + }) + }) +} + +func TestGetInputReader(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("Given a io.Reader on calling getInputReader", t, func() { + Convey("should parse --fields using valid csv escaping", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Fields = new(string) + *imp.InputOptions.Fields = "foo.auto(),bar.date(January 2, 2006)" + imp.InputOptions.File = "/path/to/input/file/dot/input.txt" + imp.InputOptions.ColumnsHaveTypes = true + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) + Convey("should complain about non-escaped new lines in --fields", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Fields = new(string) + *imp.InputOptions.Fields = "foo.auto(),\nblah.binary(hex),bar.date(January 2, 2006)" + imp.InputOptions.File = "/path/to/input/file/dot/input.txt" + imp.InputOptions.ColumnsHaveTypes = true + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) + Convey("no error should be thrown if neither --fields nor --fieldFile "+ + "is used", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.File = "/path/to/input/file/dot/input.txt" + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) + Convey("no error should be thrown if --fields is used", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fields := "a,b,c" + imp.InputOptions.Fields = &fields + imp.InputOptions.File = "/path/to/input/file/dot/input.txt" + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) + Convey("no error should be thrown if --fieldFile is used and it "+ + "references a valid file", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fieldFile := "testdata/test.csv" + imp.InputOptions.FieldFile = &fieldFile + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) + Convey("an error should be thrown if --fieldFile is used and it "+ + "references an invalid file", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fieldFile := "/path/to/input/file/dot/input.txt" + imp.InputOptions.FieldFile = &fieldFile + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldNotBeNil) + }) + Convey("no error should be thrown for CSV import inputs", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) + Convey("no error should be thrown for TSV import inputs", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = TSV + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) + Convey("no error should be thrown for JSON 
import inputs", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = JSON + _, err = imp.getInputReader(&os.File{}) + So(err, ShouldBeNil) + }) + Convey("an error should be thrown if --fieldFile fields are invalid", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fieldFile := "testdata/test_fields_invalid.txt" + imp.InputOptions.FieldFile = &fieldFile + file, err := os.Open(fieldFile) + So(err, ShouldBeNil) + _, err = imp.getInputReader(file) + So(err, ShouldNotBeNil) + }) + Convey("no error should be thrown if --fieldFile fields are valid", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + fieldFile := "testdata/test_fields_valid.txt" + imp.InputOptions.FieldFile = &fieldFile + file, err := os.Open(fieldFile) + So(err, ShouldBeNil) + _, err = imp.getInputReader(file) + So(err, ShouldBeNil) + }) + }) +} + +func TestImportDocuments(t *testing.T) { + testutil.VerifyTestType(t, testutil.IntegrationTestType) + Convey("With a mongoimport instance", t, func() { + Reset(func() { + sessionProvider, err := db.NewSessionProvider(*getBasicToolOptions()) + if err != nil { + t.Fatalf("error getting session provider session: %v", err) + } + session, err := sessionProvider.GetSession() + if err != nil { + t.Fatalf("error getting session: %v", err) + } + defer session.Close() + session.DB(testDb).C(testCollection).DropCollection() + }) + Convey("no error should be thrown for CSV import on test data and all "+ + "CSV data lines should be imported correctly", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test.csv" + fields := "a,b,c" + imp.InputOptions.Fields = &fields + imp.IngestOptions.WriteConcern = "majority" + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 3) + }) + Convey("an error should be thrown for JSON import on test data that is "+ + "JSON array", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.File = "testdata/test_array.json" + imp.IngestOptions.WriteConcern = "majority" + numImported, err := imp.ImportDocuments() + So(err, ShouldNotBeNil) + So(numImported, ShouldEqual, 0) + }) + Convey("TOOLS-247: no error should be thrown for JSON import on test "+ + "data and all documents should be imported correctly", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.File = "testdata/test_plain2.json" + imp.IngestOptions.WriteConcern = "majority" + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 10) + }) + Convey("CSV import with --ignoreBlanks should import only non-blank fields", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test_blanks.csv" + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.IngestOptions.IgnoreBlanks = true + + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 3) + expectedDocuments := []bson.M{ + bson.M{"_id": 1, "b": int(2)}, + bson.M{"_id": 5, "c": "6e"}, + bson.M{"_id": 7, "b": int(8), "c": int(6)}, + } + So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil) + }) + Convey("CSV import without --ignoreBlanks should include blanks", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test_blanks.csv" + fields := "_id,b,c" + 
imp.InputOptions.Fields = &fields + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 3) + expectedDocuments := []bson.M{ + bson.M{"_id": 1, "b": int(2), "c": ""}, + bson.M{"_id": 5, "b": "", "c": "6e"}, + bson.M{"_id": 7, "b": int(8), "c": int(6)}, + } + So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil) + }) + Convey("no error should be thrown for CSV import on test data with --upsertFields", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test.csv" + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.IngestOptions.UpsertFields = "b,c" + imp.IngestOptions.MaintainInsertionOrder = true + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 3) + expectedDocuments := []bson.M{ + bson.M{"_id": 1, "b": int(2), "c": int(3)}, + bson.M{"_id": 3, "b": 5.4, "c": "string"}, + bson.M{"_id": 5, "b": int(6), "c": int(6)}, + } + So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil) + }) + Convey("no error should be thrown for CSV import on test data with "+ + "--stopOnError. Only documents before error should be imported", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test.csv" + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.IngestOptions.StopOnError = true + imp.IngestOptions.MaintainInsertionOrder = true + imp.IngestOptions.WriteConcern = "majority" + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 3) + expectedDocuments := []bson.M{ + bson.M{"_id": 1, "b": int(2), "c": int(3)}, + bson.M{"_id": 3, "b": 5.4, "c": "string"}, + bson.M{"_id": 5, "b": int(6), "c": int(6)}, + } + So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil) + }) + Convey("CSV import with duplicate _id's should not error if --stopOnError is not set", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test_duplicate.csv" + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.IngestOptions.StopOnError = false + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 5) + + expectedDocuments := []bson.M{ + bson.M{"_id": 1, "b": int(2), "c": int(3)}, + bson.M{"_id": 3, "b": 5.4, "c": "string"}, + bson.M{"_id": 5, "b": int(6), "c": int(6)}, + bson.M{"_id": 8, "b": int(6), "c": int(6)}, + } + // all docs except the one with duplicate _id - should be imported + So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil) + }) + Convey("no error should be thrown for CSV import on test data with --drop", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test.csv" + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.IngestOptions.Drop = true + imp.IngestOptions.MaintainInsertionOrder = true + imp.IngestOptions.WriteConcern = "majority" + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 3) + expectedDocuments := []bson.M{ + bson.M{"_id": 1, "b": int(2), "c": int(3)}, + bson.M{"_id": 3, "b": 5.4, "c": "string"}, + bson.M{"_id": 5, "b": int(6), "c": int(6)}, + } + So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil) + }) + Convey("CSV 
import on test data with --headerLine should succeed", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test.csv" + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.InputOptions.HeaderLine = true + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 2) + }) + Convey("EOF should be thrown for CSV import with --headerLine if file is empty", func() { + csvFile, err := ioutil.TempFile("", "mongoimport_") + So(err, ShouldBeNil) + csvFile.Close() + + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = csvFile.Name() + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.InputOptions.HeaderLine = true + numImported, err := imp.ImportDocuments() + So(err, ShouldEqual, io.EOF) + So(numImported, ShouldEqual, 0) + }) + Convey("CSV import with --upsert and --upsertFields should succeed", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test.csv" + fields := "_id,c,b" + imp.InputOptions.Fields = &fields + imp.IngestOptions.UpsertFields = "_id" + imp.IngestOptions.MaintainInsertionOrder = true + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 3) + expectedDocuments := []bson.M{ + bson.M{"_id": 1, "c": int(2), "b": int(3)}, + bson.M{"_id": 3, "c": 5.4, "b": "string"}, + bson.M{"_id": 5, "c": int(6), "b": int(6)}, + } + So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil) + }) + Convey("CSV import with --upsert/--upsertFields with duplicate id should succeed "+ + "if stopOnError is not set", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test_duplicate.csv" + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.IngestOptions.Upsert = true + imp.upsertFields = []string{"_id"} + numImported, err := imp.ImportDocuments() + So(err, ShouldBeNil) + So(numImported, ShouldEqual, 5) + expectedDocuments := []bson.M{ + bson.M{"_id": 1, "b": int(2), "c": int(3)}, + bson.M{"_id": 3, "b": 5.4, "c": "string"}, + bson.M{"_id": 5, "b": int(6), "c": int(9)}, + bson.M{"_id": 8, "b": int(6), "c": int(6)}, + } + So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil) + }) + Convey("an error should be thrown for CSV import on test data with "+ + "duplicate _id if --stopOnError is set", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test_duplicate.csv" + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.IngestOptions.StopOnError = true + imp.IngestOptions.WriteConcern = "1" + imp.IngestOptions.MaintainInsertionOrder = true + _, err = imp.ImportDocuments() + So(err, ShouldNotBeNil) + expectedDocuments := []bson.M{ + bson.M{"_id": 1, "b": int(2), "c": int(3)}, + bson.M{"_id": 3, "b": 5.4, "c": "string"}, + bson.M{"_id": 5, "b": int(6), "c": int(6)}, + } + So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil) + }) + Convey("an error should be thrown for JSON import on test data that "+ + "is a JSON array without passing --jsonArray", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.File = "testdata/test_array.json" + imp.IngestOptions.WriteConcern = "1" + _, err = imp.ImportDocuments() + So(err, ShouldNotBeNil) + }) 
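// The --upsert/--upsertFields cases above hinge on how the tool splits each
// document: the fields named in --upsertFields become the query selector, and
// the full document becomes the replacement. A minimal sketch of that mapping
// against mgo (upsertDoc is an illustrative helper, not part of the tool):
//
//	func upsertDoc(c *mgo.Collection, upsertFields []string, doc bson.D) error {
//		// build the selector from the requested fields
//		selector := bson.M{}
//		for _, field := range upsertFields {
//			for _, elem := range doc {
//				if elem.Name == field {
//					selector[field] = elem.Value
//				}
//			}
//		}
//		// inserts when no document matches, replaces the match otherwise
//		_, err := c.Upsert(selector, doc)
//		return err
//	}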
+ Convey("an error should be thrown if a plain JSON file is supplied", func() { + fileHandle, err := os.Open("testdata/test_plain.json") + So(err, ShouldBeNil) + jsonInputReader := NewJSONInputReader(true, fileHandle, 1) + docChan := make(chan bson.D, 1) + So(jsonInputReader.StreamDocument(true, docChan), ShouldNotBeNil) + }) + Convey("an error should be thrown for invalid CSV import on test data", func() { + imp, err := NewMongoImport() + So(err, ShouldBeNil) + imp.InputOptions.Type = CSV + imp.InputOptions.File = "testdata/test_bad.csv" + fields := "_id,b,c" + imp.InputOptions.Fields = &fields + imp.IngestOptions.StopOnError = true + imp.IngestOptions.WriteConcern = "1" + imp.IngestOptions.MaintainInsertionOrder = true + _, err = imp.ImportDocuments() + So(err, ShouldNotBeNil) + }) + }) +} diff --git a/src/mongo/gotools/mongoimport/options.go b/src/mongo/gotools/mongoimport/options.go new file mode 100644 index 00000000000..2fe252fb527 --- /dev/null +++ b/src/mongo/gotools/mongoimport/options.go @@ -0,0 +1,79 @@ +package mongoimport + +var Usage = `<options> <file> + +Import CSV, TSV or JSON data into MongoDB. If no file is provided, mongoimport reads from stdin. + +See http://docs.mongodb.org/manual/reference/program/mongoimport/ for more information.` + +// InputOptions defines the set of options for reading input data. +type InputOptions struct { + // Fields is an option to directly specify comma-separated fields to import to CSV. + Fields *string `long:"fields" value-name:"<field>[,<field>]*" short:"f" description:"comma separated list of fields, e.g. -f name,age"` + + // FieldFile is a filename that refers to a list of fields to import, 1 per line. + FieldFile *string `long:"fieldFile" value-name:"<filename>" description:"file with field names - 1 per line"` + + // Specifies the location and name of a file containing the data to import. + File string `long:"file" value-name:"<filename>" description:"file to import from; if not specified, stdin is used"` + + // Treats the input source's first line as field list (csv and tsv only). + HeaderLine bool `long:"headerline" description:"use first line in input source as the field list (CSV and TSV only)"` + + // Indicates that the underlying input source contains a single JSON array with the documents to import. + JSONArray bool `long:"jsonArray" description:"treat input source as a JSON array"` + + // Indicates how to handle type coercion failures + ParseGrace string `long:"parseGrace" value-name:"<grace>" default:"stop" description:"controls behavior when type coercion fails - one of: autoCast, skipField, skipRow, stop (defaults to 'stop')"` + + // Specifies the file type to import. The default format is JSON, but it’s possible to import CSV and TSV files. + Type string `long:"type" value-name:"<type>" default:"json" default-mask:"-" description:"input format to import: json, csv, or tsv (defaults to 'json')"` + + // Indicates that field names include type descriptions + ColumnsHaveTypes bool `long:"columnsHaveTypes" description:"indicated that the field list (from --fields, --fieldsFile, or --headerline) specifies types; They must be in the form of '<colName>.<type>(<arg>)'. The type can be one of: auto, binary, bool, date, date_go, date_ms, date_oracle, double, int32, int64, string. For each of the date types, the argument is a datetime layout string. For the binary type, the argument can be one of: base32, base64, hex. All other types take an empty argument. Only valid for CSV and TSV imports. e.g. 
zipcode.string(), thumbnail.binary(base64)"` +} + +// Name returns a description of the InputOptions struct. +func (_ *InputOptions) Name() string { + return "input" +} + +// IngestOptions defines the set of options for storing data. +type IngestOptions struct { + // Drops target collection before importing. + Drop bool `long:"drop" description:"drop collection before inserting documents"` + + // Ignores fields with empty values in CSV and TSV imports. + IgnoreBlanks bool `long:"ignoreBlanks" description:"ignore fields with empty values in CSV and TSV"` + + // Indicates that documents will be inserted in the order of their appearance in the input source. + MaintainInsertionOrder bool `long:"maintainInsertionOrder" description:"insert documents in the order of their appearance in the input source"` + + // Sets the number of insertion routines to use + NumInsertionWorkers int `short:"j" value-name:"<number>" long:"numInsertionWorkers" description:"number of insert operations to run concurrently (defaults to 1)" default:"1" default-mask:"-"` + + // Forces mongoimport to halt the import operation at the first insert or upsert error. + StopOnError bool `long:"stopOnError" description:"stop importing at first insert/upsert error"` + + // Modifies the import process to update existing objects in the database if they match --upsertFields. + Upsert bool `long:"upsert" description:"insert or update objects that already exist"` + + // Specifies a list of fields for the query portion of the upsert; defaults to _id field. + UpsertFields string `long:"upsertFields" value-name:"<field>[,<field>]*" description:"comma-separated fields for the query part of the upsert"` + + // Sets write concern level for write operations. + WriteConcern string `long:"writeConcern" default:"majority" value-name:"<write-concern-specifier>" default-mask:"-" description:"write concern options e.g. --writeConcern majority, --writeConcern '{w: 3, wtimeout: 500, fsync: true, j: true}' (defaults to 'majority')"` + + // Indicates that the server should bypass document validation on import. + BypassDocumentValidation bool `long:"bypassDocumentValidation" description:"bypass document validation"` + + // Specifies the number of threads to use in processing data read from the input source + NumDecodingWorkers int `long:"numDecodingWorkers" default:"0" hidden:"true"` + + BulkBufferSize int `long:"batchSize" default:"1000" hidden:"true"` +} + +// Name returns a description of the IngestOptions struct. 
+func (_ *IngestOptions) Name() string { + return "ingest" +} diff --git a/src/mongo/gotools/mongoimport/testdata/test.csv b/src/mongo/gotools/mongoimport/testdata/test.csv new file mode 100644 index 00000000000..357d40e6da3 --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test.csv @@ -0,0 +1,3 @@ +1,2,3 +3,5.4,string +5,6,6 diff --git a/src/mongo/gotools/mongoimport/testdata/test.tsv b/src/mongo/gotools/mongoimport/testdata/test.tsv new file mode 100644 index 00000000000..a6d5298b40a --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test.tsv @@ -0,0 +1,3 @@ +1 2 3 +3 4.6 5 +5 string 6 diff --git a/src/mongo/gotools/mongoimport/testdata/test_array.json b/src/mongo/gotools/mongoimport/testdata/test_array.json new file mode 100644 index 00000000000..c4642157433 --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_array.json @@ -0,0 +1,31 @@ +[ {"a": 1.2, "b":"a", "c": 0.4} , + + +{"a": 2.4, "b":"string", "c": 52.9}, +{"a": 3, "b":"string", "c": 52}, +{"a": 4, "b":"string", "c": 52}, +{"a": 5, "b":"string", "c": 52}, +{"a": 6, "b":"string", "c": 52}, +{"a": 7, "b":"string", "c": 52} , +{"a": 8, "b":"string", "c": 52}, +{"a": 9, "b":"string", "c": 52}, +{"a": 10, "b":"string", "c": 52}, +{"a": 11, "b":"string", "c": 52}, +{"a": 12, "b":"string", "c": 52}, +{"a": 13, "b":"string", "c": 52}, +{"a": 14, "b":"string", "c": 52}, +{"a": 15, "b":"string", "c": 52}, + {"a": 16, "b":"string", "c": 52}, +{"a": 17, "b":"string", "c": 52}, +{"a": 18, "b":"string", "c": 52}, +{"a": 29, "b":"string", "c": 52}, +{"a": 20, "b":"string", "c": 52}, +{"a": 21, "b":"string", "c": 52} + + , +{"a": 22, "b":"string", "c": 52}, +{"a": 23, "b":"string", "c": 52}, +{"a": 24, "b":"string", "c": 52}, +{"a": 25, "b":"string", "c": 52}, +{"a": 25, "b":"string", "c": 52}, +{"a": 27, "b":"value", "c": 65}]
\ No newline at end of file diff --git a/src/mongo/gotools/mongoimport/testdata/test_bad.csv b/src/mongo/gotools/mongoimport/testdata/test_bad.csv new file mode 100644 index 00000000000..c1d6aeeca88 --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_bad.csv @@ -0,0 +1,3 @@ +1,2,3 +3,5".4,string +5,6,6 diff --git a/src/mongo/gotools/mongoimport/testdata/test_blanks.csv b/src/mongo/gotools/mongoimport/testdata/test_blanks.csv new file mode 100644 index 00000000000..e94daca6d0d --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_blanks.csv @@ -0,0 +1,3 @@ +1,2, +5,,6e +7,8,6
\ No newline at end of file diff --git a/src/mongo/gotools/mongoimport/testdata/test_bom.csv b/src/mongo/gotools/mongoimport/testdata/test_bom.csv new file mode 100644 index 00000000000..eef9b0a80c5 --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_bom.csv @@ -0,0 +1,2 @@ +1,2,3 +4,5,6 diff --git a/src/mongo/gotools/mongoimport/testdata/test_bom.json b/src/mongo/gotools/mongoimport/testdata/test_bom.json new file mode 100644 index 00000000000..e813e78d234 --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_bom.json @@ -0,0 +1,2 @@ +{"a": 1, "b": 2, "c": 3} +{"a": 4, "b": 5, "c": 6} diff --git a/src/mongo/gotools/mongoimport/testdata/test_bom.tsv b/src/mongo/gotools/mongoimport/testdata/test_bom.tsv new file mode 100644 index 00000000000..4c117a5ca88 --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_bom.tsv @@ -0,0 +1 @@ +1 2 3 diff --git a/src/mongo/gotools/mongoimport/testdata/test_duplicate.csv b/src/mongo/gotools/mongoimport/testdata/test_duplicate.csv new file mode 100644 index 00000000000..137f668e25a --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_duplicate.csv @@ -0,0 +1,5 @@ +1,2,3 +3,5.4,string +5,6,6 +5,6,9 +8,6,6
\ No newline at end of file diff --git a/src/mongo/gotools/mongoimport/testdata/test_fields_invalid.txt b/src/mongo/gotools/mongoimport/testdata/test_fields_invalid.txt new file mode 100644 index 00000000000..90505050d51 --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_fields_invalid.txt @@ -0,0 +1,3 @@ +a +$ +b diff --git a/src/mongo/gotools/mongoimport/testdata/test_fields_valid.txt b/src/mongo/gotools/mongoimport/testdata/test_fields_valid.txt new file mode 100644 index 00000000000..de980441c3a --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_fields_valid.txt @@ -0,0 +1,3 @@ +a +b +c diff --git a/src/mongo/gotools/mongoimport/testdata/test_plain.json b/src/mongo/gotools/mongoimport/testdata/test_plain.json new file mode 100644 index 00000000000..ce158ad792b --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_plain.json @@ -0,0 +1,3 @@ +{"a": 4, "b":"string value", "c": 1} +{"a": 5, "b":"string value", "c": 2} +{"a": 6, "b":"string value", "c": 3}
\ No newline at end of file diff --git a/src/mongo/gotools/mongoimport/testdata/test_plain2.json b/src/mongo/gotools/mongoimport/testdata/test_plain2.json new file mode 100644 index 00000000000..84efc925a9d --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_plain2.json @@ -0,0 +1,10 @@ +{"body": "'''Wei-chi''' may refer to:\n*The [[game of go]]\n*The [[Chinese word for \"crisis\"]]\n\n{{dab}}", "timestamp": {"$date": 1409836258000}, "page_id": 747205, "user": "TheQ Editor", "title": "Wei-chi"} +{"body": "'''Frameset''' may refer to:\n\n* [[Framing (World Wide Web)]]\n* [[Bicycle frame]]\n\n{{disambig}}", "timestamp": {"$date": 1409836258000}, "page_id": 1450638, "user": "Kkj11210", "title": "Frameset"} +{"body": "'''Poenit''' may refer to:\n*[[Poenitentiaria]] or Apostolic Penitentiary\n*[[Phut]]\n\n{{disambig}}", "timestamp": {"$date": 1409836258000}, "page_id": 379316, "user": "Omnipaedista", "title": "Poenit"} +{"body": "In Malawi, '''Tonga''' may be:\n* [[Tonga people (Malawi)]]\n* [[Tonga language (Malawi)]]\n\n{{dab}}", "timestamp": {"$date": 1409836258000}, "page_id": 3750295, "user": "Kwamikagami", "title": "Tonga (Malawi)"} +{"body": "'''Windows NT 6.0''' can refer to:\n*[[Windows Vista]]\n*[[Windows Server 2008]]\n\n{{disambiguation}}", "timestamp": {"$date": 1409836258000}, "page_id": 3875545, "user": "Codename Lisa", "title": "Windows NT 6.0"} +{"body": "'''Poyen''' may refer to:\n*[[Poyen, Arkansas]], United States\n* [[Poyen, Kargil]], India\n\n{{geodis}}", "timestamp": {"$date": 1409836258000}, "page_id": 1889856, "user": "PamD", "title": "Poyen"} +{"body": "'''Body check''' may refer to:\n*[[Checking (ice hockey)]]\n*[[Physical examination]]\n{{Disambiguation}}", "timestamp": {"$date": 1409836258000}, "page_id": 3555067, "user": "Bgheard", "title": "Body check"} +{"body": "'''Yevtushenko''' may refer to:\n\n* [[Yevgeny Yevtushenko]]\n* [[Vadym Yevtushenko]]\n\n{{disambiguation}}", "timestamp": {"$date": 1409836258000}, "page_id": 4842284, "user": "Kkj11210", "title": "Yevtushenko"} +{"body": "'''Tuks''' may refer to:\n*[[Tuks Senganga]], South African rapper\n* [[University of Pretoria]]\n\n{{dab}}", "timestamp": {"$date": 1409836258000}, "page_id": 490212, "user": "PamD", "title": "Tuks"} +{"body": "'''Ethanedithiol''' may refer to:\n\n* [[1,1-Ethanedithiol]]\n* [[1,2-Ethanedithiol]]\n\n{{chemistry index}}", "timestamp": {"$date": 1409836258000}, "page_id": 4514054, "user": "Kkj11210", "title": "Ethanedithiol"}
\ No newline at end of file diff --git a/src/mongo/gotools/mongoimport/testdata/test_type.csv b/src/mongo/gotools/mongoimport/testdata/test_type.csv new file mode 100644 index 00000000000..444321ee570 --- /dev/null +++ b/src/mongo/gotools/mongoimport/testdata/test_type.csv @@ -0,0 +1,4 @@ +zip.string(),number.double() +12345,20.2 +12345-1234,40.4 +23455,BLAH diff --git a/src/mongo/gotools/mongoimport/tsv.go b/src/mongo/gotools/mongoimport/tsv.go new file mode 100644 index 00000000000..09aadc67fdc --- /dev/null +++ b/src/mongo/gotools/mongoimport/tsv.go @@ -0,0 +1,163 @@ +package mongoimport + +import ( + "bufio" + "fmt" + "io" + "strings" + + "gopkg.in/mgo.v2/bson" +) + +const ( + entryDelimiter = '\n' + tokenSeparator = "\t" +) + +// TSVInputReader is a struct that implements the InputReader interface for a +// TSV input source. +type TSVInputReader struct { + // colSpecs is a list of column specifications in the BSON documents to be imported + colSpecs []ColumnSpec + + // tsvReader is the underlying reader used to read data in from the TSV + // input source + tsvReader *bufio.Reader + + // tsvRejectWriter is where coercion-failed rows are written, if applicable + tsvRejectWriter io.Writer + + // tsvRecord stores each line of input we read from the underlying reader + tsvRecord string + + // numProcessed tracks the number of TSV records processed by the underlying reader + numProcessed uint64 + + // numDecoders is the number of concurrent goroutines to use for decoding + numDecoders int + + // embedded sizeTracker exposes the Size() method to check the number of bytes read so far + sizeTracker + + // ignoreBlanks is whether empty fields should be ignored + ignoreBlanks bool +} + +// TSVConverter implements the Converter interface for TSV input. +type TSVConverter struct { + colSpecs []ColumnSpec + data string + index uint64 + ignoreBlanks bool + rejectWriter io.Writer +} + +// NewTSVInputReader returns a TSVInputReader configured to read input from the +// given io.Reader, extracting the specified columns only. +func NewTSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, numDecoders int, ignoreBlanks bool) *TSVInputReader { + szCount := newSizeTrackingReader(newBomDiscardingReader(in)) + return &TSVInputReader{ + colSpecs: colSpecs, + tsvReader: bufio.NewReader(szCount), + tsvRejectWriter: rejects, + numProcessed: uint64(0), + numDecoders: numDecoders, + sizeTracker: szCount, + ignoreBlanks: ignoreBlanks, + } +} + +// ReadAndValidateHeader reads the header from the underlying reader and validates +// the header fields. It sets err if the read/validation fails. +func (r *TSVInputReader) ReadAndValidateHeader() (err error) { + header, err := r.tsvReader.ReadString(entryDelimiter) + if err != nil { + return err + } + for _, field := range strings.Split(header, tokenSeparator) { + r.colSpecs = append(r.colSpecs, ColumnSpec{ + Name: strings.TrimRight(field, "\r\n"), + Parser: new(FieldAutoParser), + }) + } + return validateReaderFields(ColumnNames(r.colSpecs)) +} + +// ReadAndValidateTypedHeader reads the header from the underlying reader and validates +// the header fields. It sets err if the read/validation fails. 
+func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) { + header, err := r.tsvReader.ReadString(entryDelimiter) + if err != nil { + return err + } + var headerFields []string + for _, field := range strings.Split(header, tokenSeparator) { + headerFields = append(headerFields, strings.TrimRight(field, "\r\n")) + } + r.colSpecs, err = ParseTypedHeaders(headerFields, parseGrace) + if err != nil { + return err + } + return validateReaderFields(ColumnNames(r.colSpecs)) +} + +// StreamDocument takes a boolean indicating if the documents should be streamed +// in read order and a channel on which to stream the documents processed from +// the underlying reader. Returns a non-nil error if streaming fails. +func (r *TSVInputReader) StreamDocument(ordered bool, readDocs chan bson.D) (retErr error) { + tsvRecordChan := make(chan Converter, r.numDecoders) + tsvErrChan := make(chan error) + + // begin reading from source + go func() { + var err error + for { + r.tsvRecord, err = r.tsvReader.ReadString(entryDelimiter) + if err != nil { + close(tsvRecordChan) + if err == io.EOF { + tsvErrChan <- nil + } else { + r.numProcessed++ + tsvErrChan <- fmt.Errorf("read error on entry #%v: %v", r.numProcessed, err) + } + return + } + tsvRecordChan <- TSVConverter{ + colSpecs: r.colSpecs, + data: r.tsvRecord, + index: r.numProcessed, + ignoreBlanks: r.ignoreBlanks, + rejectWriter: r.tsvRejectWriter, + } + r.numProcessed++ + } + }() + + // begin processing read bytes + go func() { + tsvErrChan <- streamDocuments(ordered, r.numDecoders, tsvRecordChan, readDocs) + }() + + return channelQuorumError(tsvErrChan, 2) +} + +// Convert implements the Converter interface for TSV input. It converts a +// TSVConverter struct to a BSON document. +func (c TSVConverter) Convert() (b bson.D, err error) { + b, err = tokensToBSON( + c.colSpecs, + strings.Split(strings.TrimRight(c.data, "\r\n"), tokenSeparator), + c.index, + c.ignoreBlanks, + ) + if _, ok := err.(coercionError); ok { + c.Print() + err = nil + } + return +} + +func (c TSVConverter) Print() { + c.rejectWriter.Write([]byte(c.data + "\n")) +} diff --git a/src/mongo/gotools/mongoimport/tsv_test.go b/src/mongo/gotools/mongoimport/tsv_test.go new file mode 100644 index 00000000000..7ea33248f5a --- /dev/null +++ b/src/mongo/gotools/mongoimport/tsv_test.go @@ -0,0 +1,232 @@ +package mongoimport + +import ( + "bytes" + "os" + "testing" + + "github.com/mongodb/mongo-tools/common/testutil" + . 
"github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" +) + +func TestTSVStreamDocument(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("With a TSV input reader", t, func() { + Convey("integer valued strings should be converted tsv1", func() { + contents := "1\t2\t3e\n" + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedRead := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", "3e"}, + } + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedRead) + }) + + Convey("valid TSV input file that starts with the UTF-8 BOM should "+ + "not raise an error", func() { + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedRead := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", int32(3)}, + } + fileHandle, err := os.Open("testdata/test_bom.tsv") + So(err, ShouldBeNil) + r := NewTSVInputReader(colSpecs, fileHandle, os.Stdout, 1, false) + docChan := make(chan bson.D, 2) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedRead) + }) + + Convey("integer valued strings should be converted tsv2", func() { + contents := "a\tb\t\"cccc,cccc\"\td\n" + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedRead := bson.D{ + {"a", "a"}, + {"b", "b"}, + {"c", `"cccc,cccc"`}, + {"field3", "d"}, + } + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedRead) + }) + + Convey("extra columns should be prefixed with 'field'", func() { + contents := "1\t2\t3e\t may\n" + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedRead := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", "3e"}, + {"field3", " may"}, + } + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedRead) + }) + + Convey("mixed values should be parsed correctly", func() { + contents := "12\t13.3\tInline\t14\n" + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + {"d", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedRead := bson.D{ + {"a", int32(12)}, + {"b", 13.3}, + {"c", "Inline"}, + {"d", int32(14)}, + } + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 1) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedRead) + }) + + Convey("calling StreamDocument() in succession for TSVs should "+ + "return the correct next set of values", func() { + contents := "1\t2\t3\n4\t5\t6\n" + colSpecs := 
[]ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedReads := []bson.D{ + { + {"a", int32(1)}, + {"b", int32(2)}, + {"c", int32(3)}, + }, { + {"a", int32(4)}, + {"b", int32(5)}, + {"c", int32(6)}, + }, + } + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, len(expectedReads)) + So(r.StreamDocument(true, docChan), ShouldBeNil) + for i := 0; i < len(expectedReads); i++ { + for j, readDocument := range <-docChan { + So(readDocument.Name, ShouldEqual, expectedReads[i][j].Name) + So(readDocument.Value, ShouldEqual, expectedReads[i][j].Value) + } + } + }) + + Convey("calling StreamDocument() in succession for TSVs that contain "+ + "quotes should return the correct next set of values", func() { + contents := "1\t2\t3\n4\t\"\t6\n" + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedReadOne := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", int32(3)}, + } + expectedReadTwo := bson.D{ + {"a", int32(4)}, + {"b", `"`}, + {"c", int32(6)}, + } + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + docChan := make(chan bson.D, 2) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedReadOne) + So(<-docChan, ShouldResemble, expectedReadTwo) + }) + + Convey("plain TSV input file sources should be parsed correctly and "+ + "subsequent imports should parse correctly", + func() { + colSpecs := []ColumnSpec{ + {"a", new(FieldAutoParser), pgAutoCast, "auto"}, + {"b", new(FieldAutoParser), pgAutoCast, "auto"}, + {"c", new(FieldAutoParser), pgAutoCast, "auto"}, + } + expectedReadOne := bson.D{ + {"a", int32(1)}, + {"b", int32(2)}, + {"c", int32(3)}, + } + expectedReadTwo := bson.D{ + {"a", int32(3)}, + {"b", 4.6}, + {"c", int32(5)}, + } + fileHandle, err := os.Open("testdata/test.tsv") + So(err, ShouldBeNil) + r := NewTSVInputReader(colSpecs, fileHandle, os.Stdout, 1, false) + docChan := make(chan bson.D, 50) + So(r.StreamDocument(true, docChan), ShouldBeNil) + So(<-docChan, ShouldResemble, expectedReadOne) + So(<-docChan, ShouldResemble, expectedReadTwo) + }) + }) +} + +func TestTSVReadAndValidateHeader(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("With a TSV input reader", t, func() { + Convey("setting the header should read the first line of the TSV", func() { + contents := "extraHeader1\textraHeader2\textraHeader3\n" + colSpecs := []ColumnSpec{} + r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false) + So(r.ReadAndValidateHeader(), ShouldBeNil) + So(len(r.colSpecs), ShouldEqual, 3) + }) + }) +} + +func TestTSVConvert(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + Convey("With a TSV input reader", t, func() { + Convey("calling convert on a TSVConverter should return the expected BSON document", func() { + tsvConverter := TSVConverter{ + colSpecs: []ColumnSpec{ + {"field1", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field2", new(FieldAutoParser), pgAutoCast, "auto"}, + {"field3", new(FieldAutoParser), pgAutoCast, "auto"}, + }, + data: "a\tb\tc", + index: uint64(0), + } + expectedDocument := bson.D{ + {"field1", "a"}, + {"field2", "b"}, + {"field3", "c"}, + } + document, err := tsvConverter.Convert() + 
So(err, ShouldBeNil) + So(document, ShouldResemble, expectedDocument) + }) + }) +} diff --git a/src/mongo/gotools/mongoimport/typed_fields.go b/src/mongo/gotools/mongoimport/typed_fields.go new file mode 100644 index 00000000000..aa1bb1c0e0a --- /dev/null +++ b/src/mongo/gotools/mongoimport/typed_fields.go @@ -0,0 +1,284 @@ +package mongoimport + +import ( + "encoding/base32" + "encoding/base64" + "encoding/hex" + "fmt" + "math" + "regexp" + "strconv" + "strings" + "time" + + "github.com/mongodb/mongo-tools/mongoimport/dateconv" + "gopkg.in/mgo.v2/bson" +) + +// columnType defines different types for columns that can be parsed distinctly +type columnType int + +const ( + ctAuto columnType = iota + ctBinary + ctBoolean + ctDate + ctDateGo + ctDateMS + ctDateOracle + ctDouble + ctInt32 + ctInt64 + ctDecimal + ctString +) + +var ( + columnTypeRE = regexp.MustCompile(`(?s)^(.*)\.(\w+)\((.*)\)$`) + columnTypeNameMap = map[string]columnType{ + "auto": ctAuto, + "binary": ctBinary, + "boolean": ctBoolean, + "date": ctDate, + "decimal": ctDecimal, + "date_go": ctDateGo, + "date_ms": ctDateMS, + "date_oracle": ctDateOracle, + "double": ctDouble, + "int32": ctInt32, + "int64": ctInt64, + "string": ctString, + } +) + +type binaryEncoding int + +const ( + beBase64 binaryEncoding = iota + beBase32 + beHex +) + +var binaryEncodingNameMap = map[string]binaryEncoding{ + "base64": beBase64, + "base32": beBase32, + "hex": beHex, +} + +// ColumnSpec keeps information for each 'column' of import. +type ColumnSpec struct { + Name string + Parser FieldParser + ParseGrace ParseGrace + TypeName string +} + +// ColumnNames maps a ColumnSpec slice to the associated column names +func ColumnNames(fs []ColumnSpec) (s []string) { + for _, f := range fs { + s = append(s, f.Name) + } + return +} + +// ParseTypedHeader produces a ColumnSpec from a header item, extracting type +// information from it. The parseGrace is passed along to the new ColumnSpec. +func ParseTypedHeader(header string, parseGrace ParseGrace) (f ColumnSpec, err error) { + match := columnTypeRE.FindStringSubmatch(header) + if len(match) != 4 { + err = fmt.Errorf("could not parse type from header %s", header) + return + } + t, ok := columnTypeNameMap[match[2]] + if !ok { + err = fmt.Errorf("invalid type %s in header %s", match[2], header) + return + } + p, err := NewFieldParser(t, match[3]) + if err != nil { + return + } + return ColumnSpec{match[1], p, parseGrace, match[2]}, nil +} + +// ParseTypedHeaders performs ParseTypedHeader on each item, returning an +// error if any single one fails. +func ParseTypedHeaders(headers []string, parseGrace ParseGrace) (fs []ColumnSpec, err error) { + fs = make([]ColumnSpec, len(headers)) + for i, f := range headers { + fs[i], err = ParseTypedHeader(f, parseGrace) + if err != nil { + return + } + } + return +} + +// ParseAutoHeaders converts a list of header items to ColumnSpec objects, with +// automatic parsers. +func ParseAutoHeaders(headers []string) (fs []ColumnSpec) { + fs = make([]ColumnSpec, len(headers)) + for i, f := range headers { + fs[i] = ColumnSpec{f, new(FieldAutoParser), pgAutoCast, "auto"} + } + return +} + +// FieldParser is the interface for any parser of a field item. +type FieldParser interface { + Parse(in string) (interface{}, error) +} + +var ( + escapeReplacements = []string{ + `\\`, `\`, + `\(`, "(", + `\)`, ")", + `\`, "", + } + escapeReplacer = strings.NewReplacer(escapeReplacements...) +) + +// NewFieldParser yields a FieldParser corresponding to the given columnType. 
+// arg is passed along to the specific type's parser, if it permits an +// argument. An error will be raised if arg is not valid for the type's +// parser. +func NewFieldParser(t columnType, arg string) (parser FieldParser, err error) { + arg = escapeReplacer.Replace(arg) + + switch t { // validate argument + case ctBinary: + case ctDate: + case ctDateGo: + case ctDateMS: + case ctDateOracle: + default: + if arg != "" { + err = fmt.Errorf("type %v does not support arguments", t) + return + } + } + + switch t { + case ctBinary: + parser, err = NewFieldBinaryParser(arg) + case ctBoolean: + parser = new(FieldBooleanParser) + case ctDate: + fallthrough + case ctDateGo: + parser = &FieldDateParser{arg} + case ctDateMS: + parser = &FieldDateParser{dateconv.FromMS(arg)} + case ctDateOracle: + parser = &FieldDateParser{dateconv.FromOracle(arg)} + case ctDouble: + parser = new(FieldDoubleParser) + case ctInt32: + parser = new(FieldInt32Parser) + case ctInt64: + parser = new(FieldInt64Parser) + case ctDecimal: + parser = new(FieldDecimalParser) + case ctString: + parser = new(FieldStringParser) + default: // ctAuto + parser = new(FieldAutoParser) + } + return +} + +func autoParse(in string) interface{} { + parsedInt, err := strconv.ParseInt(in, 10, 64) + if err == nil { + if math.MinInt32 <= parsedInt && parsedInt <= math.MaxInt32 { + return int32(parsedInt) + } + return parsedInt + } + parsedFloat, err := strconv.ParseFloat(in, 64) + if err == nil { + return parsedFloat + } + return in +} + +type FieldAutoParser struct{} + +func (ap *FieldAutoParser) Parse(in string) (interface{}, error) { + return autoParse(in), nil +} + +type FieldBinaryParser struct { + enc binaryEncoding +} + +func (bp *FieldBinaryParser) Parse(in string) (interface{}, error) { + switch bp.enc { + case beBase32: + return base32.StdEncoding.DecodeString(in) + case beBase64: + return base64.StdEncoding.DecodeString(in) + default: // beHex + return hex.DecodeString(in) + } +} + +func NewFieldBinaryParser(arg string) (*FieldBinaryParser, error) { + enc, ok := binaryEncodingNameMap[arg] + if !ok { + return nil, fmt.Errorf("invalid binary encoding: %s", arg) + } + return &FieldBinaryParser{enc}, nil +} + +type FieldBooleanParser struct{} + +func (bp *FieldBooleanParser) Parse(in string) (interface{}, error) { + if strings.ToLower(in) == "true" || in == "1" { + return true, nil + } + if strings.ToLower(in) == "false" || in == "0" { + return false, nil + } + return nil, fmt.Errorf("failed to parse boolean: %s", in) +} + +type FieldDateParser struct { + layout string +} + +func (dp *FieldDateParser) Parse(in string) (interface{}, error) { + return time.Parse(dp.layout, in) +} + +type FieldDoubleParser struct{} + +func (dp *FieldDoubleParser) Parse(in string) (interface{}, error) { + return strconv.ParseFloat(in, 64) +} + +type FieldInt32Parser struct{} + +func (ip *FieldInt32Parser) Parse(in string) (interface{}, error) { + value, err := strconv.ParseInt(in, 10, 32) + return int32(value), err +} + +type FieldInt64Parser struct{} + +func (ip *FieldInt64Parser) Parse(in string) (interface{}, error) { + return strconv.ParseInt(in, 10, 64) +} + +type FieldDecimalParser struct{} + +func (ip *FieldDecimalParser) Parse(in string) (interface{}, error) { + return bson.ParseDecimal128(in) +} + +type FieldStringParser struct{} + +func (sp *FieldStringParser) Parse(in string) (interface{}, error) { + return in, nil +} diff --git a/src/mongo/gotools/mongoimport/typed_fields_test.go b/src/mongo/gotools/mongoimport/typed_fields_test.go new file mode 
100644 index 00000000000..073710c5762 --- /dev/null +++ b/src/mongo/gotools/mongoimport/typed_fields_test.go @@ -0,0 +1,407 @@ +package mongoimport + +import ( + "github.com/mongodb/mongo-tools/common/log" + "github.com/mongodb/mongo-tools/common/options" + "github.com/mongodb/mongo-tools/common/testutil" + . "github.com/smartystreets/goconvey/convey" + "gopkg.in/mgo.v2/bson" + "testing" + "time" +) + +func init() { + log.SetVerbosity(&options.Verbosity{ + VLevel: 4, + }) +} + +func TestTypedHeaderParser(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Using 'zip.string(),number.double(),foo.auto()'", t, func() { + var headers = []string{"zip.string()", "number.double()", "foo.auto()", `bar.date(January 2\, \(2006\))`} + var colSpecs []ColumnSpec + var err error + + Convey("with parse grace: auto", func() { + colSpecs, err = ParseTypedHeaders(headers, pgAutoCast) + So(colSpecs, ShouldResemble, []ColumnSpec{ + {"zip", new(FieldStringParser), pgAutoCast, "string"}, + {"number", new(FieldDoubleParser), pgAutoCast, "double"}, + {"foo", new(FieldAutoParser), pgAutoCast, "auto"}, + {"bar", &FieldDateParser{"January 2, (2006)"}, pgAutoCast, "date"}, + }) + So(err, ShouldBeNil) + }) + Convey("with parse grace: skipRow", func() { + colSpecs, err = ParseTypedHeaders(headers, pgSkipRow) + So(colSpecs, ShouldResemble, []ColumnSpec{ + {"zip", new(FieldStringParser), pgSkipRow, "string"}, + {"number", new(FieldDoubleParser), pgSkipRow, "double"}, + {"foo", new(FieldAutoParser), pgSkipRow, "auto"}, + {"bar", &FieldDateParser{"January 2, (2006)"}, pgSkipRow, "date"}, + }) + So(err, ShouldBeNil) + }) + }) + + Convey("Using various bad headers", t, func() { + var err error + + Convey("with non-empty arguments for types that don't want them", func() { + _, err = ParseTypedHeader("zip.string(blah)", pgAutoCast) + So(err, ShouldNotBeNil) + _, err = ParseTypedHeader("zip.string(0)", pgAutoCast) + So(err, ShouldNotBeNil) + _, err = ParseTypedHeader("zip.int32(0)", pgAutoCast) + So(err, ShouldNotBeNil) + _, err = ParseTypedHeader("zip.int64(0)", pgAutoCast) + So(err, ShouldNotBeNil) + _, err = ParseTypedHeader("zip.double(0)", pgAutoCast) + So(err, ShouldNotBeNil) + _, err = ParseTypedHeader("zip.auto(0)", pgAutoCast) + So(err, ShouldNotBeNil) + }) + Convey("with bad arguments for the binary type", func() { + _, err = ParseTypedHeader("zip.binary(blah)", pgAutoCast) + So(err, ShouldNotBeNil) + _, err = ParseTypedHeader("zip.binary(binary)", pgAutoCast) + So(err, ShouldNotBeNil) + _, err = ParseTypedHeader("zip.binary(decimal)", pgAutoCast) + So(err, ShouldNotBeNil) + }) + }) +} + +func TestAutoHeaderParser(t *testing.T) { + Convey("Using 'zip,number'", t, func() { + var headers = []string{"zip", "number", "foo"} + var colSpecs = ParseAutoHeaders(headers) + So(colSpecs, ShouldResemble, []ColumnSpec{ + {"zip", new(FieldAutoParser), pgAutoCast, "auto"}, + {"number", new(FieldAutoParser), pgAutoCast, "auto"}, + {"foo", new(FieldAutoParser), pgAutoCast, "auto"}, + }) + }) +} + +func TestFieldParsers(t *testing.T) { + testutil.VerifyTestType(t, testutil.UnitTestType) + + Convey("Using FieldAutoParser", t, func() { + var p, _ = NewFieldParser(ctAuto, "") + var value interface{} + var err error + + Convey("parses integers when it can", func() { + value, err = p.Parse("2147483648") + So(value.(int64), ShouldEqual, int64(2147483648)) + So(err, ShouldBeNil) + value, err = p.Parse("42") + So(value.(int32), ShouldEqual, 42) + So(err, ShouldBeNil) + value, err = p.Parse("-2147483649") + 
So(value.(int64), ShouldEqual, int64(-2147483649)) + }) + Convey("parses decimals when it can", func() { + value, err = p.Parse("3.14159265") + So(value.(float64), ShouldEqual, 3.14159265) + So(err, ShouldBeNil) + value, err = p.Parse("0.123123") + So(value.(float64), ShouldEqual, 0.123123) + So(err, ShouldBeNil) + value, err = p.Parse("-123456.789") + So(value.(float64), ShouldEqual, -123456.789) + So(err, ShouldBeNil) + value, err = p.Parse("-1.") + So(value.(float64), ShouldEqual, -1.0) + So(err, ShouldBeNil) + }) + Convey("leaves everything else as a string", func() { + value, err = p.Parse("12345-6789") + So(value.(string), ShouldEqual, "12345-6789") + So(err, ShouldBeNil) + value, err = p.Parse("06/02/1997") + So(value.(string), ShouldEqual, "06/02/1997") + So(err, ShouldBeNil) + value, err = p.Parse("") + So(value.(string), ShouldEqual, "") + So(err, ShouldBeNil) + }) + }) + + Convey("Using FieldBooleanParser", t, func() { + var p, _ = NewFieldParser(ctBoolean, "") + var value interface{} + var err error + + Convey("parses representations of true correctly", func() { + value, err = p.Parse("true") + So(value.(bool), ShouldBeTrue) + So(err, ShouldBeNil) + value, err = p.Parse("TrUe") + So(value.(bool), ShouldBeTrue) + So(err, ShouldBeNil) + value, err = p.Parse("1") + So(value.(bool), ShouldBeTrue) + So(err, ShouldBeNil) + }) + Convey("parses representations of false correctly", func() { + value, err = p.Parse("false") + So(value.(bool), ShouldBeFalse) + So(err, ShouldBeNil) + value, err = p.Parse("FaLsE") + So(value.(bool), ShouldBeFalse) + So(err, ShouldBeNil) + value, err = p.Parse("0") + So(value.(bool), ShouldBeFalse) + So(err, ShouldBeNil) + }) + Convey("does not parse other boolean representations", func() { + _, err = p.Parse("") + So(err, ShouldNotBeNil) + _, err = p.Parse("t") + So(err, ShouldNotBeNil) + _, err = p.Parse("f") + So(err, ShouldNotBeNil) + _, err = p.Parse("yes") + So(err, ShouldNotBeNil) + _, err = p.Parse("no") + So(err, ShouldNotBeNil) + }) + }) + + Convey("Using FieldBinaryParser", t, func() { + var value interface{} + var err error + + Convey("using hex encoding", func() { + var p, _ = NewFieldParser(ctBinary, "hex") + Convey("parses valid hex values correctly", func() { + value, err = p.Parse("400a11") + So(value.([]byte), ShouldResemble, []byte{64, 10, 17}) + So(err, ShouldBeNil) + value, err = p.Parse("400A11") + So(value.([]byte), ShouldResemble, []byte{64, 10, 17}) + So(err, ShouldBeNil) + value, err = p.Parse("0b400A11") + So(value.([]byte), ShouldResemble, []byte{11, 64, 10, 17}) + So(err, ShouldBeNil) + value, err = p.Parse("") + So(value.([]byte), ShouldResemble, []byte{}) + So(err, ShouldBeNil) + }) + }) + Convey("using base32 encoding", func() { + var p, _ = NewFieldParser(ctBinary, "base32") + Convey("parses valid base32 values correctly", func() { + value, err = p.Parse("") + So(value.([]uint8), ShouldResemble, []uint8{}) + So(err, ShouldBeNil) + value, err = p.Parse("MZXW6YTBOI======") + So(value.([]uint8), ShouldResemble, []uint8{102, 111, 111, 98, 97, 114}) + So(err, ShouldBeNil) + }) + }) + Convey("using base64 encoding", func() { + var p, _ = NewFieldParser(ctBinary, "base64") + Convey("parses valid base64 values correctly", func() { + value, err = p.Parse("") + So(value.([]uint8), ShouldResemble, []uint8{}) + So(err, ShouldBeNil) + value, err = p.Parse("Zm9vYmFy") + So(value.([]uint8), ShouldResemble, []uint8{102, 111, 111, 98, 97, 114}) + So(err, ShouldBeNil) + }) + }) + }) + + Convey("Using FieldDateParser", t, func() { + var value 
interface{} + var err error + + Convey("with Go's format", func() { + var p, _ = NewFieldParser(ctDateGo, "01/02/2006 3:04:05pm MST") + Convey("parses valid timestamps correctly", func() { + value, err = p.Parse("01/04/2000 5:38:10pm UTC") + So(value.(time.Time), ShouldResemble, time.Date(2000, 1, 4, 17, 38, 10, 0, time.UTC)) + So(err, ShouldBeNil) + }) + Convey("does not parse invalid dates", func() { + _, err = p.Parse("01/04/2000 5:38:10pm") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 5:38:10 pm UTC") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000") + So(err, ShouldNotBeNil) + }) + }) + Convey("with MS's format", func() { + var p, _ = NewFieldParser(ctDateMS, "MM/dd/yyyy h:mm:sstt") + Convey("parses valid timestamps correctly", func() { + value, err = p.Parse("01/04/2000 5:38:10PM") + So(value.(time.Time), ShouldResemble, time.Date(2000, 1, 4, 17, 38, 10, 0, time.UTC)) + So(err, ShouldBeNil) + }) + Convey("does not parse invalid dates", func() { + _, err = p.Parse("01/04/2000 :) 05:38:10PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 005:38:10PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 5:38:10 PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000") + So(err, ShouldNotBeNil) + }) + }) + Convey("with Oracle's format", func() { + var p, _ = NewFieldParser(ctDateOracle, "mm/Dd/yYYy hh:MI:SsAm") + Convey("parses valid timestamps correctly", func() { + value, err = p.Parse("01/04/2000 05:38:10PM") + So(value.(time.Time), ShouldResemble, time.Date(2000, 1, 4, 17, 38, 10, 0, time.UTC)) + So(err, ShouldBeNil) + }) + Convey("does not parse invalid dates", func() { + _, err = p.Parse("01/04/2000 :) 05:38:10PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 005:38:10PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000 5:38:10 PM") + So(err, ShouldNotBeNil) + _, err = p.Parse("01/04/2000") + So(err, ShouldNotBeNil) + }) + }) + }) + + Convey("Using FieldDoubleParser", t, func() { + var p, _ = NewFieldParser(ctDouble, "") + var value interface{} + var err error + + Convey("parses valid decimal values correctly", func() { + value, err = p.Parse("3.14159265") + So(value.(float64), ShouldEqual, 3.14159265) + So(err, ShouldBeNil) + value, err = p.Parse("0.123123") + So(value.(float64), ShouldEqual, 0.123123) + So(err, ShouldBeNil) + value, err = p.Parse("-123456.789") + So(value.(float64), ShouldEqual, -123456.789) + So(err, ShouldBeNil) + value, err = p.Parse("-1.") + So(value.(float64), ShouldEqual, -1.0) + So(err, ShouldBeNil) + }) + Convey("does not parse invalid numbers", func() { + _, err = p.Parse("") + So(err, ShouldNotBeNil) + _, err = p.Parse("1.1.1") + So(err, ShouldNotBeNil) + _, err = p.Parse("1-2.0") + So(err, ShouldNotBeNil) + _, err = p.Parse("80-") + So(err, ShouldNotBeNil) + }) + }) + + Convey("Using FieldInt32Parser", t, func() { + var p, _ = NewFieldParser(ctInt32, "") + var value interface{} + var err error + + Convey("parses valid integer values correctly", func() { + value, err = p.Parse("2147483647") + So(value.(int32), ShouldEqual, 2147483647) + So(err, ShouldBeNil) + value, err = p.Parse("42") + So(value.(int32), ShouldEqual, 42) + So(err, ShouldBeNil) + value, err = p.Parse("-2147483648") + So(value.(int32), ShouldEqual, -2147483648) + }) + Convey("does not parse invalid numbers", func() { + _, err = p.Parse("") + So(err, ShouldNotBeNil) + _, err = p.Parse("42.0") + So(err, ShouldNotBeNil) + _, err = p.Parse("1-2") + So(err, ShouldNotBeNil) + _, err = p.Parse("80-") + So(err, ShouldNotBeNil) + 
value, err = p.Parse("2147483648") + So(err, ShouldNotBeNil) + value, err = p.Parse("-2147483649") + So(err, ShouldNotBeNil) + }) + }) + + Convey("Using FieldInt64Parser", t, func() { + var p, _ = NewFieldParser(ctInt64, "") + var value interface{} + var err error + + Convey("parses valid integer values correctly", func() { + value, err = p.Parse("2147483648") + So(value.(int64), ShouldEqual, int64(2147483648)) + So(err, ShouldBeNil) + value, err = p.Parse("42") + So(value.(int64), ShouldEqual, 42) + So(err, ShouldBeNil) + value, err = p.Parse("-2147483649") + So(value.(int64), ShouldEqual, int64(-2147483649)) + }) + Convey("does not parse invalid numbers", func() { + _, err = p.Parse("") + So(err, ShouldNotBeNil) + _, err = p.Parse("42.0") + So(err, ShouldNotBeNil) + _, err = p.Parse("1-2") + So(err, ShouldNotBeNil) + _, err = p.Parse("80-") + So(err, ShouldNotBeNil) + }) + }) + + Convey("Using FieldDecimalParser", t, func() { + var p, _ = NewFieldParser(ctDecimal, "") + var err error + + Convey("parses valid decimal values correctly", func() { + for _, ts := range []string{"12235.2355", "42", "0", "-124", "-124.55"} { + testVal, err := bson.ParseDecimal128(ts) + So(err, ShouldBeNil) + parsedValue, err := p.Parse(ts) + So(err, ShouldBeNil) + + So(testVal, ShouldResemble, parsedValue.(bson.Decimal128)) + } + }) + Convey("does not parse invalid decimal values", func() { + for _, ts := range []string{"", "1-2", "abcd"} { + _, err = p.Parse(ts) + So(err, ShouldNotBeNil) + } + }) + }) + + Convey("Using FieldStringParser", t, func() { + var p, _ = NewFieldParser(ctString, "") + var value interface{} + var err error + + Convey("parses strings as strings only", func() { + value, err = p.Parse("42") + So(value.(string), ShouldEqual, "42") + So(err, ShouldBeNil) + value, err = p.Parse("true") + So(value.(string), ShouldEqual, "true") + So(err, ShouldBeNil) + value, err = p.Parse("") + So(value.(string), ShouldEqual, "") + So(err, ShouldBeNil) + }) + }) + +}
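Taken together, the typed-fields machinery above turns a header item such as zip.string() into a ColumnSpec whose Parser coerces each raw token before insertion. A minimal sketch of driving ParseTypedHeader and a FieldParser directly (the import path assumes the vendored package is reachable as github.com/mongodb/mongo-tools/mongoimport; the column name "price" is illustrative):

package main

import (
	"fmt"

	"github.com/mongodb/mongo-tools/mongoimport"
)

func main() {
	// "price.double()" names the column "price" and requests double coercion;
	// ParsePG("autoCast") selects the default behavior for coercion failures.
	spec, err := mongoimport.ParseTypedHeader("price.double()", mongoimport.ParsePG("autoCast"))
	if err != nil {
		panic(err)
	}
	// the spec's parser converts one raw token into its BSON-ready value
	value, err := spec.Parser.Parse("19.99")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s (%s) -> %v (%T)\n", spec.Name, spec.TypeName, value, value)
	// expected: price (double) -> 19.99 (float64)
}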