path: root/src/mongo/gotools/mongoimport
author    Ramon Fernandez <ramon@mongodb.com> 2016-08-25 16:34:34 -0400
committer Ramon Fernandez <ramon@mongodb.com> 2016-08-25 16:54:18 -0400
commit    c330c9991ab45e7d0685d53e699ef26dba065660 (patch)
tree      3dc5cd06b5f6c7eaaa4cb20cbe763504c14a772b /src/mongo/gotools/mongoimport
parent    eb62b862d5ebf179a1bcd9f394070e69c30188ab (diff)
download  mongo-c330c9991ab45e7d0685d53e699ef26dba065660.tar.gz
Import tools: 5b883d86fdb4df55036d5dba2ca6f9dfa0750b44 from branch v3.3
ref: 1ac1389bda..5b883d86fd for: 3.3.12 SERVER-25814 Initial vendor import: tools
Diffstat (limited to 'src/mongo/gotools/mongoimport')
-rw-r--r-- src/mongo/gotools/mongoimport/common.go 472
-rw-r--r-- src/mongo/gotools/mongoimport/common_test.go 600
-rw-r--r-- src/mongo/gotools/mongoimport/csv.go 151
-rw-r--r-- src/mongo/gotools/mongoimport/csv/reader.go 363
-rw-r--r-- src/mongo/gotools/mongoimport/csv_test.go 354
-rw-r--r-- src/mongo/gotools/mongoimport/dateconv/dateconv.go 77
-rw-r--r-- src/mongo/gotools/mongoimport/json.go 239
-rw-r--r-- src/mongo/gotools/mongoimport/json_test.go 264
-rw-r--r-- src/mongo/gotools/mongoimport/main/mongoimport.go 86
-rw-r--r-- src/mongo/gotools/mongoimport/mongoimport.go 575
-rw-r--r-- src/mongo/gotools/mongoimport/mongoimport_test.go 757
-rw-r--r-- src/mongo/gotools/mongoimport/options.go 79
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test.csv 3
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test.tsv 3
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_array.json 31
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_bad.csv 3
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_blanks.csv 3
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_bom.csv 2
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_bom.json 2
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_bom.tsv 1
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_duplicate.csv 5
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_fields_invalid.txt 3
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_fields_valid.txt 3
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_plain.json 3
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_plain2.json 10
-rw-r--r-- src/mongo/gotools/mongoimport/testdata/test_type.csv 4
-rw-r--r-- src/mongo/gotools/mongoimport/tsv.go 163
-rw-r--r-- src/mongo/gotools/mongoimport/tsv_test.go 232
-rw-r--r-- src/mongo/gotools/mongoimport/typed_fields.go 284
-rw-r--r-- src/mongo/gotools/mongoimport/typed_fields_test.go 407
30 files changed, 5179 insertions, 0 deletions
diff --git a/src/mongo/gotools/mongoimport/common.go b/src/mongo/gotools/mongoimport/common.go
new file mode 100644
index 00000000000..cd7c4d333e8
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/common.go
@@ -0,0 +1,472 @@
+package mongoimport
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+
+ "github.com/mongodb/mongo-tools/common/bsonutil"
+ "github.com/mongodb/mongo-tools/common/db"
+ "github.com/mongodb/mongo-tools/common/log"
+ "github.com/mongodb/mongo-tools/common/util"
+ "gopkg.in/mgo.v2/bson"
+ "gopkg.in/tomb.v2"
+)
+
+type ParseGrace int
+
+const (
+ pgAutoCast ParseGrace = iota
+ pgSkipField
+ pgSkipRow
+ pgStop
+)
+
+// ValidatePG ensures the user-provided parseGrace is one of the allowed
+// values.
+func ValidatePG(pg string) (ParseGrace, error) {
+ switch pg {
+ case "autoCast":
+ return pgAutoCast, nil
+ case "skipField":
+ return pgSkipField, nil
+ case "skipRow":
+ return pgSkipRow, nil
+ case "stop":
+ return pgStop, nil
+ default:
+ return pgAutoCast, fmt.Errorf("invalid parse grace: %s", pg)
+ }
+}
+
+// ParsePG interprets the user-provided parseGrace, assuming it is valid.
+func ParsePG(pg string) (res ParseGrace) {
+ res, _ = ValidatePG(pg)
+ return
+}
+
+// Converter is an interface that adds the basic Convert method which returns a
+// valid BSON document that has been converted by the underlying implementation.
+// If conversion fails, err will be set.
+type Converter interface {
+ Convert() (document bson.D, err error)
+}
+
+// An importWorker reads Converters from the unprocessedDataChan channel and
+// sends processed BSON documents on the processedDocumentChan channel.
+type importWorker struct {
+ // unprocessedDataChan is used to stream the input data for a worker to process
+ unprocessedDataChan chan Converter
+
+ // used to stream the processed document back to the caller
+ processedDocumentChan chan bson.D
+
+ // used to synchronise all worker goroutines
+ tomb *tomb.Tomb
+}
+
+// sizeTracker is an interface for tracking the number of bytes read, which is
+// used in mongoimport to feed the progress bar.
+type sizeTracker interface {
+ Size() int64
+}
+
+// sizeTrackingReader implements Reader and sizeTracker by wrapping an io.Reader and keeping track
+// of the total number of bytes read from each call to Read().
+type sizeTrackingReader struct {
+ bytesRead int64
+ reader io.Reader
+}
+
+func (str *sizeTrackingReader) Size() int64 {
+ bytes := atomic.LoadInt64(&str.bytesRead)
+ return bytes
+}
+
+func (str *sizeTrackingReader) Read(p []byte) (n int, err error) {
+ n, err = str.reader.Read(p)
+ atomic.AddInt64(&str.bytesRead, int64(n))
+ return
+}
+
+func newSizeTrackingReader(reader io.Reader) *sizeTrackingReader {
+ return &sizeTrackingReader{
+ reader: reader,
+ bytesRead: 0,
+ }
+}
+
+var (
+ UTF8_BOM = []byte{0xEF, 0xBB, 0xBF}
+)
+
+// bomDiscardingReader implements and wraps io.Reader, discarding the UTF-8 BOM, if applicable
+type bomDiscardingReader struct {
+ buf *bufio.Reader
+ didRead bool
+}
+
+func (bd *bomDiscardingReader) Read(p []byte) (int, error) {
+ if !bd.didRead {
+ bom, err := bd.buf.Peek(3)
+ if err == nil && bytes.Equal(bom, UTF8_BOM) {
+ bd.buf.Read(make([]byte, 3)) // discard BOM
+ }
+ bd.didRead = true
+ }
+ return bd.buf.Read(p)
+}
+
+func newBomDiscardingReader(r io.Reader) *bomDiscardingReader {
+ return &bomDiscardingReader{buf: bufio.NewReader(r)}
+}
+
+// channelQuorumError takes a channel and a quorum - which specifies how many
+// messages to receive on that channel before returning. It either returns the
+// first non-nil error received on the channel or nil if up to `quorum` nil
+// errors are received
+func channelQuorumError(ch <-chan error, quorum int) (err error) {
+ for i := 0; i < quorum; i++ {
+ if err = <-ch; err != nil {
+ return
+ }
+ }
+ return
+}
+
+// constructUpsertDocument constructs a BSON document to use for upserts
+func constructUpsertDocument(upsertFields []string, document bson.D) bson.D {
+ upsertDocument := bson.D{}
+ var hasDocumentKey bool
+ for _, key := range upsertFields {
+ val := getUpsertValue(key, document)
+ if val != nil {
+ hasDocumentKey = true
+ }
+ upsertDocument = append(upsertDocument, bson.DocElem{Name: key, Value: val})
+ }
+ if !hasDocumentKey {
+ return nil
+ }
+ return upsertDocument
+}
+
+// doSequentialStreaming takes a slice of workers, a readDocs (input) channel and
+// an outputChan (output) channel. It sequentially writes unprocessed data read from
+// the input channel to each worker and then sequentially reads the processed data
+// from each worker before passing it on to the output channel
+func doSequentialStreaming(workers []*importWorker, readDocs chan Converter, outputChan chan bson.D) {
+ numWorkers := len(workers)
+
+ // feed in the data to be processed and do round-robin
+ // reads from each worker once processing is completed
+ go func() {
+ i := 0
+ for doc := range readDocs {
+ workers[i].unprocessedDataChan <- doc
+ i = (i + 1) % numWorkers
+ }
+
+ // close the read channels of all the workers
+ for i := 0; i < numWorkers; i++ {
+ close(workers[i].unprocessedDataChan)
+ }
+ }()
+
+ // coordinate the order in which the documents are sent over to the
+ // main output channel
+ numDoneWorkers := 0
+ i := 0
+ for {
+ processedDocument, open := <-workers[i].processedDocumentChan
+ if open {
+ outputChan <- processedDocument
+ } else {
+ numDoneWorkers++
+ }
+ if numDoneWorkers == numWorkers {
+ break
+ }
+ i = (i + 1) % numWorkers
+ }
+}
+
+// getUpsertValue takes a given BSON document and a given field, and returns the
+// field's associated value in the document. The field is specified using dot
+// notation for nested fields, e.g. "person.age" would return 34 in the
+// document bson.M{"person": bson.M{"age": 34}}, whereas "person.name" would
+// return nil.
+func getUpsertValue(field string, document bson.D) interface{} {
+ index := strings.Index(field, ".")
+ if index == -1 {
+ // grab the value (ignoring errors because we are okay with nil)
+ val, _ := bsonutil.FindValueByKey(field, &document)
+ return val
+ }
+ // recurse into subdocuments
+ left := field[0:index]
+ subDoc, _ := bsonutil.FindValueByKey(left, &document)
+ if subDoc == nil {
+ return nil
+ }
+ subDocD, ok := subDoc.(bson.D)
+ if !ok {
+ return nil
+ }
+ return getUpsertValue(field[index+1:], subDocD)
+}
+
+// filterIngestError accepts a boolean indicating whether a non-nil error
+// should be returned as an actual error.
+//
+// If the error is nil, it returns nil.
+//
+// If the error is an io.EOF error - indicating a lost connection to the
+// server - it returns db.ErrLostConnection.
+//
+// Otherwise, if stopOnError is true or the error indicates an unreachable
+// server, it returns the error; if not, it logs the error and returns nil.
+func filterIngestError(stopOnError bool, err error) error {
+ if err == nil {
+ return nil
+ }
+ if err.Error() == io.EOF.Error() {
+ return fmt.Errorf(db.ErrLostConnection)
+ }
+ if stopOnError || db.IsConnectionError(err) {
+ return err
+ }
+ log.Logvf(log.Always, "error inserting documents: %v", err)
+ return nil
+}
+
+// removeBlankFields takes a document and returns a new copy in which
+// fields with empty or blank values are removed.
+func removeBlankFields(document bson.D) (newDocument bson.D) {
+ for _, keyVal := range document {
+ if val, ok := keyVal.Value.(*bson.D); ok {
+ keyVal.Value = removeBlankFields(*val)
+ }
+ if val, ok := keyVal.Value.(string); ok && val == "" {
+ continue
+ }
+ if val, ok := keyVal.Value.(bson.D); ok && val == nil {
+ continue
+ }
+ newDocument = append(newDocument, keyVal)
+ }
+ return newDocument
+}
+
+// setNestedValue takes a nested field - in the form "a.b.c" -
+// its associated value, and a document. It then assigns that
+// value to the appropriate nested field within the document
+func setNestedValue(key string, value interface{}, document *bson.D) {
+ index := strings.Index(key, ".")
+ if index == -1 {
+ *document = append(*document, bson.DocElem{Name: key, Value: value})
+ return
+ }
+ keyName := key[0:index]
+ subDocument := &bson.D{}
+ elem, err := bsonutil.FindValueByKey(keyName, document)
+ if err != nil { // no such key in the document
+ elem = nil
+ }
+ var existingKey bool
+ if elem != nil {
+ subDocument = elem.(*bson.D)
+ existingKey = true
+ }
+ setNestedValue(key[index+1:], value, subDocument)
+ if !existingKey {
+ *document = append(*document, bson.DocElem{Name: keyName, Value: subDocument})
+ }
+}
+
+// streamDocuments concurrently processes data read from the readDocs channel
+// and sends the processed documents to the outputChan channel - either in the
+// order in which the data was received (if ordered is true) or as soon as
+// each document is processed.
+func streamDocuments(ordered bool, numDecoders int, readDocs chan Converter, outputChan chan bson.D) (retErr error) {
+ if numDecoders == 0 {
+ numDecoders = 1
+ }
+ var importWorkers []*importWorker
+ wg := new(sync.WaitGroup)
+ importTomb := new(tomb.Tomb)
+ inChan := readDocs
+ outChan := outputChan
+ for i := 0; i < numDecoders; i++ {
+ if ordered {
+ inChan = make(chan Converter, workerBufferSize)
+ outChan = make(chan bson.D, workerBufferSize)
+ }
+ iw := &importWorker{
+ unprocessedDataChan: inChan,
+ processedDocumentChan: outChan,
+ tomb: importTomb,
+ }
+ importWorkers = append(importWorkers, iw)
+ wg.Add(1)
+ go func(iw importWorker) {
+ defer wg.Done()
+ // only set the first worker error and cause sibling goroutines
+ // to terminate immediately
+ err := iw.processDocuments(ordered)
+ if err != nil && retErr == nil {
+ retErr = err
+ iw.tomb.Kill(err)
+ }
+ }(*iw)
+ }
+
+ // if ordered, we have to coordinate the sequence in which processed
+ // documents are passed to the main read channel
+ if ordered {
+ doSequentialStreaming(importWorkers, readDocs, outputChan)
+ }
+ wg.Wait()
+ close(outputChan)
+ return
+}
+
+// coercionError should only be used as a specific error type to check
+// whether tokensToBSON wants the row to be printed to the reject writer.
+type coercionError struct{}
+
+func (coercionError) Error() string { return "coercionError" }
+
+// tokensToBSON reads in a slice of tokens - along with the ordered column
+// specs - and returns a BSON document for the record.
+func tokensToBSON(colSpecs []ColumnSpec, tokens []string, numProcessed uint64, ignoreBlanks bool) (bson.D, error) {
+ log.Logvf(log.DebugHigh, "got line: %v", tokens)
+ var parsedValue interface{}
+ document := bson.D{}
+ for index, token := range tokens {
+ if token == "" && ignoreBlanks {
+ continue
+ }
+ if index < len(colSpecs) {
+ parsedValue, err := colSpecs[index].Parser.Parse(token)
+ if err != nil {
+ log.Logvf(log.DebugHigh, "parse failure in document #%d for column '%s', "+
+ "could not parse token '%s' to type %s",
+ numProcessed, colSpecs[index].Name, token, colSpecs[index].TypeName)
+ switch colSpecs[index].ParseGrace {
+ case pgAutoCast:
+ parsedValue = autoParse(token)
+ case pgSkipField:
+ continue
+ case pgSkipRow:
+ log.Logvf(log.Always, "skipping row #%d: %v", numProcessed, tokens)
+ return nil, coercionError{}
+ case pgStop:
+ return nil, fmt.Errorf("type coercion failure in document #%d for column '%s', "+
+ "could not parse token '%s' to type %s",
+ numProcessed, colSpecs[index].Name, token, colSpecs[index].TypeName)
+ }
+ }
+ if strings.Index(colSpecs[index].Name, ".") != -1 {
+ setNestedValue(colSpecs[index].Name, parsedValue, &document)
+ } else {
+ document = append(document, bson.DocElem{Name: colSpecs[index].Name, Value: parsedValue})
+ }
+ } else {
+ parsedValue = autoParse(token)
+ key := "field" + strconv.Itoa(index)
+ if util.StringSliceContains(ColumnNames(colSpecs), key) {
+ return nil, fmt.Errorf("duplicate field name - on %v - for token #%v ('%v') in document #%v",
+ key, index+1, parsedValue, numProcessed)
+ }
+ document = append(document, bson.DocElem{Name: key, Value: parsedValue})
+ }
+ }
+ return document, nil
+}
+
+// validateFields takes a slice of fields and returns an error if the fields
+// are invalid, returns nil otherwise
+func validateFields(fields []string) error {
+ fieldsCopy := make([]string, len(fields), len(fields))
+ copy(fieldsCopy, fields)
+ sort.Sort(sort.StringSlice(fieldsCopy))
+
+ for index, field := range fieldsCopy {
+ if strings.HasSuffix(field, ".") {
+ return fmt.Errorf("field '%v' cannot end with a '.'", field)
+ }
+ if strings.HasPrefix(field, ".") {
+ return fmt.Errorf("field '%v' cannot start with a '.'", field)
+ }
+ if strings.HasPrefix(field, "$") {
+ return fmt.Errorf("field '%v' cannot start with a '$'", field)
+ }
+ if strings.Contains(field, "..") {
+ return fmt.Errorf("field '%v' cannot contain consecutive '.' characters", field)
+ }
+ // NOTE: since fields is sorted, this check ensures that no field
+ // is incompatible with another one that occurs further down the list.
+ // This is meant to prevent cases where we have fields like "a" and "a.c".
+ for _, latterField := range fieldsCopy[index+1:] {
+ // NOTE: this means we will not support imports that have fields that
+ // include e.g. a, a.b
+ if strings.HasPrefix(latterField, field+".") {
+ return fmt.Errorf("fields '%v' and '%v' are incompatible", field, latterField)
+ }
+ // NOTE: this means we will not support imports that have fields like
+ // a, a - since this is invalid in MongoDB
+ if field == latterField {
+ return fmt.Errorf("fields cannot be identical: '%v' and '%v'", field, latterField)
+ }
+ }
+ }
+ return nil
+}
+
+// validateReaderFields is a helper to validate fields for input readers
+func validateReaderFields(fields []string) error {
+ if err := validateFields(fields); err != nil {
+ return err
+ }
+ if len(fields) == 1 {
+ log.Logvf(log.Info, "using field: %v", fields[0])
+ } else {
+ log.Logvf(log.Info, "using fields: %v", strings.Join(fields, ","))
+ }
+ return nil
+}
+
+// processDocuments reads from the Converter channel and for each record, converts it
+// to a bson.D document before sending it on the processedDocumentChan channel. Once the
+// input channel is closed the processed channel is also closed if the worker streams its
+// reads in order
+func (iw *importWorker) processDocuments(ordered bool) error {
+ if ordered {
+ defer close(iw.processedDocumentChan)
+ }
+ for {
+ select {
+ case converter, alive := <-iw.unprocessedDataChan:
+ if !alive {
+ return nil
+ }
+ document, err := converter.Convert()
+ if err != nil {
+ return err
+ }
+ if document == nil {
+ continue
+ }
+ iw.processedDocumentChan <- document
+ case <-iw.tomb.Dying():
+ return nil
+ }
+ }
+}
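
The Converter interface defined in common.go above is the seam between the format-specific readers and the shared worker pipeline: anything that can turn one raw record into an ordered bson.D can be handed to the import workers. Below is a minimal sketch of a custom Converter; the pairConverter type and its "key=value" token format are hypothetical and only illustrate the interface, they are not part of the tool.

package main

import (
	"fmt"
	"strings"

	"github.com/mongodb/mongo-tools/mongoimport"
	"gopkg.in/mgo.v2/bson"
)

// pairConverter is a hypothetical Converter that parses "key=value" tokens;
// it exists only to illustrate the interface. mongoimport itself ships
// concrete converters such as CSVConverter.
type pairConverter struct {
	tokens []string
}

// Convert turns each "key=value" token into one element of an ordered BSON document.
func (p pairConverter) Convert() (bson.D, error) {
	doc := bson.D{}
	for _, tok := range p.tokens {
		parts := strings.SplitN(tok, "=", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("malformed token %q", tok)
		}
		doc = append(doc, bson.DocElem{Name: parts[0], Value: parts[1]})
	}
	return doc, nil
}

func main() {
	// The assignment below is a compile-time check that pairConverter
	// satisfies the exported Converter interface.
	var _ mongoimport.Converter = pairConverter{}
	doc, err := pairConverter{tokens: []string{"name=mongodb", "kind=database"}}.Convert()
	fmt.Println(doc, err)
}
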
diff --git a/src/mongo/gotools/mongoimport/common_test.go b/src/mongo/gotools/mongoimport/common_test.go
new file mode 100644
index 00000000000..4745333a927
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/common_test.go
@@ -0,0 +1,600 @@
+package mongoimport
+
+import (
+ "fmt"
+ "io"
+ "testing"
+
+ "github.com/mongodb/mongo-tools/common/db"
+ "github.com/mongodb/mongo-tools/common/log"
+ "github.com/mongodb/mongo-tools/common/options"
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "gopkg.in/mgo.v2/bson"
+ "gopkg.in/tomb.v2"
+)
+
+func init() {
+ log.SetVerbosity(&options.Verbosity{
+ VLevel: 4,
+ })
+}
+
+var (
+ index = uint64(0)
+ csvConverters = []CSVConverter{
+ CSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field1", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field2", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field3", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: []string{"a", "b", "c"},
+ index: index,
+ },
+ CSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field4", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field5", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field6", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: []string{"d", "e", "f"},
+ index: index,
+ },
+ CSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field7", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field8", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field9", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: []string{"d", "e", "f"},
+ index: index,
+ },
+ CSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field10", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field11", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field12", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: []string{"d", "e", "f"},
+ index: index,
+ },
+ CSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field13", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field14", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field15", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: []string{"d", "e", "f"},
+ index: index,
+ },
+ }
+ expectedDocuments = []bson.D{
+ {
+ {"field1", "a"},
+ {"field2", "b"},
+ {"field3", "c"},
+ }, {
+ {"field4", "d"},
+ {"field5", "e"},
+ {"field6", "f"},
+ }, {
+ {"field7", "d"},
+ {"field8", "e"},
+ {"field9", "f"},
+ }, {
+ {"field10", "d"},
+ {"field11", "e"},
+ {"field12", "f"},
+ }, {
+ {"field13", "d"},
+ {"field14", "e"},
+ {"field15", "f"},
+ },
+ }
+)
+
+func convertBSONDToRaw(documents []bson.D) []bson.Raw {
+ rawBSONDocuments := []bson.Raw{}
+ for _, document := range documents {
+ rawBytes, err := bson.Marshal(document)
+ So(err, ShouldBeNil)
+ rawBSONDocuments = append(rawBSONDocuments, bson.Raw{3, rawBytes})
+ }
+ return rawBSONDocuments
+}
+
+func TestValidateFields(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given an import input, in validating the headers", t, func() {
+ Convey("if the fields contain '..', an error should be thrown", func() {
+ So(validateFields([]string{"a..a"}), ShouldNotBeNil)
+ })
+ Convey("if the fields start/end in a '.', an error should be thrown", func() {
+ So(validateFields([]string{".a"}), ShouldNotBeNil)
+ So(validateFields([]string{"a."}), ShouldNotBeNil)
+ })
+ Convey("if the fields start in a '$', an error should be thrown", func() {
+ So(validateFields([]string{"$.a"}), ShouldNotBeNil)
+ So(validateFields([]string{"$"}), ShouldNotBeNil)
+ So(validateFields([]string{"$a"}), ShouldNotBeNil)
+ So(validateFields([]string{"a$a"}), ShouldBeNil)
+ })
+ Convey("if the fields collide, an error should be thrown", func() {
+ So(validateFields([]string{"a", "a.a"}), ShouldNotBeNil)
+ So(validateFields([]string{"a", "a.ba", "b.a"}), ShouldNotBeNil)
+ So(validateFields([]string{"a", "a.ba", "b.a"}), ShouldNotBeNil)
+ So(validateFields([]string{"a", "a.b.c"}), ShouldNotBeNil)
+ })
+ Convey("if the fields don't collide, no error should be thrown", func() {
+ So(validateFields([]string{"a", "aa"}), ShouldBeNil)
+ So(validateFields([]string{"a", "aa", "b.a", "b.c"}), ShouldBeNil)
+ So(validateFields([]string{"a", "ba", "ab", "b.a"}), ShouldBeNil)
+ So(validateFields([]string{"a", "ba", "ab", "b.a", "b.c.d"}), ShouldBeNil)
+ So(validateFields([]string{"a", "ab.c"}), ShouldBeNil)
+ })
+ Convey("if the fields contain the same keys, an error should be thrown", func() {
+ So(validateFields([]string{"a", "ba", "a"}), ShouldNotBeNil)
+ })
+ })
+}
+
+func TestGetUpsertValue(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given a field and a BSON document, on calling getUpsertValue", t, func() {
+ Convey("the value of the key should be correct for unnested documents", func() {
+ bsonDocument := bson.D{{"a", 3}}
+ So(getUpsertValue("a", bsonDocument), ShouldEqual, 3)
+ })
+ Convey("the value of the key should be correct for nested document fields", func() {
+ inner := bson.D{{"b", 4}}
+ bsonDocument := bson.D{{"a", inner}}
+ So(getUpsertValue("a.b", bsonDocument), ShouldEqual, 4)
+ })
+ Convey("the value of the key should be nil for unnested document "+
+ "fields that do not exist", func() {
+ bsonDocument := bson.D{{"a", 4}}
+ So(getUpsertValue("c", bsonDocument), ShouldBeNil)
+ })
+ Convey("the value of the key should be nil for nested document "+
+ "fields that do not exist", func() {
+ inner := bson.D{{"b", 4}}
+ bsonDocument := bson.D{{"a", inner}}
+ So(getUpsertValue("a.c", bsonDocument), ShouldBeNil)
+ })
+ Convey("the value of the key should be nil for nil document values", func() {
+ So(getUpsertValue("a", bson.D{{"a", nil}}), ShouldBeNil)
+ })
+ })
+}
+
+func TestConstructUpsertDocument(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given a set of upsert fields and a BSON document, on calling "+
+ "constructUpsertDocument", t, func() {
+ Convey("the key/value combination in the upsert document should be "+
+ "correct for unnested documents with single fields", func() {
+ bsonDocument := bson.D{{"a", 3}}
+ upsertFields := []string{"a"}
+ upsertDocument := constructUpsertDocument(upsertFields,
+ bsonDocument)
+ So(upsertDocument, ShouldResemble, bsonDocument)
+ })
+ Convey("the key/value combination in the upsert document should be "+
+ "correct for unnested documents with several fields", func() {
+ bsonDocument := bson.D{{"a", 3}, {"b", "string value"}}
+ upsertFields := []string{"a"}
+ expectedDocument := bson.D{{"a", 3}}
+ upsertDocument := constructUpsertDocument(upsertFields,
+ bsonDocument)
+ So(upsertDocument, ShouldResemble, expectedDocument)
+ })
+ Convey("the key/value combination in the upsert document should be "+
+ "correct for nested documents with several fields", func() {
+ inner := bson.D{{testCollection, 4}}
+ bsonDocument := bson.D{{"a", inner}, {"b", "string value"}}
+ upsertFields := []string{"a.c"}
+ expectedDocument := bson.D{{"a.c", 4}}
+ upsertDocument := constructUpsertDocument(upsertFields,
+ bsonDocument)
+ So(upsertDocument, ShouldResemble, expectedDocument)
+ })
+ Convey("the upsert document should be nil if the key does not exist "+
+ "in the BSON document", func() {
+ bsonDocument := bson.D{{"a", 3}, {"b", "string value"}}
+ upsertFields := []string{testCollection}
+ upsertDocument := constructUpsertDocument(upsertFields, bsonDocument)
+ So(upsertDocument, ShouldBeNil)
+ })
+ })
+}
+
+func TestSetNestedValue(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given a field, its value, and an existing BSON document...", t, func() {
+ b := bson.D{{"c", "d"}}
+ currentDocument := bson.D{
+ {"a", 3},
+ {"b", &b},
+ }
+ Convey("ensure top level fields are set and others, unchanged", func() {
+ testDocument := &currentDocument
+ expectedDocument := bson.DocElem{"c", 4}
+ setNestedValue("c", 4, testDocument)
+ newDocument := *testDocument
+ So(len(newDocument), ShouldEqual, 3)
+ So(newDocument[2], ShouldResemble, expectedDocument)
+ })
+ Convey("ensure new nested top-level fields are set and others, unchanged", func() {
+ testDocument := &currentDocument
+ expectedDocument := bson.D{{"b", "4"}}
+ setNestedValue("c.b", "4", testDocument)
+ newDocument := *testDocument
+ So(len(newDocument), ShouldEqual, 3)
+ So(newDocument[2].Name, ShouldResemble, "c")
+ So(*newDocument[2].Value.(*bson.D), ShouldResemble, expectedDocument)
+ })
+ Convey("ensure existing nested level fields are set and others, unchanged", func() {
+ testDocument := &currentDocument
+ expectedDocument := bson.D{{"c", "d"}, {"d", 9}}
+ setNestedValue("b.d", 9, testDocument)
+ newDocument := *testDocument
+ So(len(newDocument), ShouldEqual, 2)
+ So(newDocument[1].Name, ShouldResemble, "b")
+ So(*newDocument[1].Value.(*bson.D), ShouldResemble, expectedDocument)
+ })
+ Convey("ensure subsequent calls update fields accordingly", func() {
+ testDocument := &currentDocument
+ expectedDocumentOne := bson.D{{"c", "d"}, {"d", 9}}
+ expectedDocumentTwo := bson.DocElem{"f", 23}
+ setNestedValue("b.d", 9, testDocument)
+ newDocument := *testDocument
+ So(len(newDocument), ShouldEqual, 2)
+ So(newDocument[1].Name, ShouldResemble, "b")
+ So(*newDocument[1].Value.(*bson.D), ShouldResemble, expectedDocumentOne)
+ setNestedValue("f", 23, testDocument)
+ newDocument = *testDocument
+ So(len(newDocument), ShouldEqual, 3)
+ So(newDocument[2], ShouldResemble, expectedDocumentTwo)
+ })
+ })
+}
+
+func TestRemoveBlankFields(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given an unordered BSON document", t, func() {
+ Convey("the same document should be returned if there are no blanks", func() {
+ bsonDocument := bson.D{{"a", 3}, {"b", "hello"}}
+ So(removeBlankFields(bsonDocument), ShouldResemble, bsonDocument)
+ })
+ Convey("a new document without blanks should be returned if there are "+
+ " blanks", func() {
+ d := bson.D{
+ {"a", ""},
+ {"b", ""},
+ }
+ e := bson.D{
+ {"a", ""},
+ {"b", 1},
+ }
+ bsonDocument := bson.D{
+ {"a", 0},
+ {"b", ""},
+ {"c", ""},
+ {"d", &d},
+ {"e", &e},
+ }
+ inner := bson.D{
+ {"b", 1},
+ }
+ expectedDocument := bson.D{
+ {"a", 0},
+ {"e", inner},
+ }
+ So(removeBlankFields(bsonDocument), ShouldResemble, expectedDocument)
+ })
+ })
+}
+
+func TestTokensToBSON(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given an slice of column specs and tokens to convert to BSON", t, func() {
+ Convey("the expected ordered BSON should be produced for the given"+
+ "column specs and tokens", func() {
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ tokens := []string{"1", "2", "hello"}
+ expectedDocument := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", "hello"},
+ }
+ bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0), false)
+ So(err, ShouldBeNil)
+ So(bsonD, ShouldResemble, expectedDocument)
+ })
+ Convey("if there are more tokens than fields, additional fields should be prefixed"+
+ " with 'fields' and an index indicating the header number", func() {
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ tokens := []string{"1", "2", "hello", "mongodb", "user"}
+ expectedDocument := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", "hello"},
+ {"field3", "mongodb"},
+ {"field4", "user"},
+ }
+ bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0), false)
+ So(err, ShouldBeNil)
+ So(bsonD, ShouldResemble, expectedDocument)
+ })
+ Convey("an error should be thrown if duplicate headers are found", func() {
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field3", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ tokens := []string{"1", "2", "hello", "mongodb", "user"}
+ _, err := tokensToBSON(colSpecs, tokens, uint64(0), false)
+ So(err, ShouldNotBeNil)
+ })
+ Convey("fields with nested values should be set appropriately", func() {
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c.a", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ tokens := []string{"1", "2", "hello"}
+ c := bson.D{
+ {"a", "hello"},
+ }
+ expectedDocument := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", c},
+ }
+ bsonD, err := tokensToBSON(colSpecs, tokens, uint64(0), false)
+ So(err, ShouldBeNil)
+ So(expectedDocument[0].Name, ShouldResemble, bsonD[0].Name)
+ So(expectedDocument[0].Value, ShouldResemble, bsonD[0].Value)
+ So(expectedDocument[1].Name, ShouldResemble, bsonD[1].Name)
+ So(expectedDocument[1].Value, ShouldResemble, bsonD[1].Value)
+ So(expectedDocument[2].Name, ShouldResemble, bsonD[2].Name)
+ So(expectedDocument[2].Value, ShouldResemble, *bsonD[2].Value.(*bson.D))
+ })
+ })
+}
+
+func TestProcessDocuments(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given an import worker", t, func() {
+ index := uint64(0)
+ csvConverters := []CSVConverter{
+ CSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field1", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field2", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field3", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: []string{"a", "b", "c"},
+ index: index,
+ },
+ CSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field4", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field5", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field6", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: []string{"d", "e", "f"},
+ index: index,
+ },
+ }
+ expectedDocuments := []bson.D{
+ {
+ {"field1", "a"},
+ {"field2", "b"},
+ {"field3", "c"},
+ }, {
+ {"field4", "d"},
+ {"field5", "e"},
+ {"field6", "f"},
+ },
+ }
+ Convey("processDocuments should execute the expected conversion for documents, "+
+ "pass then on the output channel, and close the input channel if ordered is true", func() {
+ inputChannel := make(chan Converter, 100)
+ outputChannel := make(chan bson.D, 100)
+ iw := &importWorker{
+ unprocessedDataChan: inputChannel,
+ processedDocumentChan: outputChannel,
+ tomb: &tomb.Tomb{},
+ }
+ inputChannel <- csvConverters[0]
+ inputChannel <- csvConverters[1]
+ close(inputChannel)
+ So(iw.processDocuments(true), ShouldBeNil)
+ doc1, open := <-outputChannel
+ So(doc1, ShouldResemble, expectedDocuments[0])
+ So(open, ShouldEqual, true)
+ doc2, open := <-outputChannel
+ So(doc2, ShouldResemble, expectedDocuments[1])
+ So(open, ShouldEqual, true)
+ _, open = <-outputChannel
+ So(open, ShouldEqual, false)
+ })
+ Convey("processDocuments should execute the expected conversion for documents, "+
+ "pass then on the output channel, and leave the input channel open if ordered is false", func() {
+ inputChannel := make(chan Converter, 100)
+ outputChannel := make(chan bson.D, 100)
+ iw := &importWorker{
+ unprocessedDataChan: inputChannel,
+ processedDocumentChan: outputChannel,
+ tomb: &tomb.Tomb{},
+ }
+ inputChannel <- csvConverters[0]
+ inputChannel <- csvConverters[1]
+ close(inputChannel)
+ So(iw.processDocuments(false), ShouldBeNil)
+ doc1, open := <-outputChannel
+ So(doc1, ShouldResemble, expectedDocuments[0])
+ So(open, ShouldEqual, true)
+ doc2, open := <-outputChannel
+ So(doc2, ShouldResemble, expectedDocuments[1])
+ So(open, ShouldEqual, true)
+ // close will throw a runtime error if outputChannel is already closed
+ close(outputChannel)
+ })
+ })
+}
+
+func TestDoSequentialStreaming(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given some import workers, a Converters input channel and an bson.D output channel", t, func() {
+ inputChannel := make(chan Converter, 5)
+ outputChannel := make(chan bson.D, 5)
+ workerInputChannel := []chan Converter{
+ make(chan Converter),
+ make(chan Converter),
+ }
+ workerOutputChannel := []chan bson.D{
+ make(chan bson.D),
+ make(chan bson.D),
+ }
+ importWorkers := []*importWorker{
+ &importWorker{
+ unprocessedDataChan: workerInputChannel[0],
+ processedDocumentChan: workerOutputChannel[0],
+ tomb: &tomb.Tomb{},
+ },
+ &importWorker{
+ unprocessedDataChan: workerInputChannel[1],
+ processedDocumentChan: workerOutputChannel[1],
+ tomb: &tomb.Tomb{},
+ },
+ }
+ Convey("documents moving through the input channel should be processed and returned in sequence", func() {
+ // start goroutines to do sequential processing
+ for _, iw := range importWorkers {
+ go iw.processDocuments(true)
+ }
+ // feed in a bunch of documents
+ for _, inputCSVDocument := range csvConverters {
+ inputChannel <- inputCSVDocument
+ }
+ close(inputChannel)
+ doSequentialStreaming(importWorkers, inputChannel, outputChannel)
+ for _, document := range expectedDocuments {
+ So(<-outputChannel, ShouldResemble, document)
+ }
+ })
+ })
+}
+
+func TestStreamDocuments(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey(`Given:
+ 1. a boolean indicating streaming order
+ 2. an input channel where documents are streamed in
+ 3. an output channel where processed documents are streamed out`, t, func() {
+
+ inputChannel := make(chan Converter, 5)
+ outputChannel := make(chan bson.D, 5)
+
+ Convey("the entire pipeline should complete without error under normal circumstances", func() {
+ // stream in some documents
+ for _, csvConverter := range csvConverters {
+ inputChannel <- csvConverter
+ }
+ close(inputChannel)
+ So(streamDocuments(true, 3, inputChannel, outputChannel), ShouldBeNil)
+
+ // ensure documents are streamed out and processed in the correct manner
+ for _, expectedDocument := range expectedDocuments {
+ So(<-outputChannel, ShouldResemble, expectedDocument)
+ }
+ })
+ Convey("the entire pipeline should complete with error if an error is encountered", func() {
+ // stream in some documents - create duplicate headers to simulate an error
+ csvConverter := CSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field1", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field2", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: []string{"a", "b", "c"},
+ index: uint64(0),
+ }
+ inputChannel <- csvConverter
+ close(inputChannel)
+
+ // ensure that an error is returned on the error channel
+ So(streamDocuments(true, 3, inputChannel, outputChannel), ShouldNotBeNil)
+ })
+ })
+}
+
+func TestChannelQuorumError(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("Given a channel and a quorum...", t, func() {
+ Convey("an error should be returned if one is received", func() {
+ ch := make(chan error, 2)
+ ch <- nil
+ ch <- io.EOF
+ So(channelQuorumError(ch, 2), ShouldNotBeNil)
+ })
+ Convey("no error should be returned if none is received", func() {
+ ch := make(chan error, 2)
+ ch <- nil
+ ch <- nil
+ So(channelQuorumError(ch, 2), ShouldBeNil)
+ })
+ Convey("no error should be returned if up to quorum nil errors are received", func() {
+ ch := make(chan error, 3)
+ ch <- nil
+ ch <- nil
+ ch <- io.EOF
+ So(channelQuorumError(ch, 2), ShouldBeNil)
+ })
+ })
+}
+
+func TestFilterIngestError(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given a boolean 'stopOnError' and an error...", t, func() {
+
+ Convey("an error should be returned if stopOnError is true the err is not nil", func() {
+ So(filterIngestError(true, fmt.Errorf("")), ShouldNotBeNil)
+ })
+
+ Convey("errLostConnection should be returned if stopOnError is true the err is io.EOF", func() {
+ So(filterIngestError(true, io.EOF).Error(), ShouldEqual, db.ErrLostConnection)
+ })
+
+ Convey("no error should be returned if stopOnError is false the err is not nil", func() {
+ So(filterIngestError(false, fmt.Errorf("")), ShouldBeNil)
+ })
+
+ Convey("no error should be returned if stopOnError is false the err is nil", func() {
+ So(filterIngestError(false, nil), ShouldBeNil)
+ })
+
+ Convey("no error should be returned if stopOnError is true the err is nil", func() {
+ So(filterIngestError(true, nil), ShouldBeNil)
+ })
+ })
+}
diff --git a/src/mongo/gotools/mongoimport/csv.go b/src/mongo/gotools/mongoimport/csv.go
new file mode 100644
index 00000000000..3e66d034ddc
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/csv.go
@@ -0,0 +1,151 @@
+package mongoimport
+
+import (
+ gocsv "encoding/csv"
+ "fmt"
+ "io"
+
+ "github.com/mongodb/mongo-tools/mongoimport/csv"
+ "gopkg.in/mgo.v2/bson"
+)
+
+// CSVInputReader implements the InputReader interface for CSV input types.
+type CSVInputReader struct {
+ // colSpecs is a list of column specifications in the BSON documents to be imported
+ colSpecs []ColumnSpec
+
+ // csvReader is the underlying reader used to read data in from the CSV file or stream
+ csvReader *csv.Reader
+
+ // csvRejectWriter is where coercion-failed rows are written, if applicable
+ csvRejectWriter *gocsv.Writer
+
+ // csvRecord stores each line of input we read from the underlying reader
+ csvRecord []string
+
+ // numProcessed tracks the number of CSV records processed by the underlying reader
+ numProcessed uint64
+
+ // numDecoders is the number of concurrent goroutines to use for decoding
+ numDecoders int
+
+ // embedded sizeTracker exposes the Size() method to check the number of bytes read so far
+ sizeTracker
+
+ // ignoreBlanks is whether empty fields should be ignored
+ ignoreBlanks bool
+}
+
+// CSVConverter implements the Converter interface for CSV input.
+type CSVConverter struct {
+ colSpecs []ColumnSpec
+ data []string
+ index uint64
+ ignoreBlanks bool
+ rejectWriter *gocsv.Writer
+}
+
+// NewCSVInputReader returns a CSVInputReader configured to read data from the
+// given io.Reader, extracting only the specified columns using exactly "numDecoders"
+// goroutines.
+func NewCSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, numDecoders int, ignoreBlanks bool) *CSVInputReader {
+ szCount := newSizeTrackingReader(newBomDiscardingReader(in))
+ csvReader := csv.NewReader(szCount)
+ // allow variable number of colSpecs in document
+ csvReader.FieldsPerRecord = -1
+ csvReader.TrimLeadingSpace = true
+ return &CSVInputReader{
+ colSpecs: colSpecs,
+ csvReader: csvReader,
+ csvRejectWriter: gocsv.NewWriter(rejects),
+ numProcessed: uint64(0),
+ numDecoders: numDecoders,
+ sizeTracker: szCount,
+ ignoreBlanks: ignoreBlanks,
+ }
+}
+
+// ReadAndValidateHeader reads the header from the underlying reader and validates
+// the header fields. It sets err if the read/validation fails.
+func (r *CSVInputReader) ReadAndValidateHeader() (err error) {
+ fields, err := r.csvReader.Read()
+ if err != nil {
+ return err
+ }
+ r.colSpecs = ParseAutoHeaders(fields)
+ return validateReaderFields(ColumnNames(r.colSpecs))
+}
+
+// ReadAndValidateTypedHeader reads the header from the underlying reader and
+// validates the typed header fields. It sets err if the read/validation fails.
+func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) {
+ fields, err := r.csvReader.Read()
+ if err != nil {
+ return err
+ }
+ r.colSpecs, err = ParseTypedHeaders(fields, parseGrace)
+ if err != nil {
+ return err
+ }
+ return validateReaderFields(ColumnNames(r.colSpecs))
+}
+
+// StreamDocument takes a boolean indicating if the documents should be streamed
+// in read order and a channel on which to stream the documents processed from
+// the underlying reader. Returns a non-nil error if streaming fails.
+func (r *CSVInputReader) StreamDocument(ordered bool, readDocs chan bson.D) (retErr error) {
+ csvRecordChan := make(chan Converter, r.numDecoders)
+ csvErrChan := make(chan error)
+
+ // begin reading from source
+ go func() {
+ var err error
+ for {
+ r.csvRecord, err = r.csvReader.Read()
+ if err != nil {
+ close(csvRecordChan)
+ if err == io.EOF {
+ csvErrChan <- nil
+ } else {
+ r.numProcessed++
+ csvErrChan <- fmt.Errorf("read error on entry #%v: %v", r.numProcessed, err)
+ }
+ return
+ }
+ csvRecordChan <- CSVConverter{
+ colSpecs: r.colSpecs,
+ data: r.csvRecord,
+ index: r.numProcessed,
+ ignoreBlanks: r.ignoreBlanks,
+ rejectWriter: r.csvRejectWriter,
+ }
+ r.numProcessed++
+ }
+ }()
+
+ go func() {
+ csvErrChan <- streamDocuments(ordered, r.numDecoders, csvRecordChan, readDocs)
+ }()
+
+ return channelQuorumError(csvErrChan, 2)
+}
+
+// Convert implements the Converter interface for CSV input. It converts a
+// CSVConverter struct to a BSON document.
+func (c CSVConverter) Convert() (b bson.D, err error) {
+ b, err = tokensToBSON(
+ c.colSpecs,
+ c.data,
+ c.index,
+ c.ignoreBlanks,
+ )
+ if _, ok := err.(coercionError); ok {
+ c.Print()
+ err = nil
+ }
+ return
+}
+
+func (c CSVConverter) Print() {
+ c.rejectWriter.Write(c.data)
+}
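
Taken together, NewCSVInputReader, ReadAndValidateHeader, and StreamDocument form the CSV read path shown in the hunk above. The following is a minimal usage sketch, assuming the package is imported at its in-repo path github.com/mongodb/mongo-tools/mongoimport; the sample data and the stderr reject writer are illustrative only.

package main

import (
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/mongodb/mongo-tools/mongoimport"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	// Column specs are filled in by ReadAndValidateHeader from the first line.
	input := strings.NewReader("a,b,c\n1,2,hello\n4,5,world\n")
	reader := mongoimport.NewCSVInputReader(nil, input, os.Stderr, 1, false)
	if err := reader.ReadAndValidateHeader(); err != nil {
		log.Fatal(err)
	}
	// StreamDocument converts each remaining record to a bson.D and closes the
	// channel when the input is exhausted. The channel is buffered generously
	// here because StreamDocument returns only after all records are pushed.
	docChan := make(chan bson.D, 8)
	if err := reader.StreamDocument(true, docChan); err != nil {
		log.Fatal(err)
	}
	for doc := range docChan {
		fmt.Println(doc)
	}
}
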
diff --git a/src/mongo/gotools/mongoimport/csv/reader.go b/src/mongo/gotools/mongoimport/csv/reader.go
new file mode 100644
index 00000000000..aad3c4a32c3
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/csv/reader.go
@@ -0,0 +1,363 @@
+// Copyright 2011 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package csv reads and writes comma-separated values (CSV) files.
+//
+// A csv file contains zero or more records of one or more fields per record.
+// Each record is separated by the newline character. The final record may
+// optionally be followed by a newline character.
+//
+// field1,field2,field3
+//
+// White space is considered part of a field.
+//
+// Carriage returns before newline characters are silently removed.
+//
+// Blank lines are ignored. A line with only whitespace characters (excluding
+// the ending newline character) is not considered a blank line.
+//
+// Fields which start and stop with the quote character " are called
+// quoted-fields. The beginning and ending quote are not part of the
+// field.
+//
+// The source:
+//
+// normal string,"quoted-field"
+//
+// results in the fields
+//
+// {`normal string`, `quoted-field`}
+//
+// Within a quoted-field a quote character followed by a second quote
+// character is considered a single quote.
+//
+// "the ""word"" is true","a ""quoted-field"""
+//
+// results in
+//
+// {`the "word" is true`, `a "quoted-field"`}
+//
+// Newlines and commas may be included in a quoted-field
+//
+// "Multi-line
+// field","comma is ,"
+//
+// results in
+//
+// {`Multi-line
+// field`, `comma is ,`}
+package csv
+
+import (
+ "bufio"
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "unicode"
+)
+
+// A ParseError is returned for parsing errors.
+// The first line is 1. The first column is 0.
+type ParseError struct {
+ Line int // Line where the error occurred
+ Column int // Column (rune index) where the error occurred
+ Err error // The actual error
+}
+
+func (e *ParseError) Error() string {
+ return fmt.Sprintf("line %d, column %d: %s", e.Line, e.Column, e.Err)
+}
+
+// These are the errors that can be returned in ParseError.Error
+var (
+ ErrTrailingComma = errors.New("extra delimiter at end of line") // no longer used
+ ErrBareQuote = errors.New("bare \" in non-quoted-field")
+ ErrQuote = errors.New("extraneous \" in field")
+ ErrFieldCount = errors.New("wrong number of fields in line")
+)
+
+// A Reader reads records from a CSV-encoded file.
+//
+// As returned by NewReader, a Reader expects input conforming to RFC 4180.
+// The exported fields can be changed to customize the details before the
+// first call to Read or ReadAll.
+//
+// Comma is the field delimiter. It defaults to ','.
+//
+// Comment, if not 0, is the comment character. Lines beginning with the
+// Comment character are ignored.
+//
+// If FieldsPerRecord is positive, Read requires each record to
+// have the given number of fields. If FieldsPerRecord is 0, Read sets it to
+// the number of fields in the first record, so that future records must
+// have the same field count. If FieldsPerRecord is negative, no check is
+// made and records may have a variable number of fields.
+//
+// If LazyQuotes is true, a quote may appear in an unquoted field and a
+// non-doubled quote may appear in a quoted field.
+//
+// If TrimLeadingSpace is true, leading white space in a field is ignored.
+type Reader struct {
+ Comma rune // field delimiter (set to ',' by NewReader)
+ Comment rune // comment character for start of line
+ FieldsPerRecord int // number of expected fields per record
+ LazyQuotes bool // allow lazy quotes
+ TrailingComma bool // ignored; here for backwards compatibility
+ TrimLeadingSpace bool // trim leading space
+ line int
+ column int
+ r *bufio.Reader
+ field bytes.Buffer
+}
+
+// NewReader returns a new Reader that reads from r.
+func NewReader(r io.Reader) *Reader {
+ return &Reader{
+ Comma: ',',
+ r: bufio.NewReader(r),
+ }
+}
+
+// error creates a new ParseError based on err.
+func (r *Reader) error(err error) error {
+ return &ParseError{
+ Line: r.line,
+ Column: r.column,
+ Err: err,
+ }
+}
+
+// Read reads one record from r. The record is a slice of strings with each
+// string representing one field.
+func (r *Reader) Read() (record []string, err error) {
+ for {
+ record, err = r.parseRecord()
+ if record != nil {
+ break
+ }
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if r.FieldsPerRecord > 0 {
+ if len(record) != r.FieldsPerRecord {
+ r.column = 0 // report at start of record
+ return record, r.error(ErrFieldCount)
+ }
+ } else if r.FieldsPerRecord == 0 {
+ r.FieldsPerRecord = len(record)
+ }
+ return record, nil
+}
+
+// ReadAll reads all the remaining records from r.
+// Each record is a slice of fields.
+// A successful call returns err == nil, not err == EOF. Because ReadAll is
+// defined to read until EOF, it does not treat end of file as an error to be
+// reported.
+func (r *Reader) ReadAll() (records [][]string, err error) {
+ for {
+ record, err := r.Read()
+ if err == io.EOF {
+ return records, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ records = append(records, record)
+ }
+}
+
+// readRune reads one rune from r, folding \r\n to \n and keeping track
+// of how far into the line we have read. r.column will point to the start
+// of this rune, not the end of this rune.
+func (r *Reader) readRune() (rune, error) {
+ r1, _, err := r.r.ReadRune()
+
+ // Handle \r\n here. We make the simplifying assumption that
+ // anytime \r is followed by \n that it can be folded to \n.
+ // We will not detect files which contain both \r\n and bare \n.
+ if r1 == '\r' {
+ r1, _, err = r.r.ReadRune()
+ if err == nil {
+ if r1 != '\n' {
+ r.r.UnreadRune()
+ r1 = '\r'
+ }
+ }
+ }
+ r.column++
+ return r1, err
+}
+
+// skip reads runes up to and including the rune delim or until error.
+func (r *Reader) skip(delim rune) error {
+ for {
+ r1, err := r.readRune()
+ if err != nil {
+ return err
+ }
+ if r1 == delim {
+ return nil
+ }
+ }
+}
+
+// parseRecord reads and parses a single csv record from r.
+func (r *Reader) parseRecord() (fields []string, err error) {
+ // Each record starts on a new line. We increment our line
+ // number (lines start at 1, not 0) and set column to -1
+ // so as we increment in readRune it points to the character we read.
+ r.line++
+ r.column = -1
+
+ // Peek at the first rune. If it is an error we are done.
+ // If we support comments and it is the comment character
+ // then skip to the end of line.
+
+ r1, _, err := r.r.ReadRune()
+ if err != nil {
+ return nil, err
+ }
+
+ if r.Comment != 0 && r1 == r.Comment {
+ return nil, r.skip('\n')
+ }
+ r.r.UnreadRune()
+
+ // At this point we have at least one field.
+ for {
+ haveField, delim, err := r.parseField()
+ if haveField {
+ fields = append(fields, r.field.String())
+ }
+ if delim == '\n' || err == io.EOF {
+ return fields, err
+ } else if err != nil {
+ return nil, err
+ }
+ }
+}
+
+// parseField parses the next field in the record. The read field is
+// located in r.field. Delim is the first character not part of the field
+// (r.Comma or '\n').
+func (r *Reader) parseField() (haveField bool, delim rune, err error) {
+ r.field.Reset()
+
+ r1, err := r.readRune()
+ for err == nil && r.TrimLeadingSpace && r1 != '\n' && unicode.IsSpace(r1) {
+ r1, err = r.readRune()
+ }
+
+ if err == io.EOF && r.column != 0 {
+ return true, 0, err
+ }
+ if err != nil {
+ return false, 0, err
+ }
+
+ var ws bytes.Buffer
+
+ switch r1 {
+ case r.Comma:
+ // will check below
+
+ case '\n':
+ // We are a trailing empty field or a blank line
+ if r.column == 0 {
+ return false, r1, nil
+ }
+ return true, r1, nil
+
+ case '"':
+ // quoted field
+ Quoted:
+ for {
+ r1, err = r.readRune()
+ if err != nil {
+ if err == io.EOF {
+ if r.LazyQuotes {
+ return true, 0, err
+ }
+ return false, 0, r.error(ErrQuote)
+ }
+ return false, 0, err
+ }
+ switch r1 {
+ case '"':
+ r1, err = r.readRune()
+ if err == nil && r.TrimLeadingSpace && r1 != '\n' && unicode.IsSpace(r1) {
+ for err == nil && r.TrimLeadingSpace && r1 != '\n' && unicode.IsSpace(r1) {
+ r1, err = r.readRune()
+ }
+ // we don't want '"foo" "bar",' to look like '"foo""bar"'
+ // which evaluates to 'foo"bar'
+ // so we explicitly test for the case that the trimmed whitespace isn't
+ // followed by a '"'
+ if err == nil && r1 == '"' {
+ r.column--
+ return false, 0, r.error(ErrQuote)
+ }
+ }
+ if err != nil || r1 == r.Comma {
+ break Quoted
+ }
+ if r1 == '\n' {
+ return true, r1, nil
+ }
+ if r1 != '"' {
+ if !r.LazyQuotes {
+ r.column--
+ return false, 0, r.error(ErrQuote)
+ }
+ // accept the bare quote
+ r.field.WriteRune('"')
+ }
+ case '\n':
+ r.line++
+ r.column = -1
+ }
+ r.field.WriteRune(r1)
+ }
+
+ default:
+ // unquoted field
+ for {
+ // only write sections of whitespace if it's followed by non-whitespace
+ if unicode.IsSpace(r1) {
+ ws.WriteRune(r1)
+ } else {
+ r.field.WriteString(ws.String())
+ ws.Reset()
+ r.field.WriteRune(r1)
+ }
+ r1, err = r.readRune()
+ if err != nil || r1 == r.Comma {
+ break
+ }
+ if r1 == '\n' {
+ return true, r1, nil
+ }
+ if !r.LazyQuotes && r1 == '"' {
+ return false, 0, r.error(ErrBareQuote)
+ }
+ }
+ }
+ // write any remaining section of whitespace unless TrimLeadingSpace is on
+ if !r.TrimLeadingSpace {
+ r.field.WriteString(ws.String())
+ }
+
+ if err != nil {
+ if err == io.EOF {
+ return true, 0, err
+ }
+ return false, 0, err
+ }
+
+ return true, r1, nil
+}
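
The vendored csv.Reader above is a fork of Go's encoding/csv tuned for import: mongoimport sets FieldsPerRecord to -1 so ragged rows are accepted, and TrimLeadingSpace so the space after each delimiter is dropped. A small standalone sketch of that configuration, using the package's vendored import path, is shown below; the sample input is illustrative only.

package main

import (
	"fmt"
	"io"
	"log"
	"strings"

	"github.com/mongodb/mongo-tools/mongoimport/csv"
)

func main() {
	// Mirrors the settings applied in NewCSVInputReader: ragged rows are
	// allowed and leading whitespace after each comma is trimmed.
	in := strings.NewReader("1, 2, \"foo\"\"bar\"\n4, 5\n")
	r := csv.NewReader(in)
	r.FieldsPerRecord = -1
	r.TrimLeadingSpace = true
	for {
		record, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%q\n", record) // ["1" "2" "foo\"bar"], then ["4" "5"]
	}
}
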
diff --git a/src/mongo/gotools/mongoimport/csv_test.go b/src/mongo/gotools/mongoimport/csv_test.go
new file mode 100644
index 00000000000..0db342d9f12
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/csv_test.go
@@ -0,0 +1,354 @@
+package mongoimport
+
+import (
+ "bytes"
+ "io"
+ "os"
+ "strings"
+ "testing"
+
+ "github.com/mongodb/mongo-tools/common/log"
+ "github.com/mongodb/mongo-tools/common/options"
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "gopkg.in/mgo.v2/bson"
+)
+
+func init() {
+ log.SetVerbosity(&options.Verbosity{
+ VLevel: 4,
+ })
+}
+
+func TestCSVStreamDocument(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("With a CSV input reader", t, func() {
+ Convey("badly encoded CSV should result in a parsing error", func() {
+ contents := `1, 2, foo"bar`
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldNotBeNil)
+ })
+ Convey("escaped quotes are parsed correctly", func() {
+ contents := `1, 2, "foo""bar"`
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ })
+ Convey("multiple escaped quotes separated by whitespace parsed correctly", func() {
+ contents := `1, 2, "foo"" ""bar"`
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedRead := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", `foo" "bar`},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+ Convey("integer valued strings should be converted", func() {
+ contents := `1, 2, " 3e"`
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedRead := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", " 3e"},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+ Convey("extra fields should be prefixed with 'field'", func() {
+ contents := `1, 2f , " 3e" , " may"`
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedRead := bson.D{
+ {"a", int32(1)},
+ {"b", "2f"},
+ {"c", " 3e"},
+ {"field3", " may"},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+ Convey("nested CSV fields should be imported properly", func() {
+ contents := `1, 2f , " 3e" , " may"`
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b.c", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ b := bson.D{{"c", "2f"}}
+ expectedRead := bson.D{
+ {"a", int32(1)},
+ {"b", b},
+ {"c", " 3e"},
+ {"field3", " may"},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 4)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+
+ readDocument := <-docChan
+ So(readDocument[0], ShouldResemble, expectedRead[0])
+ So(readDocument[1].Name, ShouldResemble, expectedRead[1].Name)
+ So(*readDocument[1].Value.(*bson.D), ShouldResemble, expectedRead[1].Value)
+ So(readDocument[2], ShouldResemble, expectedRead[2])
+ So(readDocument[3], ShouldResemble, expectedRead[3])
+ })
+ Convey("whitespace separated quoted strings are still an error", func() {
+ contents := `1, 2, "foo" "bar"`
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldNotBeNil)
+ })
+ Convey("nested CSV fields causing header collisions should error", func() {
+ contents := `1, 2f , " 3e" , " may", june`
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b.c", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field3", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldNotBeNil)
+ })
+ Convey("calling StreamDocument() for CSVs should return next set of "+
+ "values", func() {
+ contents := "1, 2, 3\n4, 5, 6"
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedReadOne := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", int32(3)},
+ }
+ expectedReadTwo := bson.D{
+ {"a", int32(4)},
+ {"b", int32(5)},
+ {"c", int32(6)},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 2)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedReadOne)
+ So(<-docChan, ShouldResemble, expectedReadTwo)
+ })
+ Convey("valid CSV input file that starts with the UTF-8 BOM should "+
+ "not raise an error", func() {
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedReads := []bson.D{
+ {
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", int32(3)},
+ }, {
+ {"a", int32(4)},
+ {"b", int32(5)},
+ {"c", int32(6)},
+ },
+ }
+ fileHandle, err := os.Open("testdata/test_bom.csv")
+ So(err, ShouldBeNil)
+ r := NewCSVInputReader(colSpecs, fileHandle, os.Stdout, 1, false)
+ docChan := make(chan bson.D, len(expectedReads))
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ for _, expectedRead := range expectedReads {
+ for i, readDocument := range <-docChan {
+ So(readDocument.Name, ShouldResemble, expectedRead[i].Name)
+ So(readDocument.Value, ShouldResemble, expectedRead[i].Value)
+ }
+ }
+ })
+ })
+}
+
+func TestCSVReadAndValidateHeader(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ var err error
+ Convey("With a CSV input reader", t, func() {
+ Convey("setting the header should read the first line of the CSV", func() {
+ contents := "extraHeader1, extraHeader2, extraHeader3"
+ colSpecs := []ColumnSpec{}
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldBeNil)
+ So(len(r.colSpecs), ShouldEqual, 3)
+ })
+
+ Convey("setting non-colliding nested CSV headers should not raise an error", func() {
+ contents := "a, b, c"
+ colSpecs := []ColumnSpec{}
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldBeNil)
+ So(len(r.colSpecs), ShouldEqual, 3)
+ contents = "a.b.c, a.b.d, c"
+ colSpecs = []ColumnSpec{}
+ r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldBeNil)
+ So(len(r.colSpecs), ShouldEqual, 3)
+
+ contents = "a.b, ab, a.c"
+ colSpecs = []ColumnSpec{}
+ r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldBeNil)
+ So(len(r.colSpecs), ShouldEqual, 3)
+
+ contents = "a, ab, ac, dd"
+ colSpecs = []ColumnSpec{}
+ r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldBeNil)
+ So(len(r.colSpecs), ShouldEqual, 4)
+ })
+
+ Convey("setting colliding nested CSV headers should raise an error", func() {
+ contents := "a, a.b, c"
+ colSpecs := []ColumnSpec{}
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldNotBeNil)
+
+ contents = "a.b.c, a.b.d.c, a.b.d"
+ colSpecs = []ColumnSpec{}
+ r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldNotBeNil)
+
+ contents = "a, a, a"
+ colSpecs = []ColumnSpec{}
+ r = NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldNotBeNil)
+ })
+
+ Convey("setting the header that ends in a dot should error", func() {
+ contents := "c, a., b"
+ colSpecs := []ColumnSpec{}
+ So(err, ShouldBeNil)
+ So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false).ReadAndValidateHeader(), ShouldNotBeNil)
+ })
+
+ Convey("setting the header that starts in a dot should error", func() {
+ contents := "c, .a, b"
+ colSpecs := []ColumnSpec{}
+ So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false).ReadAndValidateHeader(), ShouldNotBeNil)
+ })
+
+ Convey("setting the header that contains multiple consecutive dots should error", func() {
+ contents := "c, a..a, b"
+ colSpecs := []ColumnSpec{}
+ So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false).ReadAndValidateHeader(), ShouldNotBeNil)
+
+ contents = "c, a.a, b.b...b"
+ colSpecs = []ColumnSpec{}
+ So(NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false).ReadAndValidateHeader(), ShouldNotBeNil)
+ })
+
+ Convey("setting the header using an empty file should return EOF", func() {
+ contents := ""
+ colSpecs := []ColumnSpec{}
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldEqual, io.EOF)
+ So(len(r.colSpecs), ShouldEqual, 0)
+ })
+ Convey("setting the header with column specs already set should replace "+
+ "the existing column specs", func() {
+ contents := "extraHeader1,extraHeader2,extraHeader3"
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ r := NewCSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldBeNil)
+ // if ReadAndValidateHeader() is called with column specs already passed
+ // in, the header should be replaced with the read header line
+ So(len(r.colSpecs), ShouldEqual, 3)
+ So(ColumnNames(r.colSpecs), ShouldResemble, strings.Split(contents, ","))
+ })
+ Convey("plain CSV input file sources should be parsed correctly and "+
+ "subsequent imports should parse correctly", func() {
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedReadOne := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", int32(3)},
+ }
+ expectedReadTwo := bson.D{
+ {"a", int32(3)},
+ {"b", 5.4},
+ {"c", "string"},
+ }
+ fileHandle, err := os.Open("testdata/test.csv")
+ So(err, ShouldBeNil)
+ r := NewCSVInputReader(colSpecs, fileHandle, os.Stdout, 1, false)
+ docChan := make(chan bson.D, 50)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedReadOne)
+ So(<-docChan, ShouldResemble, expectedReadTwo)
+ })
+ })
+}
+
+func TestCSVConvert(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("With a CSV input reader", t, func() {
+ Convey("calling convert on a CSVConverter should return the expected BSON document", func() {
+ csvConverter := CSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field1", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field2", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field3", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: []string{"a", "b", "c"},
+ index: uint64(0),
+ }
+ expectedDocument := bson.D{
+ {"field1", "a"},
+ {"field2", "b"},
+ {"field3", "c"},
+ }
+ document, err := csvConverter.Convert()
+ So(err, ShouldBeNil)
+ So(document, ShouldResemble, expectedDocument)
+ })
+ })
+}
diff --git a/src/mongo/gotools/mongoimport/dateconv/dateconv.go b/src/mongo/gotools/mongoimport/dateconv/dateconv.go
new file mode 100644
index 00000000000..91fecf51ad7
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/dateconv/dateconv.go
@@ -0,0 +1,77 @@
+package dateconv
+
+import (
+ "strings"
+)
+
+var (
+ // msReplacers based on:
+ // https://msdn.microsoft.com/en-us/library/ee634398(v=sql.130).aspx
+ msReplacers = []string{
+ "dddd", "Monday",
+ "ddd", "Mon",
+ "dd", "02",
+ "d", "2",
+ "MMMM", "January",
+ "MMM", "Jan",
+ "MM", "01",
+ "M", "1",
+ // "gg", "?",
+ "hh", "03",
+ "h", "3",
+ "HH", "15",
+ "H", "15",
+ "mm", "04",
+ "m", "4",
+ "ss", "05",
+ "s", "5",
+ // "f", "?",
+ "tt", "PM",
+ // "t", "?",
+ "yyyy", "2006",
+ "yyy", "2006",
+ "yy", "06",
+ // "y", "?",
+ "zzz", "-07:00",
+ "zz", "-07",
+ // "z", "?",
+ }
+ msStringReplacer = strings.NewReplacer(msReplacers...)
+)
+
+// FromMS reformats a datetime layout string from the Microsoft SQL Server
+// FORMAT function into go's parse format.
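+// For example, FromMS("yyyy-MM-dd HH:mm:ss") yields the Go layout "2006-01-02 15:04:05".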
+func FromMS(layout string) string {
+ return msStringReplacer.Replace(layout)
+}
+
+var (
+ // oracleReplacers based on:
+ // http://docs.oracle.com/cd/B19306_01/server.102/b14200/sql_elements004.htm#i34924
+ oracleReplacers = []string{
+ "AM", "PM",
+ "DAY", "Monday",
+ "DY", "Mon",
+ "DD", "02",
+ "HH12", "03",
+ "HH24", "15",
+ "HH", "03",
+ "MI", "04",
+ "MONTH", "January",
+ "MON", "Jan",
+ "MM", "01",
+ "SS", "05",
+ "TZD", "MST",
+ "TZH:TZM", "-07:00",
+ "TZH", "-07",
+ "YYYY", "2006",
+ "YY", "06",
+ }
+ oracleStringReplacer = strings.NewReplacer(oracleReplacers...)
+)
+
+// FromOracle reformats a datetime layout string from the Oracle Database
+// TO_DATE function into go's parse format.
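+// For example, FromOracle("YYYY-MM-DD HH24:MI:SS") yields the Go layout "2006-01-02 15:04:05".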
+func FromOracle(layout string) string {
+ return oracleStringReplacer.Replace(strings.ToUpper(layout))
+}
diff --git a/src/mongo/gotools/mongoimport/json.go b/src/mongo/gotools/mongoimport/json.go
new file mode 100644
index 00000000000..caa0dce3121
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/json.go
@@ -0,0 +1,239 @@
+package mongoimport
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/mongodb/mongo-tools/common/bsonutil"
+ "github.com/mongodb/mongo-tools/common/json"
+ "github.com/mongodb/mongo-tools/common/log"
+ "gopkg.in/mgo.v2/bson"
+)
+
+// JSONInputReader is an implementation of InputReader that reads documents
+// in JSON format.
+type JSONInputReader struct {
+ // isArray indicates if the JSON import is an array of JSON documents
+ // or not
+ isArray bool
+
+ // decoder is used to read the next valid JSON documents from the input source
+ decoder *json.Decoder
+
+ // numProcessed indicates the number of JSON documents processed
+ numProcessed uint64
+
+ // readOpeningBracket indicates if the underlying io.Reader has consumed
+ // an opening bracket from the input source. Used to prevent errors when
+ // a JSON input source contains just '[]'
+ readOpeningBracket bool
+
+ // expectedByte is used to store the next expected valid character for JSON
+ // array imports
+ expectedByte byte
+
+ // bytesFromReader is used to store the next byte read from the Reader for
+ // JSON array imports
+ bytesFromReader []byte
+
+ // separatorReader is used for JSON arrays to look for a valid array
+ // separator. It is a reader consisting of the decoder's buffer and the
+ // underlying reader
+ separatorReader io.Reader
+
+ // embedded sizeTracker exposes the Size() method to check the number of bytes read so far
+ sizeTracker
+
+ // numDecoders is the number of concurrent goroutines to use for decoding
+ numDecoders int
+}
+
+// JSONConverter implements the Converter interface for JSON input.
+type JSONConverter struct {
+ data []byte
+ index uint64
+}
+
+var (
+ // ErrNoOpeningBracket means that the input source did not contain any
+ // opening bracket '[' - returned only if --jsonArray is passed in.
+ ErrNoOpeningBracket = errors.New("bad JSON array format - found no " +
+ "opening bracket '[' in input source")
+
+ // ErrNoClosingBracket means that the input source did not contain any
+ // closing bracket ']' - returned only if --jsonArray is passed in.
+ ErrNoClosingBracket = errors.New("bad JSON array format - found no " +
+ "closing bracket ']' in input source")
+)
+
+// NewJSONInputReader creates a new JSONInputReader in array mode if specified,
+// configured to read data from the given io.Reader.
+func NewJSONInputReader(isArray bool, in io.Reader, numDecoders int) *JSONInputReader {
+ szCount := newSizeTrackingReader(newBomDiscardingReader(in))
+ return &JSONInputReader{
+ isArray: isArray,
+ sizeTracker: szCount,
+ decoder: json.NewDecoder(szCount),
+ readOpeningBracket: false,
+ bytesFromReader: make([]byte, 1),
+ numDecoders: numDecoders,
+ }
+}
+
+// ReadAndValidateHeader is a no-op for JSON imports; always returns nil.
+func (r *JSONInputReader) ReadAndValidateHeader() error {
+ return nil
+}
+
+// ReadAndValidateTypedHeader is a no-op for JSON imports; always returns nil.
+func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) error {
+ return nil
+}
+
+// StreamDocument takes a boolean indicating if the documents should be streamed
+// in read order and a channel on which to stream the documents processed from
+// the underlying reader. Returns a non-nil error if encountered.
+func (r *JSONInputReader) StreamDocument(ordered bool, readChan chan bson.D) (retErr error) {
+ rawChan := make(chan Converter, r.numDecoders)
+ jsonErrChan := make(chan error)
+
+ // begin reading from source
+ go func() {
+ var err error
+ for {
+ if r.isArray {
+ if err = r.readJSONArraySeparator(); err != nil {
+ close(rawChan)
+ if err == io.EOF {
+ jsonErrChan <- nil
+ } else {
+ r.numProcessed++
+ jsonErrChan <- fmt.Errorf("error reading separator after document #%v: %v", r.numProcessed, err)
+ }
+ return
+ }
+ }
+ rawBytes, err := r.decoder.ScanObject()
+ if err != nil {
+ close(rawChan)
+ if err == io.EOF {
+ jsonErrChan <- nil
+ } else {
+ r.numProcessed++
+ jsonErrChan <- fmt.Errorf("error processing document #%v: %v", r.numProcessed, err)
+ }
+ return
+ }
+ rawChan <- JSONConverter{
+ data: rawBytes,
+ index: r.numProcessed,
+ }
+ r.numProcessed++
+ }
+ }()
+
+ // begin processing read bytes
+ go func() {
+ jsonErrChan <- streamDocuments(ordered, r.numDecoders, rawChan, readChan)
+ }()
+
+ return channelQuorumError(jsonErrChan, 2)
+}
+
+// Convert implements the Converter interface for JSON input. It converts a
+// JSONConverter struct to a BSON document.
+func (c JSONConverter) Convert() (bson.D, error) {
+ document, err := json.UnmarshalBsonD(c.data)
+ if err != nil {
+ return nil, fmt.Errorf("error unmarshaling bytes on document #%v: %v", c.index, err)
+ }
+ log.Logvf(log.DebugHigh, "got line: %v", document)
+
+ bsonD, err := bsonutil.GetExtendedBsonD(document)
+ if err != nil {
+ return nil, fmt.Errorf("error getting extended BSON for document #%v: %v", c.index, err)
+ }
+ log.Logvf(log.DebugHigh, "got extended line: %#v", bsonD)
+ return bsonD, nil
+}
+
+// readJSONArraySeparator is a helper method used to process JSON arrays. It is
+// used to read any of the valid separators for a JSON array and flag invalid
+// characters.
+//
+// It will read a byte at a time until it finds an expected character after
+// which it returns control to the caller.
+//
+// It will also return immediately if it finds any error (including EOF). If it
+// reads a JSON_ARRAY_END byte, as a validity check it will continue to scan the
+// input source until it hits an error (including EOF) to ensure the entire
+// input source content is a valid JSON array.
+func (r *JSONInputReader) readJSONArraySeparator() error {
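+ // the first document must be preceded by the opening '['; every subsequent
+ // document must be preceded by a ',' separator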
+ r.expectedByte = json.ArraySep
+ if r.numProcessed == 0 {
+ r.expectedByte = json.ArrayStart
+ }
+
+ var readByte byte
+ scanp := 0
+
+ separatorReader := io.MultiReader(
+ r.decoder.Buffered(),
+ r.decoder.R,
+ )
+ for readByte != r.expectedByte {
+ n, err := separatorReader.Read(r.bytesFromReader)
+ scanp += n
+ if n == 0 || err != nil {
+ if err == io.EOF {
+ return ErrNoClosingBracket
+ }
+ return err
+ }
+ readByte = r.bytesFromReader[0]
+
+ if readByte == json.ArrayEnd {
+ // if we read the end of the JSON array, ensure we have no other
+ // non-whitespace characters at the end of the array
+ for {
+ _, err = separatorReader.Read(r.bytesFromReader)
+ if err != nil {
+ // takes care of the '[]' case
+ if !r.readOpeningBracket {
+ return ErrNoOpeningBracket
+ }
+ return err
+ }
+ readString := string(r.bytesFromReader[0])
+ if strings.TrimSpace(readString) != "" {
+ return fmt.Errorf("bad JSON array format - found '%v' "+
+ "after '%v' in input source", readString,
+ string(json.ArrayEnd))
+ }
+ }
+ }
+
+ // this will catch any invalid byte that occurs between JSON objects in the
+ // input source
+ if !(readByte == json.ArraySep ||
+ strings.TrimSpace(string(readByte)) == "" ||
+ readByte == json.ArrayStart ||
+ readByte == json.ArrayEnd) {
+ if r.expectedByte == json.ArrayStart {
+ return ErrNoOpeningBracket
+ }
+ return fmt.Errorf("bad JSON array format - found '%v' outside "+
+ "JSON object/array in input source", string(readByte))
+ }
+ }
+ // adjust the buffer to account for read bytes
+ if scanp < len(r.decoder.Buf) {
+ r.decoder.Buf = r.decoder.Buf[scanp:]
+ } else {
+ r.decoder.Buf = []byte{}
+ }
+ r.readOpeningBracket = true
+ return nil
+}
diff --git a/src/mongo/gotools/mongoimport/json_test.go b/src/mongo/gotools/mongoimport/json_test.go
new file mode 100644
index 00000000000..597b282856b
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/json_test.go
@@ -0,0 +1,264 @@
+package mongoimport
+
+import (
+ "bytes"
+ "io"
+ "os"
+ "testing"
+
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "gopkg.in/mgo.v2/bson"
+)
+
+func TestJSONArrayStreamDocument(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("With a JSON array input reader", t, func() {
+ var jsonFile, fileHandle *os.File
+ Convey("an error should be thrown if a plain JSON document is supplied", func() {
+ contents := `{"a": "ae"}`
+ r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ So(r.StreamDocument(true, make(chan bson.D, 1)), ShouldNotBeNil)
+ })
+
+ Convey("reading a JSON object that has no opening bracket should "+
+ "error out", func() {
+ contents := `{"a":3},{"b":4}]`
+ r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ So(r.StreamDocument(true, make(chan bson.D, 1)), ShouldNotBeNil)
+ })
+
+ Convey("JSON arrays that do not end with a closing bracket should "+
+ "error out", func() {
+ contents := `[{"a": "ae"}`
+ r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldNotBeNil)
+ // though first read should be fine
+ So(<-docChan, ShouldResemble, bson.D{{"a", "ae"}})
+ })
+
+ Convey("an error should be thrown if a plain JSON file is supplied", func() {
+ fileHandle, err := os.Open("testdata/test_plain.json")
+ So(err, ShouldBeNil)
+ r := NewJSONInputReader(true, fileHandle, 1)
+ So(r.StreamDocument(true, make(chan bson.D, 50)), ShouldNotBeNil)
+ })
+
+ Convey("array JSON input file sources should be parsed correctly and "+
+ "subsequent imports should parse correctly", func() {
+ // TODO: currently parses JSON as floats and not ints
+ expectedReadOne := bson.D{
+ {"a", 1.2},
+ {"b", "a"},
+ {"c", 0.4},
+ }
+ expectedReadTwo := bson.D{
+ {"a", 2.4},
+ {"b", "string"},
+ {"c", 52.9},
+ }
+ fileHandle, err := os.Open("testdata/test_array.json")
+ So(err, ShouldBeNil)
+ r := NewJSONInputReader(true, fileHandle, 1)
+ docChan := make(chan bson.D, 50)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedReadOne)
+ So(<-docChan, ShouldResemble, expectedReadTwo)
+ })
+
+ Reset(func() {
+ jsonFile.Close()
+ fileHandle.Close()
+ })
+ })
+}
+
+func TestJSONPlainStreamDocument(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("With a plain JSON input reader", t, func() {
+ var jsonFile, fileHandle *os.File
+ Convey("string valued JSON documents should be imported properly", func() {
+ contents := `{"a": "ae"}`
+ expectedRead := bson.D{{"a", "ae"}}
+ r := NewJSONInputReader(false, bytes.NewReader([]byte(contents)), 1)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+
+ Convey("several string valued JSON documents should be imported "+
+ "properly", func() {
+ contents := `{"a": "ae"}{"b": "dc"}`
+ expectedReadOne := bson.D{{"a", "ae"}}
+ expectedReadTwo := bson.D{{"b", "dc"}}
+ r := NewJSONInputReader(false, bytes.NewReader([]byte(contents)), 1)
+ docChan := make(chan bson.D, 2)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedReadOne)
+ So(<-docChan, ShouldResemble, expectedReadTwo)
+ })
+
+ Convey("number valued JSON documents should be imported properly", func() {
+ contents := `{"a": "ae", "b": 2.0}`
+ expectedRead := bson.D{{"a", "ae"}, {"b", 2.0}}
+ r := NewJSONInputReader(false, bytes.NewReader([]byte(contents)), 1)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+
+ Convey("JSON arrays should return an error", func() {
+ contents := `[{"a": "ae", "b": 2.0}]`
+ r := NewJSONInputReader(false, bytes.NewReader([]byte(contents)), 1)
+ So(r.StreamDocument(true, make(chan bson.D, 50)), ShouldNotBeNil)
+ })
+
+ Convey("plain JSON input file sources should be parsed correctly and "+
+ "subsequent imports should parse correctly", func() {
+ expectedReads := []bson.D{
+ {
+ {"a", 4},
+ {"b", "string value"},
+ {"c", 1},
+ }, {
+ {"a", 5},
+ {"b", "string value"},
+ {"c", 2},
+ }, {
+ {"a", 6},
+ {"b", "string value"},
+ {"c", 3},
+ },
+ }
+ fileHandle, err := os.Open("testdata/test_plain.json")
+ So(err, ShouldBeNil)
+ r := NewJSONInputReader(false, fileHandle, 1)
+ docChan := make(chan bson.D, len(expectedReads))
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ for i := 0; i < len(expectedReads); i++ {
+ for j, readDocument := range <-docChan {
+ So(readDocument.Name, ShouldEqual, expectedReads[i][j].Name)
+ So(readDocument.Value, ShouldEqual, expectedReads[i][j].Value)
+ }
+ }
+ })
+
+ Convey("reading JSON that starts with a UTF-8 BOM should not error",
+ func() {
+ expectedReads := []bson.D{
+ {
+ {"a", 1},
+ {"b", 2},
+ {"c", 3},
+ }, {
+ {"a", 4},
+ {"b", 5},
+ {"c", 6},
+ },
+ }
+ fileHandle, err := os.Open("testdata/test_bom.json")
+ So(err, ShouldBeNil)
+ r := NewJSONInputReader(false, fileHandle, 1)
+ docChan := make(chan bson.D, 2)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ for _, expectedRead := range expectedReads {
+ for i, readDocument := range <-docChan {
+ So(readDocument.Name, ShouldEqual, expectedRead[i].Name)
+ So(readDocument.Value, ShouldEqual, expectedRead[i].Value)
+ }
+ }
+ })
+
+ Reset(func() {
+ jsonFile.Close()
+ fileHandle.Close()
+ })
+ })
+}
+
+func TestReadJSONArraySeparator(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("With an array JSON input reader", t, func() {
+ Convey("reading a JSON array separator should consume [",
+ func() {
+ contents := `[{"a": "ae"}`
+ jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ So(jsonImporter.readJSONArraySeparator(), ShouldBeNil)
+ // at this point it should have consumed all bytes up to `{`
+ So(jsonImporter.readJSONArraySeparator(), ShouldNotBeNil)
+ })
+ Convey("reading a closing JSON array separator without a "+
+ "corresponding opening bracket should error out ",
+ func() {
+ contents := `]`
+ jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ So(jsonImporter.readJSONArraySeparator(), ShouldNotBeNil)
+ })
+ Convey("reading an opening JSON array separator without a "+
+ "corresponding closing bracket should error out ",
+ func() {
+ contents := `[`
+ jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ So(jsonImporter.readJSONArraySeparator(), ShouldBeNil)
+ So(jsonImporter.readJSONArraySeparator(), ShouldNotBeNil)
+ })
+ Convey("reading an opening JSON array separator with an ending "+
+ "closing bracket should return EOF",
+ func() {
+ contents := `[]`
+ jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ So(jsonImporter.readJSONArraySeparator(), ShouldBeNil)
+ So(jsonImporter.readJSONArraySeparator(), ShouldEqual, io.EOF)
+ })
+ Convey("reading an opening JSON array separator, an ending closing "+
+ "bracket but then additional characters after that, should error",
+ func() {
+ contents := `[]a`
+ jsonImporter := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ So(jsonImporter.readJSONArraySeparator(), ShouldBeNil)
+ So(jsonImporter.readJSONArraySeparator(), ShouldNotBeNil)
+ })
+ Convey("reading invalid JSON objects between valid objects should "+
+ "error out",
+ func() {
+ contents := `[{"a":3}x{"b":4}]`
+ r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldNotBeNil)
+ // read first valid document
+ <-docChan
+ So(r.readJSONArraySeparator(), ShouldNotBeNil)
+ })
+ Convey("reading invalid JSON objects after valid objects but between "+
+ "valid objects should error out",
+ func() {
+ contents := `[{"a":3},b{"b":4}]`
+ r := NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ So(r.StreamDocument(true, make(chan bson.D, 1)), ShouldNotBeNil)
+ contents = `[{"a":3},,{"b":4}]`
+ r = NewJSONInputReader(true, bytes.NewReader([]byte(contents)), 1)
+ So(r.StreamDocument(true, make(chan bson.D, 1)), ShouldNotBeNil)
+ })
+ })
+}
+
+func TestJSONConvert(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("With a JSON input reader", t, func() {
+ Convey("calling convert on a JSONConverter should return the expected BSON document", func() {
+ jsonConverter := JSONConverter{
+ data: []byte(`{field1:"a",field2:"b",field3:"c"}`),
+ index: uint64(0),
+ }
+ expectedDocument := bson.D{
+ {"field1", "a"},
+ {"field2", "b"},
+ {"field3", "c"},
+ }
+ document, err := jsonConverter.Convert()
+ So(err, ShouldBeNil)
+ So(document, ShouldResemble, expectedDocument)
+ })
+ })
+}
diff --git a/src/mongo/gotools/mongoimport/main/mongoimport.go b/src/mongo/gotools/mongoimport/main/mongoimport.go
new file mode 100644
index 00000000000..50002bc5442
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/main/mongoimport.go
@@ -0,0 +1,86 @@
+// Main package for the mongoimport tool.
+package main
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/mongodb/mongo-tools/common/db"
+ "github.com/mongodb/mongo-tools/common/log"
+ "github.com/mongodb/mongo-tools/common/options"
+ "github.com/mongodb/mongo-tools/common/signals"
+ "github.com/mongodb/mongo-tools/common/util"
+ "github.com/mongodb/mongo-tools/mongoimport"
+)
+
+func main() {
+ // initialize command-line opts
+ opts := options.New("mongoimport", mongoimport.Usage,
+ options.EnabledOptions{Auth: true, Connection: true, Namespace: true})
+
+ inputOpts := &mongoimport.InputOptions{}
+ opts.AddOptions(inputOpts)
+ ingestOpts := &mongoimport.IngestOptions{}
+ opts.AddOptions(ingestOpts)
+
+ args, err := opts.Parse()
+ if err != nil {
+ log.Logvf(log.Always, "error parsing command line options: %v", err)
+ log.Logvf(log.Always, "try 'mongoimport --help' for more information")
+ os.Exit(util.ExitBadOptions)
+ }
+
+ log.SetVerbosity(opts.Verbosity)
+ signals.Handle()
+
+ // print help, if specified
+ if opts.PrintHelp(false) {
+ return
+ }
+
+ // print version, if specified
+ if opts.PrintVersion() {
+ return
+ }
+
+ // connect directly, unless a replica set name is explicitly specified
+ _, setName := util.ParseConnectionString(opts.Host)
+ opts.Direct = (setName == "")
+ opts.ReplicaSetName = setName
+
+ // create a session provider to connect to the db
+ sessionProvider, err := db.NewSessionProvider(*opts)
+ if err != nil {
+ log.Logvf(log.Always, "error connecting to host: %v", err)
+ os.Exit(util.ExitError)
+ }
+ sessionProvider.SetBypassDocumentValidation(ingestOpts.BypassDocumentValidation)
+
+ m := mongoimport.MongoImport{
+ ToolOptions: opts,
+ InputOptions: inputOpts,
+ IngestOptions: ingestOpts,
+ SessionProvider: sessionProvider,
+ }
+
+ if err = m.ValidateSettings(args); err != nil {
+ log.Logvf(log.Always, "error validating settings: %v", err)
+ log.Logvf(log.Always, "try 'mongoimport --help' for more information")
+ os.Exit(util.ExitError)
+ }
+
+ numDocs, err := m.ImportDocuments()
+ if !opts.Quiet {
+ if err != nil {
+ log.Logvf(log.Always, "Failed: %v", err)
+ }
+ message := fmt.Sprintf("imported 1 document")
+ if numDocs != 1 {
+ message = fmt.Sprintf("imported %v documents", numDocs)
+ }
+ log.Logvf(log.Always, message)
+ }
+ if err != nil {
+ os.Exit(util.ExitError)
+ }
+}
diff --git a/src/mongo/gotools/mongoimport/mongoimport.go b/src/mongo/gotools/mongoimport/mongoimport.go
new file mode 100644
index 00000000000..599760cb053
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/mongoimport.go
@@ -0,0 +1,575 @@
+// Package mongoimport allows importing content from a JSON, CSV, or TSV file into a MongoDB instance.
+package mongoimport
+
+import (
+ "github.com/mongodb/mongo-tools/common/db"
+ "github.com/mongodb/mongo-tools/common/log"
+ "github.com/mongodb/mongo-tools/common/options"
+ "github.com/mongodb/mongo-tools/common/progress"
+ "github.com/mongodb/mongo-tools/common/util"
+ "gopkg.in/mgo.v2"
+ "gopkg.in/mgo.v2/bson"
+ "gopkg.in/tomb.v2"
+
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "sync/atomic"
+)
+
+// Input format types accepted by mongoimport.
+const (
+ CSV = "csv"
+ TSV = "tsv"
+ JSON = "json"
+)
+
+const (
+ workerBufferSize = 16
+ progressBarLength = 24
+)
+
+// MongoImport is a container for the user-specified options and
+// internal state used for running mongoimport.
+type MongoImport struct {
+ // insertionCount keeps track of how many documents have successfully
+ // been inserted into the database
+ // updated atomically, aligned at the beginning of the struct
+ insertionCount uint64
+
+ // generic mongo tool options
+ ToolOptions *options.ToolOptions
+
+ // InputOptions defines options used to read data to be ingested
+ InputOptions *InputOptions
+
+ // IngestOptions defines options used to ingest data into MongoDB
+ IngestOptions *IngestOptions
+
+ // SessionProvider is used for connecting to the database
+ SessionProvider *db.SessionProvider
+
+ // the tomb is used to synchronize ingestion goroutines and causes
+ // other sibling goroutines to terminate immediately if one errors out
+ tomb.Tomb
+
+ // fields to use for upsert operations
+ upsertFields []string
+
+ // type of node the SessionProvider is connected to
+ nodeType db.NodeType
+}
+
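+// InputReader is implemented by the CSV, TSV, and JSON input readers; it
+// abstracts reading, validating, and streaming documents from an import source.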
+type InputReader interface {
+ // StreamDocument takes a boolean indicating if the documents should be streamed
+ // in read order and a channel on which to stream the documents processed from
+ // the underlying reader. Returns a non-nil error if encountered.
+ StreamDocument(ordered bool, read chan bson.D) error
+
+ // ReadAndValidateHeader reads the header line from the InputReader and returns
+ // a non-nil error if the fields from the header line are invalid; returns
+ // nil otherwise. No-op for JSON input readers.
+ ReadAndValidateHeader() error
+
+ // ReadAndValidateTypedHeader is the same as ReadAndValidateHeader,
+ // except it also parses types from the fields of the header. Parse errors
+ // will be handled according to parseGrace.
+ ReadAndValidateTypedHeader(parseGrace ParseGrace) error
+
+ // embedded io.Reader that tracks number of bytes read, to allow feeding into progress bar.
+ sizeTracker
+}
+
+// ValidateSettings ensures that the tool specific options supplied for
+// MongoImport are valid.
+func (imp *MongoImport) ValidateSettings(args []string) error {
+ // namespace must have a valid database; if none is specified, use 'test'
+ if imp.ToolOptions.DB == "" {
+ imp.ToolOptions.DB = "test"
+ }
+ err := util.ValidateDBName(imp.ToolOptions.DB)
+ if err != nil {
+ return fmt.Errorf("invalid database name: %v", err)
+ }
+
+ imp.InputOptions.Type = strings.ToLower(imp.InputOptions.Type)
+ // use JSON as default input type
+ if imp.InputOptions.Type == "" {
+ imp.InputOptions.Type = JSON
+ } else {
+ if !(imp.InputOptions.Type == TSV ||
+ imp.InputOptions.Type == JSON ||
+ imp.InputOptions.Type == CSV) {
+ return fmt.Errorf("unknown type %v", imp.InputOptions.Type)
+ }
+ }
+
+ // ensure headers are supplied for CSV/TSV
+ if imp.InputOptions.Type == CSV ||
+ imp.InputOptions.Type == TSV {
+ if !imp.InputOptions.HeaderLine {
+ if imp.InputOptions.Fields == nil &&
+ imp.InputOptions.FieldFile == nil {
+ return fmt.Errorf("must specify --fields, --fieldFile or --headerline to import this file type")
+ }
+ if imp.InputOptions.FieldFile != nil &&
+ *imp.InputOptions.FieldFile == "" {
+ return fmt.Errorf("--fieldFile can not be empty string")
+ }
+ if imp.InputOptions.Fields != nil &&
+ imp.InputOptions.FieldFile != nil {
+ return fmt.Errorf("incompatible options: --fields and --fieldFile")
+ }
+ } else {
+ if imp.InputOptions.Fields != nil {
+ return fmt.Errorf("incompatible options: --fields and --headerline")
+ }
+ if imp.InputOptions.FieldFile != nil {
+ return fmt.Errorf("incompatible options: --fieldFile and --headerline")
+ }
+ }
+
+ if _, err := ValidatePG(imp.InputOptions.ParseGrace); err != nil {
+ return err
+ }
+ } else {
+ // input type is JSON
+ if imp.InputOptions.HeaderLine {
+ return fmt.Errorf("can not use --headerline when input type is JSON")
+ }
+ if imp.InputOptions.Fields != nil {
+ return fmt.Errorf("can not use --fields when input type is JSON")
+ }
+ if imp.InputOptions.FieldFile != nil {
+ return fmt.Errorf("can not use --fieldFile when input type is JSON")
+ }
+ if imp.IngestOptions.IgnoreBlanks {
+ return fmt.Errorf("can not use --ignoreBlanks when input type is JSON")
+ }
+ if imp.InputOptions.ColumnsHaveTypes {
+ return fmt.Errorf("can not use --columnsHaveTypes when input type is JSON")
+ }
+ }
+
+ if imp.IngestOptions.UpsertFields != "" {
+ imp.IngestOptions.Upsert = true
+ imp.upsertFields = strings.Split(imp.IngestOptions.UpsertFields, ",")
+ if err := validateFields(imp.upsertFields); err != nil {
+ return fmt.Errorf("invalid --upsertFields argument: %v", err)
+ }
+ } else if imp.IngestOptions.Upsert {
+ imp.upsertFields = []string{"_id"}
+ }
+
+ if imp.IngestOptions.Upsert {
+ imp.IngestOptions.MaintainInsertionOrder = true
+ log.Logvf(log.Info, "using upsert fields: %v", imp.upsertFields)
+ }
+
+ // set the number of decoding workers to use for imports
+ if imp.IngestOptions.NumDecodingWorkers <= 0 {
+ imp.IngestOptions.NumDecodingWorkers = imp.ToolOptions.MaxProcs
+ }
+ log.Logvf(log.DebugLow, "using %v decoding workers", imp.IngestOptions.NumDecodingWorkers)
+
+ // set the number of insertion workers to use for imports
+ if imp.IngestOptions.NumInsertionWorkers <= 0 {
+ imp.IngestOptions.NumInsertionWorkers = 1
+ }
+
+ log.Logvf(log.DebugLow, "using %v insert workers", imp.IngestOptions.NumInsertionWorkers)
+
+ // if --maintainInsertionOrder is set, we can only allow 1 insertion worker
+ if imp.IngestOptions.MaintainInsertionOrder {
+ imp.IngestOptions.NumInsertionWorkers = 1
+ }
+
+ // clamp the number of documents per batch to at most 1000
+ if imp.IngestOptions.BulkBufferSize <= 0 || imp.IngestOptions.BulkBufferSize > 1000 {
+ imp.IngestOptions.BulkBufferSize = 1000
+ }
+
+ // ensure no more than one positional argument is supplied
+ if len(args) > 1 {
+ return fmt.Errorf("only one positional argument is allowed")
+ }
+
+ // ensure either a positional argument is supplied or an argument is passed
+ // to the --file flag - and not both
+ if imp.InputOptions.File != "" && len(args) != 0 {
+ return fmt.Errorf("incompatible options: --file and positional argument(s)")
+ }
+
+ if imp.InputOptions.File == "" {
+ if len(args) != 0 {
+ // if --file is not supplied, use the positional argument supplied
+ imp.InputOptions.File = args[0]
+ }
+ }
+
+ // ensure we have a valid string to use for the collection
+ if imp.ToolOptions.Collection == "" {
+ log.Logvf(log.Always, "no collection specified")
+ fileBaseName := filepath.Base(imp.InputOptions.File)
+ lastDotIndex := strings.LastIndex(fileBaseName, ".")
+ if lastDotIndex != -1 {
+ fileBaseName = fileBaseName[0:lastDotIndex]
+ }
+ log.Logvf(log.Always, "using filename '%v' as collection", fileBaseName)
+ imp.ToolOptions.Collection = fileBaseName
+ }
+ err = util.ValidateCollectionName(imp.ToolOptions.Collection)
+ if err != nil {
+ return fmt.Errorf("invalid collection name: %v", err)
+ }
+ return nil
+}
+
+// getSourceReader returns an io.ReadCloser for reading from the input source,
+// along with the size of the source in bytes (0 when reading from stdin) so
+// that progress can be reported during the import.
+func (imp *MongoImport) getSourceReader() (io.ReadCloser, int64, error) {
+ if imp.InputOptions.File != "" {
+ file, err := os.Open(util.ToUniversalPath(imp.InputOptions.File))
+ if err != nil {
+ return nil, -1, err
+ }
+ fileStat, err := file.Stat()
+ if err != nil {
+ return nil, -1, err
+ }
+ log.Logvf(log.Info, "filesize: %v bytes", fileStat.Size())
+ return file, int64(fileStat.Size()), err
+ }
+
+ log.Logvf(log.Info, "reading from stdin")
+
+ // Stdin has undefined max size, so return 0
+ return os.Stdin, 0, nil
+}
+
+// fileSizeProgressor implements Progressor to allow a sizeTracker to hook up with a
+// progress.Bar instance, so that the progress bar can report the percentage of the file read.
+type fileSizeProgressor struct {
+ max int64
+ sizeTracker
+}
+
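+// Progress returns the total size of the input source and the number of bytes
+// read from it so far.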
+func (fsp *fileSizeProgressor) Progress() (int64, int64) {
+ return fsp.max, fsp.sizeTracker.Size()
+}
+
+// ImportDocuments is used to write input data to the database. It returns the
+// number of documents successfully imported to the appropriate namespace and
+// any error encountered in doing this
+func (imp *MongoImport) ImportDocuments() (uint64, error) {
+ source, fileSize, err := imp.getSourceReader()
+ if err != nil {
+ return 0, err
+ }
+ defer source.Close()
+
+ inputReader, err := imp.getInputReader(source)
+ if err != nil {
+ return 0, err
+ }
+
+ if imp.InputOptions.HeaderLine {
+ if imp.InputOptions.ColumnsHaveTypes {
+ err = inputReader.ReadAndValidateTypedHeader(ParsePG(imp.InputOptions.ParseGrace))
+ } else {
+ err = inputReader.ReadAndValidateHeader()
+ }
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ bar := &progress.Bar{
+ Name: fmt.Sprintf("%v.%v", imp.ToolOptions.DB, imp.ToolOptions.Collection),
+ Watching: &fileSizeProgressor{fileSize, inputReader},
+ Writer: log.Writer(0),
+ BarLength: progressBarLength,
+ IsBytes: true,
+ }
+ bar.Start()
+ defer bar.Stop()
+ return imp.importDocuments(inputReader)
+}
+
+// importDocuments is a helper to ImportDocuments and does all the ingestion
+// work by taking data from the inputReader source and writing it to the
+// appropriate namespace
+func (imp *MongoImport) importDocuments(inputReader InputReader) (numImported uint64, retErr error) {
+ session, err := imp.SessionProvider.GetSession()
+ if err != nil {
+ return 0, err
+ }
+ defer session.Close()
+
+ connURL := imp.ToolOptions.Host
+ if connURL == "" {
+ connURL = util.DefaultHost
+ }
+ if imp.ToolOptions.Port != "" {
+ connURL = connURL + ":" + imp.ToolOptions.Port
+ }
+ log.Logvf(log.Always, "connected to: %v", connURL)
+
+ log.Logvf(log.Info, "ns: %v.%v",
+ imp.ToolOptions.Namespace.DB,
+ imp.ToolOptions.Namespace.Collection)
+
+ // check if the server is a replica set, mongos, or standalone
+ imp.nodeType, err = imp.SessionProvider.GetNodeType()
+ if err != nil {
+ return 0, fmt.Errorf("error checking connected node type: %v", err)
+ }
+ log.Logvf(log.Info, "connected to node type: %v", imp.nodeType)
+
+ if err = imp.configureSession(session); err != nil {
+ return 0, fmt.Errorf("error configuring session: %v", err)
+ }
+
+ // drop the database if necessary
+ if imp.IngestOptions.Drop {
+ log.Logvf(log.Always, "dropping: %v.%v",
+ imp.ToolOptions.DB,
+ imp.ToolOptions.Collection)
+ collection := session.DB(imp.ToolOptions.DB).
+ C(imp.ToolOptions.Collection)
+ if err := collection.DropCollection(); err != nil {
+ if err.Error() != db.ErrNsNotFound {
+ return 0, err
+ }
+ }
+ }
+
+ readDocs := make(chan bson.D, workerBufferSize)
+ processingErrChan := make(chan error)
+ ordered := imp.IngestOptions.MaintainInsertionOrder
+
+ // read and process from the input reader
+ go func() {
+ processingErrChan <- inputReader.StreamDocument(ordered, readDocs)
+ }()
+
+ // insert documents into the target database
+ go func() {
+ processingErrChan <- imp.ingestDocuments(readDocs)
+ }()
+
+ e1 := channelQuorumError(processingErrChan, 2)
+ insertionCount := atomic.LoadUint64(&imp.insertionCount)
+ return insertionCount, e1
+}
+
+// ingestDocuments accepts a channel from which it reads documents to be inserted
+// into the target collection. It spreads the insert/upsert workload across one
+// or more workers.
+func (imp *MongoImport) ingestDocuments(readDocs chan bson.D) (retErr error) {
+ numInsertionWorkers := imp.IngestOptions.NumInsertionWorkers
+ if numInsertionWorkers <= 0 {
+ numInsertionWorkers = 1
+ }
+
+ // Each ingest worker will return an error which will
+ // be set in the following cases:
+ //
+ // 1. There is a problem connecting with the server
+ // 2. The server becomes unreachable
+ // 3. There is an insertion/update error - e.g. duplicate key
+ // error - and stopOnError is set to true
+
+ wg := new(sync.WaitGroup)
+ for i := 0; i < numInsertionWorkers; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ // only set the first insertion error and cause sibling goroutines to terminate immediately
+ err := imp.runInsertionWorker(readDocs)
+ if err != nil && retErr == nil {
+ retErr = err
+ imp.Kill(err)
+ }
+ }()
+ }
+ wg.Wait()
+ return
+}
+
+// configureSession takes in a session and modifies it with properly configured
+// settings. It does the following configurations:
+//
+// 1. Sets the session to not timeout
+// 2. Sets the write concern on the session
+// 3. Sets the session safety
+//
+// returns an error if it's unable to set the write concern
+func (imp *MongoImport) configureSession(session *mgo.Session) error {
+ // sockets to the database will never be forcibly closed
+ session.SetSocketTimeout(0)
+ sessionSafety, err := db.BuildWriteConcern(imp.IngestOptions.WriteConcern, imp.nodeType)
+ if err != nil {
+ return fmt.Errorf("write concern error: %v", err)
+ }
+ session.SetSafe(sessionSafety)
+
+ return nil
+}
+
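+// flushInserter is implemented by document sinks (the buffered bulk inserter
+// and the upserter) that accept documents one at a time via Insert and write
+// out any buffered documents when Flush is called.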
+type flushInserter interface {
+ Insert(doc interface{}) error
+ Flush() error
+}
+
+// runInsertionWorker is a helper to ingestDocuments - it reads documents off
+// the read channel and prepares them in batches for insertion into the database.
+func (imp *MongoImport) runInsertionWorker(readDocs chan bson.D) (err error) {
+ session, err := imp.SessionProvider.GetSession()
+ if err != nil {
+ return fmt.Errorf("error connecting to mongod: %v", err)
+ }
+ defer session.Close()
+ if err = imp.configureSession(session); err != nil {
+ return fmt.Errorf("error configuring session: %v", err)
+ }
+ collection := session.DB(imp.ToolOptions.DB).C(imp.ToolOptions.Collection)
+
+ var inserter flushInserter
+ if imp.IngestOptions.Upsert {
+ inserter = imp.newUpserter(collection)
+ } else {
+ inserter = db.NewBufferedBulkInserter(collection, imp.IngestOptions.BulkBufferSize, !imp.IngestOptions.StopOnError)
+ if !imp.IngestOptions.MaintainInsertionOrder {
+ inserter.(*db.BufferedBulkInserter).Unordered()
+ }
+ }
+
+readLoop:
+ for {
+ select {
+ case document, alive := <-readDocs:
+ if !alive {
+ break readLoop
+ }
+ err = filterIngestError(imp.IngestOptions.StopOnError, inserter.Insert(document))
+ if err != nil {
+ return err
+ }
+ atomic.AddUint64(&imp.insertionCount, 1)
+ case <-imp.Dying():
+ return nil
+ }
+ }
+
+ err = inserter.Flush()
+ // TOOLS-349 correct import count for bulk operations
+ if bulkError, ok := err.(*mgo.BulkError); ok {
+ failedDocs := make(map[int]bool) // index of failures
+ for _, failure := range bulkError.Cases() {
+ failedDocs[failure.Index] = true
+ }
+ numFailures := len(failedDocs)
+ if numFailures > 0 {
+ log.Logvf(log.Always, "num failures: %d", numFailures)
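+ // adding the two's complement ^uint64(numFailures-1) atomically subtracts
+ // numFailures from the insertion count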
+ atomic.AddUint64(&imp.insertionCount, ^uint64(numFailures-1))
+ }
+ }
+ return filterIngestError(imp.IngestOptions.StopOnError, err)
+}
+
+type upserter struct {
+ imp *MongoImport
+ collection *mgo.Collection
+}
+
+func (imp *MongoImport) newUpserter(collection *mgo.Collection) *upserter {
+ return &upserter{
+ imp: imp,
+ collection: collection,
+ }
+}
+
+// Insert is part of the flushInserter interface and performs
+// upserts or inserts.
+func (up *upserter) Insert(doc interface{}) error {
+ document := doc.(bson.D)
+ selector := constructUpsertDocument(up.imp.upsertFields, document)
+ var err error
+ if selector == nil {
+ err = up.collection.Insert(document)
+ } else {
+ _, err = up.collection.Upsert(selector, document)
+ }
+ return err
+}
+
+// Flush is needed so that upserter implements flushInserter, but upserter
+// doesn't buffer anything so we don't need to do anything in Flush.
+func (up *upserter) Flush() error {
+ return nil
+}
+
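+// splitInlineHeader splits a comma-delimited --fields argument into individual
+// field specifications, ignoring commas nested inside parentheses so that typed
+// headers such as "baz.date(January 2, 2006)" are kept intact.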
+func splitInlineHeader(header string) (headers []string) {
+ var level uint8
+ var currentField string
+ for _, c := range header {
+ if c == '(' {
+ level++
+ } else if c == ')' && level > 0 {
+ level--
+ }
+ if c == ',' && level == 0 {
+ headers = append(headers, currentField)
+ currentField = ""
+ } else {
+ currentField = currentField + string(c)
+ }
+ }
+ headers = append(headers, currentField) // add last field
+ return
+}
+
+// getInputReader returns an implementation of InputReader based on the input type
+func (imp *MongoImport) getInputReader(in io.Reader) (InputReader, error) {
+ var colSpecs []ColumnSpec
+ var headers []string
+ var err error
+ if imp.InputOptions.Fields != nil {
+ headers = splitInlineHeader(*imp.InputOptions.Fields)
+ } else if imp.InputOptions.FieldFile != nil {
+ headers, err = util.GetFieldsFromFile(*imp.InputOptions.FieldFile)
+ if err != nil {
+ return nil, err
+ }
+ }
+ if imp.InputOptions.ColumnsHaveTypes {
+ colSpecs, err = ParseTypedHeaders(headers, ParsePG(imp.InputOptions.ParseGrace))
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ colSpecs = ParseAutoHeaders(headers)
+ }
+
+ // header fields validation can only happen once we have an input reader
+ if !imp.InputOptions.HeaderLine {
+ if err = validateReaderFields(ColumnNames(colSpecs)); err != nil {
+ return nil, err
+ }
+ }
+
+ out := os.Stdout
+
+ ignoreBlanks := imp.IngestOptions.IgnoreBlanks && imp.InputOptions.Type != JSON
+ if imp.InputOptions.Type == CSV {
+ return NewCSVInputReader(colSpecs, in, out, imp.IngestOptions.NumDecodingWorkers, ignoreBlanks), nil
+ } else if imp.InputOptions.Type == TSV {
+ return NewTSVInputReader(colSpecs, in, out, imp.IngestOptions.NumDecodingWorkers, ignoreBlanks), nil
+ }
+ return NewJSONInputReader(imp.InputOptions.JSONArray, in, imp.IngestOptions.NumDecodingWorkers), nil
+}
diff --git a/src/mongo/gotools/mongoimport/mongoimport_test.go b/src/mongo/gotools/mongoimport/mongoimport_test.go
new file mode 100644
index 00000000000..ff10ec5525c
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/mongoimport_test.go
@@ -0,0 +1,757 @@
+package mongoimport
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "reflect"
+ "strings"
+ "testing"
+
+ "github.com/mongodb/mongo-tools/common/db"
+ "github.com/mongodb/mongo-tools/common/options"
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "gopkg.in/mgo.v2/bson"
+)
+
+const (
+ testDb = "db"
+ testCollection = "c"
+)
+
+// checkOnlyHasDocuments returns an error if the documents in the test
+// collection don't exactly match those that are passed in
+func checkOnlyHasDocuments(sessionProvider db.SessionProvider, expectedDocuments []bson.M) error {
+ session, err := sessionProvider.GetSession()
+ if err != nil {
+ return err
+ }
+ defer session.Close()
+
+ collection := session.DB(testDb).C(testCollection)
+ dbDocuments := []bson.M{}
+ err = collection.Find(nil).Sort("_id").All(&dbDocuments)
+ if err != nil {
+ return err
+ }
+ if len(dbDocuments) != len(expectedDocuments) {
+ return fmt.Errorf("document count mismatch: expected %#v, got %#v",
+ len(expectedDocuments), len(dbDocuments))
+ }
+ for index := range dbDocuments {
+ if !reflect.DeepEqual(dbDocuments[index], expectedDocuments[index]) {
+ return fmt.Errorf("document mismatch: expected %#v, got %#v",
+ expectedDocuments[index], dbDocuments[index])
+ }
+ }
+ return nil
+}
+
+// getBasicToolOptions returns a ToolOptions instance that tests use to
+// instantiate the session provider for calls to StreamDocument.
+func getBasicToolOptions() *options.ToolOptions {
+ general := &options.General{}
+ ssl := testutil.GetSSLOptions()
+ auth := testutil.GetAuthOptions()
+ namespace := &options.Namespace{
+ DB: testDb,
+ Collection: testCollection,
+ }
+ connection := &options.Connection{
+ Host: "localhost",
+ Port: db.DefaultTestPort,
+ }
+ return &options.ToolOptions{
+ General: general,
+ SSL: &ssl,
+ Namespace: namespace,
+ Connection: connection,
+ Auth: &auth,
+ }
+}
+
+func NewMongoImport() (*MongoImport, error) {
+ toolOptions := getBasicToolOptions()
+ inputOptions := &InputOptions{
+ ParseGrace: "stop",
+ }
+ ingestOptions := &IngestOptions{}
+ provider, err := db.NewSessionProvider(*toolOptions)
+ if err != nil {
+ return nil, err
+ }
+ return &MongoImport{
+ ToolOptions: toolOptions,
+ InputOptions: inputOptions,
+ IngestOptions: ingestOptions,
+ SessionProvider: provider,
+ }, nil
+}
+
+func TestSplitInlineHeader(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("handle normal, untyped headers", t, func() {
+ fields := []string{"foo.bar", "baz", "boo"}
+ header := strings.Join(fields, ",")
+ Convey("with '"+header+"'", func() {
+ So(splitInlineHeader(header), ShouldResemble, fields)
+ })
+ })
+ Convey("handle typed headers", t, func() {
+ fields := []string{"foo.bar.string()", "baz.date(January 2 2006)", "boo.binary(hex)"}
+ header := strings.Join(fields, ",")
+ Convey("with '"+header+"'", func() {
+ So(splitInlineHeader(header), ShouldResemble, fields)
+ })
+ })
+ Convey("handle typed headers that include commas", t, func() {
+ fields := []string{"foo.bar.date(,,,,)", "baz.date(January 2, 2006)", "boo.binary(hex)"}
+ header := strings.Join(fields, ",")
+ Convey("with '"+header+"'", func() {
+ So(splitInlineHeader(header), ShouldResemble, fields)
+ })
+ })
+}
+
+func TestMongoImportValidateSettings(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Given a mongoimport instance for validation, ", t, func() {
+ Convey("an error should be thrown if no collection is given", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.ToolOptions.Namespace.DB = ""
+ imp.ToolOptions.Namespace.Collection = ""
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ })
+
+ Convey("an error should be thrown if an invalid type is given", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = "invalid"
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ })
+
+ Convey("an error should be thrown if neither --headerline is supplied "+
+ "nor --fields/--fieldFile", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ })
+
+ Convey("no error should be thrown if --headerline is not supplied "+
+ "but --fields is supplied", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fields := "a,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.InputOptions.Type = CSV
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ })
+
+ Convey("no error should be thrown if no input type is supplied", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ })
+
+ Convey("no error should be thrown if there's just one positional argument", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ So(imp.ValidateSettings([]string{"a"}), ShouldBeNil)
+ })
+
+ Convey("an error should be thrown if --file is used with one positional argument", func() {
+ imp, err := NewMongoImport()
+ imp.InputOptions.File = "abc"
+ So(err, ShouldBeNil)
+ So(imp.ValidateSettings([]string{"a"}), ShouldNotBeNil)
+ })
+
+ Convey("an error should be thrown if there's more than one positional argument", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ So(imp.ValidateSettings([]string{"a", "b"}), ShouldNotBeNil)
+ })
+
+ Convey("an error should be thrown if --headerline is used with JSON input", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.HeaderLine = true
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ })
+
+ Convey("an error should be thrown if --fields is used with JSON input", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fields := ""
+ imp.InputOptions.Fields = &fields
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ fields = "a,b,c"
+ imp.InputOptions.Fields = &fields
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ })
+
+ Convey("an error should be thrown if --fieldFile is used with JSON input", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fieldFile := ""
+ imp.InputOptions.FieldFile = &fieldFile
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ fieldFile = "test.csv"
+ imp.InputOptions.FieldFile = &fieldFile
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ })
+
+ Convey("an error should be thrown if --ignoreBlanks is used with JSON input", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.IngestOptions.IgnoreBlanks = true
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ })
+
+ Convey("no error should be thrown if --headerline is not supplied "+
+ "but --fieldFile is supplied", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fieldFile := "test.csv"
+ imp.InputOptions.FieldFile = &fieldFile
+ imp.InputOptions.Type = CSV
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ })
+
+ Convey("an error should be thrown if a field in the --upsertFields "+
+ "argument starts with a dollar sign", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.HeaderLine = true
+ imp.InputOptions.Type = CSV
+ imp.IngestOptions.Upsert = true
+ imp.IngestOptions.UpsertFields = "a,$b,c"
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ imp.IngestOptions.UpsertFields = "a,.b,c"
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ })
+
+ Convey("no error should be thrown if --upsertFields is supplied without "+
+ "--upsert", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.HeaderLine = true
+ imp.InputOptions.Type = CSV
+ imp.IngestOptions.UpsertFields = "a,b,c"
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ })
+
+ Convey("if --upsert is used without --upsertFields, _id should be set as "+
+ "the upsert field", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.HeaderLine = true
+ imp.InputOptions.Type = CSV
+ imp.IngestOptions.Upsert = true
+ imp.IngestOptions.UpsertFields = ""
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ So(imp.upsertFields, ShouldResemble, []string{"_id"})
+ })
+
+ Convey("no error should be thrown if all fields in the --upsertFields "+
+ "argument are valid", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.HeaderLine = true
+ imp.InputOptions.Type = CSV
+ imp.IngestOptions.Upsert = true
+ imp.IngestOptions.UpsertFields = "a,b,c"
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ })
+
+ Convey("no error should be thrown if --fields is supplied with CSV import", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fields := "a,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.InputOptions.Type = CSV
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ })
+
+ Convey("an error should be thrown if an empty --fields is supplied with CSV import", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fields := ""
+ imp.InputOptions.Fields = &fields
+ imp.InputOptions.Type = CSV
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ })
+
+ Convey("no error should be thrown if --fieldFile is supplied with CSV import", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fieldFile := "test.csv"
+ imp.InputOptions.FieldFile = &fieldFile
+ imp.InputOptions.Type = CSV
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ })
+
+ Convey("an error should be thrown if no collection and no file is supplied", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fieldFile := "test.csv"
+ imp.InputOptions.FieldFile = &fieldFile
+ imp.InputOptions.Type = CSV
+ imp.ToolOptions.Namespace.Collection = ""
+ So(imp.ValidateSettings([]string{}), ShouldNotBeNil)
+ })
+
+ Convey("no error should be thrown if --file is supplied (without -c) "+
+ "- the file name should be used as the collection name", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.File = "input"
+ imp.InputOptions.HeaderLine = true
+ imp.InputOptions.Type = CSV
+ imp.ToolOptions.Namespace.Collection = ""
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ So(imp.ToolOptions.Namespace.Collection, ShouldEqual,
+ imp.InputOptions.File)
+ })
+
+ Convey("with no collection name and a file name the base name of the "+
+ "file (without the extension) should be used as the collection name", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.File = "/path/to/input/file/dot/input.txt"
+ imp.InputOptions.HeaderLine = true
+ imp.InputOptions.Type = CSV
+ imp.ToolOptions.Namespace.Collection = ""
+ So(imp.ValidateSettings([]string{}), ShouldBeNil)
+ So(imp.ToolOptions.Namespace.Collection, ShouldEqual, "input")
+ })
+ })
+}
+
+func TestGetSourceReader(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("Given a mongoimport instance, on calling getSourceReader", t,
+ func() {
+ Convey("an error should be thrown if the given file referenced by "+
+ "the reader does not exist", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.File = "/path/to/input/file/dot/input.txt"
+ imp.InputOptions.Type = CSV
+ imp.ToolOptions.Namespace.Collection = ""
+ _, _, err = imp.getSourceReader()
+ So(err, ShouldNotBeNil)
+ })
+
+ Convey("no error should be thrown if the file exists", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.File = "testdata/test_array.json"
+ imp.InputOptions.Type = JSON
+ _, _, err = imp.getSourceReader()
+ So(err, ShouldBeNil)
+ })
+
+ Convey("no error should be thrown if stdin is used", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.File = ""
+ _, _, err = imp.getSourceReader()
+ So(err, ShouldBeNil)
+ })
+ })
+}
+
+func TestGetInputReader(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("Given a io.Reader on calling getInputReader", t, func() {
+ Convey("should parse --fields using valid csv escaping", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Fields = new(string)
+ *imp.InputOptions.Fields = "foo.auto(),bar.date(January 2, 2006)"
+ imp.InputOptions.File = "/path/to/input/file/dot/input.txt"
+ imp.InputOptions.ColumnsHaveTypes = true
+ _, err = imp.getInputReader(&os.File{})
+ So(err, ShouldBeNil)
+ })
+ Convey("should complain about non-escaped new lines in --fields", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Fields = new(string)
+ *imp.InputOptions.Fields = "foo.auto(),\nblah.binary(hex),bar.date(January 2, 2006)"
+ imp.InputOptions.File = "/path/to/input/file/dot/input.txt"
+ imp.InputOptions.ColumnsHaveTypes = true
+ _, err = imp.getInputReader(&os.File{})
+ So(err, ShouldBeNil)
+ })
+ Convey("no error should be thrown if neither --fields nor --fieldFile "+
+ "is used", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.File = "/path/to/input/file/dot/input.txt"
+ _, err = imp.getInputReader(&os.File{})
+ So(err, ShouldBeNil)
+ })
+ Convey("no error should be thrown if --fields is used", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fields := "a,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.InputOptions.File = "/path/to/input/file/dot/input.txt"
+ _, err = imp.getInputReader(&os.File{})
+ So(err, ShouldBeNil)
+ })
+ Convey("no error should be thrown if --fieldFile is used and it "+
+ "references a valid file", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fieldFile := "testdata/test.csv"
+ imp.InputOptions.FieldFile = &fieldFile
+ _, err = imp.getInputReader(&os.File{})
+ So(err, ShouldBeNil)
+ })
+ Convey("an error should be thrown if --fieldFile is used and it "+
+ "references an invalid file", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fieldFile := "/path/to/input/file/dot/input.txt"
+ imp.InputOptions.FieldFile = &fieldFile
+ _, err = imp.getInputReader(&os.File{})
+ So(err, ShouldNotBeNil)
+ })
+ Convey("no error should be thrown for CSV import inputs", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ _, err = imp.getInputReader(&os.File{})
+ So(err, ShouldBeNil)
+ })
+ Convey("no error should be thrown for TSV import inputs", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = TSV
+ _, err = imp.getInputReader(&os.File{})
+ So(err, ShouldBeNil)
+ })
+ Convey("no error should be thrown for JSON import inputs", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = JSON
+ _, err = imp.getInputReader(&os.File{})
+ So(err, ShouldBeNil)
+ })
+ Convey("an error should be thrown if --fieldFile fields are invalid", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fieldFile := "testdata/test_fields_invalid.txt"
+ imp.InputOptions.FieldFile = &fieldFile
+ file, err := os.Open(fieldFile)
+ So(err, ShouldBeNil)
+ _, err = imp.getInputReader(file)
+ So(err, ShouldNotBeNil)
+ })
+ Convey("no error should be thrown if --fieldFile fields are valid", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ fieldFile := "testdata/test_fields_valid.txt"
+ imp.InputOptions.FieldFile = &fieldFile
+ file, err := os.Open(fieldFile)
+ So(err, ShouldBeNil)
+ _, err = imp.getInputReader(file)
+ So(err, ShouldBeNil)
+ })
+ })
+}
+
+func TestImportDocuments(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.IntegrationTestType)
+ Convey("With a mongoimport instance", t, func() {
+ Reset(func() {
+ sessionProvider, err := db.NewSessionProvider(*getBasicToolOptions())
+ if err != nil {
+ t.Fatalf("error getting session provider session: %v", err)
+ }
+ session, err := sessionProvider.GetSession()
+ if err != nil {
+ t.Fatalf("error getting session: %v", err)
+ }
+ defer session.Close()
+ session.DB(testDb).C(testCollection).DropCollection()
+ })
+ Convey("no error should be thrown for CSV import on test data and all "+
+ "CSV data lines should be imported correctly", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test.csv"
+ fields := "a,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.WriteConcern = "majority"
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 3)
+ })
+ Convey("an error should be thrown for JSON import on test data that is "+
+ "JSON array", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.File = "testdata/test_array.json"
+ imp.IngestOptions.WriteConcern = "majority"
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldNotBeNil)
+ So(numImported, ShouldEqual, 0)
+ })
+ Convey("TOOLS-247: no error should be thrown for JSON import on test "+
+ "data and all documents should be imported correctly", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.File = "testdata/test_plain2.json"
+ imp.IngestOptions.WriteConcern = "majority"
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 10)
+ })
+ Convey("CSV import with --ignoreBlanks should import only non-blank fields", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test_blanks.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.IgnoreBlanks = true
+
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 3)
+ expectedDocuments := []bson.M{
+ bson.M{"_id": 1, "b": int(2)},
+ bson.M{"_id": 5, "c": "6e"},
+ bson.M{"_id": 7, "b": int(8), "c": int(6)},
+ }
+ So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil)
+ })
+ Convey("CSV import without --ignoreBlanks should include blanks", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test_blanks.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 3)
+ expectedDocuments := []bson.M{
+ bson.M{"_id": 1, "b": int(2), "c": ""},
+ bson.M{"_id": 5, "b": "", "c": "6e"},
+ bson.M{"_id": 7, "b": int(8), "c": int(6)},
+ }
+ So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil)
+ })
+ Convey("no error should be thrown for CSV import on test data with --upsertFields", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.UpsertFields = "b,c"
+ imp.IngestOptions.MaintainInsertionOrder = true
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 3)
+ expectedDocuments := []bson.M{
+ bson.M{"_id": 1, "b": int(2), "c": int(3)},
+ bson.M{"_id": 3, "b": 5.4, "c": "string"},
+ bson.M{"_id": 5, "b": int(6), "c": int(6)},
+ }
+ So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil)
+ })
+ Convey("no error should be thrown for CSV import on test data with "+
+ "--stopOnError. Only documents before error should be imported", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.StopOnError = true
+ imp.IngestOptions.MaintainInsertionOrder = true
+ imp.IngestOptions.WriteConcern = "majority"
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 3)
+ expectedDocuments := []bson.M{
+ bson.M{"_id": 1, "b": int(2), "c": int(3)},
+ bson.M{"_id": 3, "b": 5.4, "c": "string"},
+ bson.M{"_id": 5, "b": int(6), "c": int(6)},
+ }
+ So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil)
+ })
+ Convey("CSV import with duplicate _id's should not error if --stopOnError is not set", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test_duplicate.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.StopOnError = false
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 5)
+
+ expectedDocuments := []bson.M{
+ bson.M{"_id": 1, "b": int(2), "c": int(3)},
+ bson.M{"_id": 3, "b": 5.4, "c": "string"},
+ bson.M{"_id": 5, "b": int(6), "c": int(6)},
+ bson.M{"_id": 8, "b": int(6), "c": int(6)},
+ }
+ // all docs except the one with the duplicate _id should be imported
+ So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil)
+ })
+ Convey("no error should be thrown for CSV import on test data with --drop", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.Drop = true
+ imp.IngestOptions.MaintainInsertionOrder = true
+ imp.IngestOptions.WriteConcern = "majority"
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 3)
+ expectedDocuments := []bson.M{
+ bson.M{"_id": 1, "b": int(2), "c": int(3)},
+ bson.M{"_id": 3, "b": 5.4, "c": "string"},
+ bson.M{"_id": 5, "b": int(6), "c": int(6)},
+ }
+ So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil)
+ })
+ Convey("CSV import on test data with --headerLine should succeed", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.InputOptions.HeaderLine = true
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 2)
+ })
+ Convey("EOF should be thrown for CSV import with --headerLine if file is empty", func() {
+ csvFile, err := ioutil.TempFile("", "mongoimport_")
+ So(err, ShouldBeNil)
+ csvFile.Close()
+
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = csvFile.Name()
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.InputOptions.HeaderLine = true
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldEqual, io.EOF)
+ So(numImported, ShouldEqual, 0)
+ })
+ Convey("CSV import with --upsert and --upsertFields should succeed", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test.csv"
+ fields := "_id,c,b"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.UpsertFields = "_id"
+ imp.IngestOptions.MaintainInsertionOrder = true
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 3)
+ expectedDocuments := []bson.M{
+ bson.M{"_id": 1, "c": int(2), "b": int(3)},
+ bson.M{"_id": 3, "c": 5.4, "b": "string"},
+ bson.M{"_id": 5, "c": int(6), "b": int(6)},
+ }
+ So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil)
+ })
+ Convey("CSV import with --upsert/--upsertFields with duplicate id should succeed "+
+ "if stopOnError is not set", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test_duplicate.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.Upsert = true
+ imp.upsertFields = []string{"_id"}
+ numImported, err := imp.ImportDocuments()
+ So(err, ShouldBeNil)
+ So(numImported, ShouldEqual, 5)
+ expectedDocuments := []bson.M{
+ bson.M{"_id": 1, "b": int(2), "c": int(3)},
+ bson.M{"_id": 3, "b": 5.4, "c": "string"},
+ bson.M{"_id": 5, "b": int(6), "c": int(9)},
+ bson.M{"_id": 8, "b": int(6), "c": int(6)},
+ }
+ So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil)
+ })
+ Convey("an error should be thrown for CSV import on test data with "+
+ "duplicate _id if --stopOnError is set", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test_duplicate.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.StopOnError = true
+ imp.IngestOptions.WriteConcern = "1"
+ imp.IngestOptions.MaintainInsertionOrder = true
+ _, err = imp.ImportDocuments()
+ So(err, ShouldNotBeNil)
+ expectedDocuments := []bson.M{
+ bson.M{"_id": 1, "b": int(2), "c": int(3)},
+ bson.M{"_id": 3, "b": 5.4, "c": "string"},
+ bson.M{"_id": 5, "b": int(6), "c": int(6)},
+ }
+ So(checkOnlyHasDocuments(*imp.SessionProvider, expectedDocuments), ShouldBeNil)
+ })
+ Convey("an error should be thrown for JSON import on test data that "+
+ "is a JSON array without passing --jsonArray", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.File = "testdata/test_array.json"
+ imp.IngestOptions.WriteConcern = "1"
+ _, err = imp.ImportDocuments()
+ So(err, ShouldNotBeNil)
+ })
+ Convey("an error should be thrown if a plain JSON file is supplied", func() {
+ fileHandle, err := os.Open("testdata/test_plain.json")
+ So(err, ShouldBeNil)
+ jsonInputReader := NewJSONInputReader(true, fileHandle, 1)
+ docChan := make(chan bson.D, 1)
+ So(jsonInputReader.StreamDocument(true, docChan), ShouldNotBeNil)
+ })
+ Convey("an error should be thrown for invalid CSV import on test data", func() {
+ imp, err := NewMongoImport()
+ So(err, ShouldBeNil)
+ imp.InputOptions.Type = CSV
+ imp.InputOptions.File = "testdata/test_bad.csv"
+ fields := "_id,b,c"
+ imp.InputOptions.Fields = &fields
+ imp.IngestOptions.StopOnError = true
+ imp.IngestOptions.WriteConcern = "1"
+ imp.IngestOptions.MaintainInsertionOrder = true
+ _, err = imp.ImportDocuments()
+ So(err, ShouldNotBeNil)
+ })
+ })
+}
diff --git a/src/mongo/gotools/mongoimport/options.go b/src/mongo/gotools/mongoimport/options.go
new file mode 100644
index 00000000000..2fe252fb527
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/options.go
@@ -0,0 +1,79 @@
+package mongoimport
+
+var Usage = `<options> <file>
+
+Import CSV, TSV or JSON data into MongoDB. If no file is provided, mongoimport reads from stdin.
+
+See http://docs.mongodb.org/manual/reference/program/mongoimport/ for more information.`
+
+// InputOptions defines the set of options for reading input data.
+type InputOptions struct {
+ // Fields is an option to directly specify the comma-separated field names to use for the import.
+ Fields *string `long:"fields" value-name:"<field>[,<field>]*" short:"f" description:"comma separated list of fields, e.g. -f name,age"`
+
+ // FieldFile is a filename that refers to a list of fields to import, 1 per line.
+ FieldFile *string `long:"fieldFile" value-name:"<filename>" description:"file with field names - 1 per line"`
+
+ // Specifies the location and name of a file containing the data to import.
+ File string `long:"file" value-name:"<filename>" description:"file to import from; if not specified, stdin is used"`
+
+ // Treats the input source's first line as the field list (CSV and TSV only).
+ HeaderLine bool `long:"headerline" description:"use first line in input source as the field list (CSV and TSV only)"`
+
+ // Indicates that the underlying input source contains a single JSON array with the documents to import.
+ JSONArray bool `long:"jsonArray" description:"treat input source as a JSON array"`
+
+ // Indicates how to handle type coercion failures
+ ParseGrace string `long:"parseGrace" value-name:"<grace>" default:"stop" description:"controls behavior when type coercion fails - one of: autoCast, skipField, skipRow, stop (defaults to 'stop')"`
+
+ // Specifies the file type to import. The default format is JSON, but it’s possible to import CSV and TSV files.
+ Type string `long:"type" value-name:"<type>" default:"json" default-mask:"-" description:"input format to import: json, csv, or tsv (defaults to 'json')"`
+
+ // Indicates that field names include type descriptions
+ ColumnsHaveTypes bool `long:"columnsHaveTypes" description:"indicates that the field list (from --fields, --fieldFile, or --headerline) specifies types; they must be in the form of '<colName>.<type>(<arg>)'. The type can be one of: auto, binary, boolean, date, date_go, date_ms, date_oracle, decimal, double, int32, int64, string. For each of the date types, the argument is a datetime layout string. For the binary type, the argument can be one of: base32, base64, hex. All other types take an empty argument. Only valid for CSV and TSV imports. e.g. zipcode.string(), thumbnail.binary(base64)"`
+}
+
+// Name returns a description of the InputOptions struct.
+func (_ *InputOptions) Name() string {
+ return "input"
+}
+
+// IngestOptions defines the set of options for storing data.
+type IngestOptions struct {
+ // Drops target collection before importing.
+ Drop bool `long:"drop" description:"drop collection before inserting documents"`
+
+ // Ignores fields with empty values in CSV and TSV imports.
+ IgnoreBlanks bool `long:"ignoreBlanks" description:"ignore fields with empty values in CSV and TSV"`
+
+ // Indicates that documents will be inserted in the order of their appearance in the input source.
+ MaintainInsertionOrder bool `long:"maintainInsertionOrder" description:"insert documents in the order of their appearance in the input source"`
+
+ // Sets the number of insertion routines to use
+ NumInsertionWorkers int `short:"j" value-name:"<number>" long:"numInsertionWorkers" description:"number of insert operations to run concurrently (defaults to 1)" default:"1" default-mask:"-"`
+
+ // Forces mongoimport to halt the import operation at the first insert or upsert error.
+ StopOnError bool `long:"stopOnError" description:"stop importing at first insert/upsert error"`
+
+ // Modifies the import process to update existing objects in the database if they match --upsertFields.
+ Upsert bool `long:"upsert" description:"insert or update objects that already exist"`
+
+ // Specifies a list of fields for the query portion of the upsert; defaults to _id field.
+ UpsertFields string `long:"upsertFields" value-name:"<field>[,<field>]*" description:"comma-separated fields for the query part of the upsert"`
+
+ // Sets write concern level for write operations.
+ WriteConcern string `long:"writeConcern" default:"majority" value-name:"<write-concern-specifier>" default-mask:"-" description:"write concern options e.g. --writeConcern majority, --writeConcern '{w: 3, wtimeout: 500, fsync: true, j: true}' (defaults to 'majority')"`
+
+ // Indicates that the server should bypass document validation on import.
+ BypassDocumentValidation bool `long:"bypassDocumentValidation" description:"bypass document validation"`
+
+ // Specifies the number of threads to use in processing data read from the input source
+ NumDecodingWorkers int `long:"numDecodingWorkers" default:"0" hidden:"true"`
+
+ BulkBufferSize int `long:"batchSize" default:"1000" hidden:"true"`
+}
+
+// Name returns a description of the IngestOptions struct.
+func (_ *IngestOptions) Name() string {
+ return "ingest"
+}
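
The --columnsHaveTypes flag above expects headers of the form '<colName>.<type>(<arg>)'. A minimal sketch of how such headers map to column specifications, using only the exported ParseTypedHeaders and ValidatePG helpers that this import provides; the header values and the standalone main package are illustrative, not part of the tool:

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/mongodb/mongo-tools/mongoimport"
    )

    func main() {
    	// Each entry names a field, a type, and an optional argument in parentheses.
    	headers := []string{
    		"zipcode.string()",
    		"price.double()",
    		"when.date(January 2, 2006)",
    		"thumbnail.binary(base64)",
    	}
    	pg, err := mongoimport.ValidatePG("autoCast") // same values accepted by --parseGrace
    	if err != nil {
    		log.Fatal(err)
    	}
    	specs, err := mongoimport.ParseTypedHeaders(headers, pg)
    	if err != nil {
    		log.Fatal(err)
    	}
    	for _, s := range specs {
    		fmt.Printf("column %q parsed as type %q\n", s.Name, s.TypeName)
    	}
    }
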
diff --git a/src/mongo/gotools/mongoimport/testdata/test.csv b/src/mongo/gotools/mongoimport/testdata/test.csv
new file mode 100644
index 00000000000..357d40e6da3
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test.csv
@@ -0,0 +1,3 @@
+1,2,3
+3,5.4,string
+5,6,6
diff --git a/src/mongo/gotools/mongoimport/testdata/test.tsv b/src/mongo/gotools/mongoimport/testdata/test.tsv
new file mode 100644
index 00000000000..a6d5298b40a
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test.tsv
@@ -0,0 +1,3 @@
+1 2 3
+3 4.6 5
+5 string 6
diff --git a/src/mongo/gotools/mongoimport/testdata/test_array.json b/src/mongo/gotools/mongoimport/testdata/test_array.json
new file mode 100644
index 00000000000..c4642157433
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_array.json
@@ -0,0 +1,31 @@
+[ {"a": 1.2, "b":"a", "c": 0.4} ,
+
+
+{"a": 2.4, "b":"string", "c": 52.9},
+{"a": 3, "b":"string", "c": 52},
+{"a": 4, "b":"string", "c": 52},
+{"a": 5, "b":"string", "c": 52},
+{"a": 6, "b":"string", "c": 52},
+{"a": 7, "b":"string", "c": 52} ,
+{"a": 8, "b":"string", "c": 52},
+{"a": 9, "b":"string", "c": 52},
+{"a": 10, "b":"string", "c": 52},
+{"a": 11, "b":"string", "c": 52},
+{"a": 12, "b":"string", "c": 52},
+{"a": 13, "b":"string", "c": 52},
+{"a": 14, "b":"string", "c": 52},
+{"a": 15, "b":"string", "c": 52},
+ {"a": 16, "b":"string", "c": 52},
+{"a": 17, "b":"string", "c": 52},
+{"a": 18, "b":"string", "c": 52},
+{"a": 29, "b":"string", "c": 52},
+{"a": 20, "b":"string", "c": 52},
+{"a": 21, "b":"string", "c": 52}
+
+ ,
+{"a": 22, "b":"string", "c": 52},
+{"a": 23, "b":"string", "c": 52},
+{"a": 24, "b":"string", "c": 52},
+{"a": 25, "b":"string", "c": 52},
+{"a": 25, "b":"string", "c": 52},
+{"a": 27, "b":"value", "c": 65}] \ No newline at end of file
diff --git a/src/mongo/gotools/mongoimport/testdata/test_bad.csv b/src/mongo/gotools/mongoimport/testdata/test_bad.csv
new file mode 100644
index 00000000000..c1d6aeeca88
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_bad.csv
@@ -0,0 +1,3 @@
+1,2,3
+3,5".4,string
+5,6,6
diff --git a/src/mongo/gotools/mongoimport/testdata/test_blanks.csv b/src/mongo/gotools/mongoimport/testdata/test_blanks.csv
new file mode 100644
index 00000000000..e94daca6d0d
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_blanks.csv
@@ -0,0 +1,3 @@
+1,2,
+5,,6e
+7,8,6 \ No newline at end of file
diff --git a/src/mongo/gotools/mongoimport/testdata/test_bom.csv b/src/mongo/gotools/mongoimport/testdata/test_bom.csv
new file mode 100644
index 00000000000..eef9b0a80c5
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_bom.csv
@@ -0,0 +1,2 @@
+1,2,3
+4,5,6
diff --git a/src/mongo/gotools/mongoimport/testdata/test_bom.json b/src/mongo/gotools/mongoimport/testdata/test_bom.json
new file mode 100644
index 00000000000..e813e78d234
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_bom.json
@@ -0,0 +1,2 @@
+{"a": 1, "b": 2, "c": 3}
+{"a": 4, "b": 5, "c": 6}
diff --git a/src/mongo/gotools/mongoimport/testdata/test_bom.tsv b/src/mongo/gotools/mongoimport/testdata/test_bom.tsv
new file mode 100644
index 00000000000..4c117a5ca88
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_bom.tsv
@@ -0,0 +1 @@
+1 2 3
diff --git a/src/mongo/gotools/mongoimport/testdata/test_duplicate.csv b/src/mongo/gotools/mongoimport/testdata/test_duplicate.csv
new file mode 100644
index 00000000000..137f668e25a
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_duplicate.csv
@@ -0,0 +1,5 @@
+1,2,3
+3,5.4,string
+5,6,6
+5,6,9
+8,6,6 \ No newline at end of file
diff --git a/src/mongo/gotools/mongoimport/testdata/test_fields_invalid.txt b/src/mongo/gotools/mongoimport/testdata/test_fields_invalid.txt
new file mode 100644
index 00000000000..90505050d51
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_fields_invalid.txt
@@ -0,0 +1,3 @@
+a
+$
+b
diff --git a/src/mongo/gotools/mongoimport/testdata/test_fields_valid.txt b/src/mongo/gotools/mongoimport/testdata/test_fields_valid.txt
new file mode 100644
index 00000000000..de980441c3a
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_fields_valid.txt
@@ -0,0 +1,3 @@
+a
+b
+c
diff --git a/src/mongo/gotools/mongoimport/testdata/test_plain.json b/src/mongo/gotools/mongoimport/testdata/test_plain.json
new file mode 100644
index 00000000000..ce158ad792b
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_plain.json
@@ -0,0 +1,3 @@
+{"a": 4, "b":"string value", "c": 1}
+{"a": 5, "b":"string value", "c": 2}
+{"a": 6, "b":"string value", "c": 3} \ No newline at end of file
diff --git a/src/mongo/gotools/mongoimport/testdata/test_plain2.json b/src/mongo/gotools/mongoimport/testdata/test_plain2.json
new file mode 100644
index 00000000000..84efc925a9d
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_plain2.json
@@ -0,0 +1,10 @@
+{"body": "'''Wei-chi''' may refer to:\n*The [[game of go]]\n*The [[Chinese word for \"crisis\"]]\n\n{{dab}}", "timestamp": {"$date": 1409836258000}, "page_id": 747205, "user": "TheQ Editor", "title": "Wei-chi"}
+{"body": "'''Frameset''' may refer to:\n\n* [[Framing (World Wide Web)]]\n* [[Bicycle frame]]\n\n{{disambig}}", "timestamp": {"$date": 1409836258000}, "page_id": 1450638, "user": "Kkj11210", "title": "Frameset"}
+{"body": "'''Poenit''' may refer to:\n*[[Poenitentiaria]] or Apostolic Penitentiary\n*[[Phut]]\n\n{{disambig}}", "timestamp": {"$date": 1409836258000}, "page_id": 379316, "user": "Omnipaedista", "title": "Poenit"}
+{"body": "In Malawi, '''Tonga''' may be:\n* [[Tonga people (Malawi)]]\n* [[Tonga language (Malawi)]]\n\n{{dab}}", "timestamp": {"$date": 1409836258000}, "page_id": 3750295, "user": "Kwamikagami", "title": "Tonga (Malawi)"}
+{"body": "'''Windows NT 6.0''' can refer to:\n*[[Windows Vista]]\n*[[Windows Server 2008]]\n\n{{disambiguation}}", "timestamp": {"$date": 1409836258000}, "page_id": 3875545, "user": "Codename Lisa", "title": "Windows NT 6.0"}
+{"body": "'''Poyen''' may refer to:\n*[[Poyen, Arkansas]], United States\n* [[Poyen, Kargil]], India\n\n{{geodis}}", "timestamp": {"$date": 1409836258000}, "page_id": 1889856, "user": "PamD", "title": "Poyen"}
+{"body": "'''Body check''' may refer to:\n*[[Checking (ice hockey)]]\n*[[Physical examination]]\n{{Disambiguation}}", "timestamp": {"$date": 1409836258000}, "page_id": 3555067, "user": "Bgheard", "title": "Body check"}
+{"body": "'''Yevtushenko''' may refer to:\n\n* [[Yevgeny Yevtushenko]]\n* [[Vadym Yevtushenko]]\n\n{{disambiguation}}", "timestamp": {"$date": 1409836258000}, "page_id": 4842284, "user": "Kkj11210", "title": "Yevtushenko"}
+{"body": "'''Tuks''' may refer to:\n*[[Tuks Senganga]], South African rapper\n* [[University of Pretoria]]\n\n{{dab}}", "timestamp": {"$date": 1409836258000}, "page_id": 490212, "user": "PamD", "title": "Tuks"}
+{"body": "'''Ethanedithiol''' may refer to:\n\n* [[1,1-Ethanedithiol]]\n* [[1,2-Ethanedithiol]]\n\n{{chemistry index}}", "timestamp": {"$date": 1409836258000}, "page_id": 4514054, "user": "Kkj11210", "title": "Ethanedithiol"} \ No newline at end of file
diff --git a/src/mongo/gotools/mongoimport/testdata/test_type.csv b/src/mongo/gotools/mongoimport/testdata/test_type.csv
new file mode 100644
index 00000000000..444321ee570
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/testdata/test_type.csv
@@ -0,0 +1,4 @@
+zip.string(),number.double()
+12345,20.2
+12345-1234,40.4
+23455,BLAH
diff --git a/src/mongo/gotools/mongoimport/tsv.go b/src/mongo/gotools/mongoimport/tsv.go
new file mode 100644
index 00000000000..09aadc67fdc
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/tsv.go
@@ -0,0 +1,163 @@
+package mongoimport
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "strings"
+
+ "gopkg.in/mgo.v2/bson"
+)
+
+const (
+ entryDelimiter = '\n'
+ tokenSeparator = "\t"
+)
+
+// TSVInputReader is a struct that implements the InputReader interface for a
+// TSV input source.
+type TSVInputReader struct {
+ // colSpecs is a list of column specifications in the BSON documents to be imported
+ colSpecs []ColumnSpec
+
+ // tsvReader is the underlying reader used to read data in from the
+ // TSV file
+ tsvReader *bufio.Reader
+
+ // tsvRejectWriter is where coercion-failed rows are written, if applicable
+ tsvRejectWriter io.Writer
+
+ // tsvRecord stores each line of input we read from the underlying reader
+ tsvRecord string
+
+ // numProcessed tracks the number of TSV records processed by the underlying reader
+ numProcessed uint64
+
+ // numDecoders is the number of concurrent goroutines to use for decoding
+ numDecoders int
+
+ // embedded sizeTracker exposes the Size() method to check the number of bytes read so far
+ sizeTracker
+
+ // ignoreBlanks is whether empty fields should be ignored
+ ignoreBlanks bool
+}
+
+// TSVConverter implements the Converter interface for TSV input.
+type TSVConverter struct {
+ colSpecs []ColumnSpec
+ data string
+ index uint64
+ ignoreBlanks bool
+ rejectWriter io.Writer
+}
+
+// NewTSVInputReader returns a TSVInputReader configured to read input from the
+// given io.Reader, extracting the specified columns only.
+func NewTSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, numDecoders int, ignoreBlanks bool) *TSVInputReader {
+ szCount := newSizeTrackingReader(newBomDiscardingReader(in))
+ return &TSVInputReader{
+ colSpecs: colSpecs,
+ tsvReader: bufio.NewReader(szCount),
+ tsvRejectWriter: rejects,
+ numProcessed: uint64(0),
+ numDecoders: numDecoders,
+ sizeTracker: szCount,
+ ignoreBlanks: ignoreBlanks,
+ }
+}
+
+// ReadAndValidateHeader reads the header from the underlying reader and validates
+// the header fields. It sets err if the read/validation fails.
+func (r *TSVInputReader) ReadAndValidateHeader() (err error) {
+ header, err := r.tsvReader.ReadString(entryDelimiter)
+ if err != nil {
+ return err
+ }
+ for _, field := range strings.Split(header, tokenSeparator) {
+ r.colSpecs = append(r.colSpecs, ColumnSpec{
+ Name: strings.TrimRight(field, "\r\n"),
+ Parser: new(FieldAutoParser),
+ })
+ }
+ return validateReaderFields(ColumnNames(r.colSpecs))
+}
+
+// ReadAndValidateTypedHeader reads the header from the underlying reader and validates
+// the header fields. It sets err if the read/validation fails.
+func (r *TSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) {
+ header, err := r.tsvReader.ReadString(entryDelimiter)
+ if err != nil {
+ return err
+ }
+ var headerFields []string
+ for _, field := range strings.Split(header, tokenSeparator) {
+ headerFields = append(headerFields, strings.TrimRight(field, "\r\n"))
+ }
+ r.colSpecs, err = ParseTypedHeaders(headerFields, parseGrace)
+ if err != nil {
+ return err
+ }
+ return validateReaderFields(ColumnNames(r.colSpecs))
+}
+
+// StreamDocument takes a boolean indicating if the documents should be streamed
+// in read order and a channel on which to stream the documents processed from
+// the underlying reader. Returns a non-nil error if streaming fails.
+func (r *TSVInputReader) StreamDocument(ordered bool, readDocs chan bson.D) (retErr error) {
+ tsvRecordChan := make(chan Converter, r.numDecoders)
+ tsvErrChan := make(chan error)
+
+ // begin reading from source
+ go func() {
+ var err error
+ for {
+ r.tsvRecord, err = r.tsvReader.ReadString(entryDelimiter)
+ if err != nil {
+ close(tsvRecordChan)
+ if err == io.EOF {
+ tsvErrChan <- nil
+ } else {
+ r.numProcessed++
+ tsvErrChan <- fmt.Errorf("read error on entry #%v: %v", r.numProcessed, err)
+ }
+ return
+ }
+ tsvRecordChan <- TSVConverter{
+ colSpecs: r.colSpecs,
+ data: r.tsvRecord,
+ index: r.numProcessed,
+ ignoreBlanks: r.ignoreBlanks,
+ rejectWriter: r.tsvRejectWriter,
+ }
+ r.numProcessed++
+ }
+ }()
+
+ // begin processing read bytes
+ go func() {
+ tsvErrChan <- streamDocuments(ordered, r.numDecoders, tsvRecordChan, readDocs)
+ }()
+
+ return channelQuorumError(tsvErrChan, 2)
+}
+
+// Convert implements the Converter interface for TSV input. It converts a
+// TSVConverter struct to a BSON document.
+func (c TSVConverter) Convert() (b bson.D, err error) {
+ b, err = tokensToBSON(
+ c.colSpecs,
+ strings.Split(strings.TrimRight(c.data, "\r\n"), tokenSeparator),
+ c.index,
+ c.ignoreBlanks,
+ )
+ if _, ok := err.(coercionError); ok {
+ c.Print()
+ err = nil
+ }
+ return
+}
+
+func (c TSVConverter) Print() {
+ c.rejectWriter.Write([]byte(c.data + "\n"))
+}
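
StreamDocument, as documented above, pushes each converted row onto the supplied channel and returns only after reading and decoding finish. Below is a minimal sketch of driving TSVInputReader end to end; the inline TSV data is invented, and the channel is buffered to hold every row, which is the same assumption the unit tests below make:

    package main

    import (
    	"fmt"
    	"log"
    	"os"
    	"strings"

    	"github.com/mongodb/mongo-tools/mongoimport"
    	"gopkg.in/mgo.v2/bson"
    )

    func main() {
    	// A header line plus two data rows, tab-separated.
    	data := "a\tb\tc\n1\t2\t3\n4\t5.5\tsix\n"
    	r := mongoimport.NewTSVInputReader(nil, strings.NewReader(data), os.Stdout, 1, false)

    	// Populate the column specs from the first line, as --headerline would.
    	if err := r.ReadAndValidateHeader(); err != nil {
    		log.Fatal(err)
    	}

    	// Buffer enough room for every row so StreamDocument can finish before we drain.
    	docs := make(chan bson.D, 2)
    	if err := r.StreamDocument(true, docs); err != nil {
    		log.Fatal(err)
    	}
    	for i := 0; i < 2; i++ {
    		fmt.Println(<-docs)
    	}
    }

Because ordered is true and a single decoder is used, the rows come off the channel in input order.
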
diff --git a/src/mongo/gotools/mongoimport/tsv_test.go b/src/mongo/gotools/mongoimport/tsv_test.go
new file mode 100644
index 00000000000..7ea33248f5a
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/tsv_test.go
@@ -0,0 +1,232 @@
+package mongoimport
+
+import (
+ "bytes"
+ "os"
+ "testing"
+
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "gopkg.in/mgo.v2/bson"
+)
+
+func TestTSVStreamDocument(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("With a TSV input reader", t, func() {
+ Convey("integer valued strings should be converted tsv1", func() {
+ contents := "1\t2\t3e\n"
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedRead := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", "3e"},
+ }
+ r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+
+ Convey("valid TSV input file that starts with the UTF-8 BOM should "+
+ "not raise an error", func() {
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedRead := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", int32(3)},
+ }
+ fileHandle, err := os.Open("testdata/test_bom.tsv")
+ So(err, ShouldBeNil)
+ r := NewTSVInputReader(colSpecs, fileHandle, os.Stdout, 1, false)
+ docChan := make(chan bson.D, 2)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+
+ Convey("integer valued strings should be converted tsv2", func() {
+ contents := "a\tb\t\"cccc,cccc\"\td\n"
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedRead := bson.D{
+ {"a", "a"},
+ {"b", "b"},
+ {"c", `"cccc,cccc"`},
+ {"field3", "d"},
+ }
+ r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+
+ Convey("extra columns should be prefixed with 'field'", func() {
+ contents := "1\t2\t3e\t may\n"
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedRead := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", "3e"},
+ {"field3", " may"},
+ }
+ r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+
+ Convey("mixed values should be parsed correctly", func() {
+ contents := "12\t13.3\tInline\t14\n"
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"d", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedRead := bson.D{
+ {"a", int32(12)},
+ {"b", 13.3},
+ {"c", "Inline"},
+ {"d", int32(14)},
+ }
+ r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 1)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedRead)
+ })
+
+ Convey("calling StreamDocument() in succession for TSVs should "+
+ "return the correct next set of values", func() {
+ contents := "1\t2\t3\n4\t5\t6\n"
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedReads := []bson.D{
+ {
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", int32(3)},
+ }, {
+ {"a", int32(4)},
+ {"b", int32(5)},
+ {"c", int32(6)},
+ },
+ }
+ r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, len(expectedReads))
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ for i := 0; i < len(expectedReads); i++ {
+ for j, readDocument := range <-docChan {
+ So(readDocument.Name, ShouldEqual, expectedReads[i][j].Name)
+ So(readDocument.Value, ShouldEqual, expectedReads[i][j].Value)
+ }
+ }
+ })
+
+ Convey("calling StreamDocument() in succession for TSVs that contain "+
+ "quotes should return the correct next set of values", func() {
+ contents := "1\t2\t3\n4\t\"\t6\n"
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedReadOne := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", int32(3)},
+ }
+ expectedReadTwo := bson.D{
+ {"a", int32(4)},
+ {"b", `"`},
+ {"c", int32(6)},
+ }
+ r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ docChan := make(chan bson.D, 2)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedReadOne)
+ So(<-docChan, ShouldResemble, expectedReadTwo)
+ })
+
+ Convey("plain TSV input file sources should be parsed correctly and "+
+ "subsequent imports should parse correctly",
+ func() {
+ colSpecs := []ColumnSpec{
+ {"a", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"b", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"c", new(FieldAutoParser), pgAutoCast, "auto"},
+ }
+ expectedReadOne := bson.D{
+ {"a", int32(1)},
+ {"b", int32(2)},
+ {"c", int32(3)},
+ }
+ expectedReadTwo := bson.D{
+ {"a", int32(3)},
+ {"b", 4.6},
+ {"c", int32(5)},
+ }
+ fileHandle, err := os.Open("testdata/test.tsv")
+ So(err, ShouldBeNil)
+ r := NewTSVInputReader(colSpecs, fileHandle, os.Stdout, 1, false)
+ docChan := make(chan bson.D, 50)
+ So(r.StreamDocument(true, docChan), ShouldBeNil)
+ So(<-docChan, ShouldResemble, expectedReadOne)
+ So(<-docChan, ShouldResemble, expectedReadTwo)
+ })
+ })
+}
+
+func TestTSVReadAndValidateHeader(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("With a TSV input reader", t, func() {
+ Convey("setting the header should read the first line of the TSV", func() {
+ contents := "extraHeader1\textraHeader2\textraHeader3\n"
+ colSpecs := []ColumnSpec{}
+ r := NewTSVInputReader(colSpecs, bytes.NewReader([]byte(contents)), os.Stdout, 1, false)
+ So(r.ReadAndValidateHeader(), ShouldBeNil)
+ So(len(r.colSpecs), ShouldEqual, 3)
+ })
+ })
+}
+
+func TestTSVConvert(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+ Convey("With a TSV input reader", t, func() {
+ Convey("calling convert on a TSVConverter should return the expected BSON document", func() {
+ tsvConverter := TSVConverter{
+ colSpecs: []ColumnSpec{
+ {"field1", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field2", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"field3", new(FieldAutoParser), pgAutoCast, "auto"},
+ },
+ data: "a\tb\tc",
+ index: uint64(0),
+ }
+ expectedDocument := bson.D{
+ {"field1", "a"},
+ {"field2", "b"},
+ {"field3", "c"},
+ }
+ document, err := tsvConverter.Convert()
+ So(err, ShouldBeNil)
+ So(document, ShouldResemble, expectedDocument)
+ })
+ })
+}
diff --git a/src/mongo/gotools/mongoimport/typed_fields.go b/src/mongo/gotools/mongoimport/typed_fields.go
new file mode 100644
index 00000000000..aa1bb1c0e0a
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/typed_fields.go
@@ -0,0 +1,284 @@
+package mongoimport
+
+import (
+ "encoding/base32"
+ "encoding/base64"
+ "encoding/hex"
+ "fmt"
+ "math"
+ "regexp"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/mongodb/mongo-tools/mongoimport/dateconv"
+ "gopkg.in/mgo.v2/bson"
+)
+
+// columnType defines different types for columns that can be parsed distinctly
+type columnType int
+
+const (
+ ctAuto columnType = iota
+ ctBinary
+ ctBoolean
+ ctDate
+ ctDateGo
+ ctDateMS
+ ctDateOracle
+ ctDouble
+ ctInt32
+ ctInt64
+ ctDecimal
+ ctString
+)
+
+var (
+ columnTypeRE = regexp.MustCompile(`(?s)^(.*)\.(\w+)\((.*)\)$`)
+ columnTypeNameMap = map[string]columnType{
+ "auto": ctAuto,
+ "binary": ctBinary,
+ "boolean": ctBoolean,
+ "date": ctDate,
+ "decimal": ctDecimal,
+ "date_go": ctDateGo,
+ "date_ms": ctDateMS,
+ "date_oracle": ctDateOracle,
+ "double": ctDouble,
+ "int32": ctInt32,
+ "int64": ctInt64,
+ "string": ctString,
+ }
+)
+
+type binaryEncoding int
+
+const (
+ beBase64 binaryEncoding = iota
+ beBase32
+ beHex
+)
+
+var binaryEncodingNameMap = map[string]binaryEncoding{
+ "base64": beBase64,
+ "base32": beBase32,
+ "hex": beHex,
+}
+
+// ColumnSpec keeps information for each 'column' of import.
+type ColumnSpec struct {
+ Name string
+ Parser FieldParser
+ ParseGrace ParseGrace
+ TypeName string
+}
+
+// ColumnNames maps a ColumnSpec slice to the associated column names
+func ColumnNames(fs []ColumnSpec) (s []string) {
+ for _, f := range fs {
+ s = append(s, f.Name)
+ }
+ return
+}
+
+// ParseTypedHeader produces a ColumnSpec from a header item, extracting type
+// information from it. The parseGrace is passed along to the new ColumnSpec.
+func ParseTypedHeader(header string, parseGrace ParseGrace) (f ColumnSpec, err error) {
+ match := columnTypeRE.FindStringSubmatch(header)
+ if len(match) != 4 {
+ err = fmt.Errorf("could not parse type from header %s", header)
+ return
+ }
+ t, ok := columnTypeNameMap[match[2]]
+ if !ok {
+ err = fmt.Errorf("invalid type %s in header %s", match[2], header)
+ return
+ }
+ p, err := NewFieldParser(t, match[3])
+ if err != nil {
+ return
+ }
+ return ColumnSpec{match[1], p, parseGrace, match[2]}, nil
+}
+
+// ParseTypedHeaders performs ParseTypedHeader on each item, returning an
+// error if any single one fails.
+func ParseTypedHeaders(headers []string, parseGrace ParseGrace) (fs []ColumnSpec, err error) {
+ fs = make([]ColumnSpec, len(headers))
+ for i, f := range headers {
+ fs[i], err = ParseTypedHeader(f, parseGrace)
+ if err != nil {
+ return
+ }
+ }
+ return
+}
+
+// ParseAutoHeaders converts a list of header items to ColumnSpec objects, with
+// automatic parsers.
+func ParseAutoHeaders(headers []string) (fs []ColumnSpec) {
+ fs = make([]ColumnSpec, len(headers))
+ for i, f := range headers {
+ fs[i] = ColumnSpec{f, new(FieldAutoParser), pgAutoCast, "auto"}
+ }
+ return
+}
+
+// FieldParser is the interface for any parser of a field item.
+type FieldParser interface {
+ Parse(in string) (interface{}, error)
+}
+
+var (
+ escapeReplacements = []string{
+ `\\`, `\`,
+ `\(`, "(",
+ `\)`, ")",
+ `\`, "",
+ }
+ escapeReplacer = strings.NewReplacer(escapeReplacements...)
+)
+
+// NewFieldParser yields a FieldParser corresponding to the given columnType.
+// arg is passed along to the specific type's parser, if it permits an
+// argument. An error will be raised if arg is not valid for the type's
+// parser.
+func NewFieldParser(t columnType, arg string) (parser FieldParser, err error) {
+ arg = escapeReplacer.Replace(arg)
+
+ switch t { // validate argument
+ case ctBinary:
+ case ctDate:
+ case ctDateGo:
+ case ctDateMS:
+ case ctDateOracle:
+ default:
+ if arg != "" {
+ err = fmt.Errorf("type %v does not support arguments", t)
+ return
+ }
+ }
+
+ switch t {
+ case ctBinary:
+ parser, err = NewFieldBinaryParser(arg)
+ case ctBoolean:
+ parser = new(FieldBooleanParser)
+ case ctDate:
+ fallthrough
+ case ctDateGo:
+ parser = &FieldDateParser{arg}
+ case ctDateMS:
+ parser = &FieldDateParser{dateconv.FromMS(arg)}
+ case ctDateOracle:
+ parser = &FieldDateParser{dateconv.FromOracle(arg)}
+ case ctDouble:
+ parser = new(FieldDoubleParser)
+ case ctInt32:
+ parser = new(FieldInt32Parser)
+ case ctInt64:
+ parser = new(FieldInt64Parser)
+ case ctDecimal:
+ parser = new(FieldDecimalParser)
+ case ctString:
+ parser = new(FieldStringParser)
+ default: // ctAuto
+ parser = new(FieldAutoParser)
+ }
+ return
+}
+
+func autoParse(in string) interface{} {
+ parsedInt, err := strconv.ParseInt(in, 10, 64)
+ if err == nil {
+ if math.MinInt32 <= parsedInt && parsedInt <= math.MaxInt32 {
+ return int32(parsedInt)
+ }
+ return parsedInt
+ }
+ parsedFloat, err := strconv.ParseFloat(in, 64)
+ if err == nil {
+ return parsedFloat
+ }
+ return in
+}
+
+type FieldAutoParser struct{}
+
+func (ap *FieldAutoParser) Parse(in string) (interface{}, error) {
+ return autoParse(in), nil
+}
+
+type FieldBinaryParser struct {
+ enc binaryEncoding
+}
+
+func (bp *FieldBinaryParser) Parse(in string) (interface{}, error) {
+ switch bp.enc {
+ case beBase32:
+ return base32.StdEncoding.DecodeString(in)
+ case beBase64:
+ return base64.StdEncoding.DecodeString(in)
+ default: // beHex
+ return hex.DecodeString(in)
+ }
+}
+
+func NewFieldBinaryParser(arg string) (*FieldBinaryParser, error) {
+ enc, ok := binaryEncodingNameMap[arg]
+ if !ok {
+ return nil, fmt.Errorf("invalid binary encoding: %s", arg)
+ }
+ return &FieldBinaryParser{enc}, nil
+}
+
+type FieldBooleanParser struct{}
+
+func (bp *FieldBooleanParser) Parse(in string) (interface{}, error) {
+ if strings.ToLower(in) == "true" || in == "1" {
+ return true, nil
+ }
+ if strings.ToLower(in) == "false" || in == "0" {
+ return false, nil
+ }
+ return nil, fmt.Errorf("failed to parse boolean: %s", in)
+}
+
+type FieldDateParser struct {
+ layout string
+}
+
+func (dp *FieldDateParser) Parse(in string) (interface{}, error) {
+ return time.Parse(dp.layout, in)
+}
+
+type FieldDoubleParser struct{}
+
+func (dp *FieldDoubleParser) Parse(in string) (interface{}, error) {
+ return strconv.ParseFloat(in, 64)
+}
+
+type FieldInt32Parser struct{}
+
+func (ip *FieldInt32Parser) Parse(in string) (interface{}, error) {
+ value, err := strconv.ParseInt(in, 10, 32)
+ return int32(value), err
+}
+
+type FieldInt64Parser struct{}
+
+func (ip *FieldInt64Parser) Parse(in string) (interface{}, error) {
+ return strconv.ParseInt(in, 10, 64)
+}
+
+type FieldDecimalParser struct{}
+
+func (ip *FieldDecimalParser) Parse(in string) (interface{}, error) {
+ return bson.ParseDecimal128(in)
+}
+
+type FieldStringParser struct{}
+
+func (sp *FieldStringParser) Parse(in string) (interface{}, error) {
+ return in, nil
+}
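
Each column type name in columnTypeNameMap selects one of the FieldParser implementations above via NewFieldParser. Since the columnType constants are unexported, outside callers reach the parsers through ParseTypedHeader; the sketch below runs a few invented header/value pairs through that path to show the resulting coercions:

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/mongodb/mongo-tools/mongoimport"
    )

    func main() {
    	pg, err := mongoimport.ValidatePG("autoCast")
    	if err != nil {
    		log.Fatal(err)
    	}
    	// Pairs of typed header and raw cell value; the values are made up for illustration.
    	samples := [][2]string{
    		{"count.int64()", "2147483648"},
    		{"active.boolean()", "TRUE"},
    		{"payload.binary(hex)", "400a11"},
    		{"created.date(2006-01-02)", "2014-05-16"},
    	}
    	for _, s := range samples {
    		spec, err := mongoimport.ParseTypedHeader(s[0], pg)
    		if err != nil {
    			log.Fatal(err)
    		}
    		v, err := spec.Parser.Parse(s[1])
    		if err != nil {
    			log.Fatal(err)
    		}
    		fmt.Printf("%s -> %v (%T)\n", spec.Name, v, v)
    	}
    }
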
diff --git a/src/mongo/gotools/mongoimport/typed_fields_test.go b/src/mongo/gotools/mongoimport/typed_fields_test.go
new file mode 100644
index 00000000000..073710c5762
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/typed_fields_test.go
@@ -0,0 +1,407 @@
+package mongoimport
+
+import (
+ "github.com/mongodb/mongo-tools/common/log"
+ "github.com/mongodb/mongo-tools/common/options"
+ "github.com/mongodb/mongo-tools/common/testutil"
+ . "github.com/smartystreets/goconvey/convey"
+ "gopkg.in/mgo.v2/bson"
+ "testing"
+ "time"
+)
+
+func init() {
+ log.SetVerbosity(&options.Verbosity{
+ VLevel: 4,
+ })
+}
+
+func TestTypedHeaderParser(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Using 'zip.string(),number.double(),foo.auto()'", t, func() {
+ var headers = []string{"zip.string()", "number.double()", "foo.auto()", `bar.date(January 2\, \(2006\))`}
+ var colSpecs []ColumnSpec
+ var err error
+
+ Convey("with parse grace: auto", func() {
+ colSpecs, err = ParseTypedHeaders(headers, pgAutoCast)
+ So(colSpecs, ShouldResemble, []ColumnSpec{
+ {"zip", new(FieldStringParser), pgAutoCast, "string"},
+ {"number", new(FieldDoubleParser), pgAutoCast, "double"},
+ {"foo", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"bar", &FieldDateParser{"January 2, (2006)"}, pgAutoCast, "date"},
+ })
+ So(err, ShouldBeNil)
+ })
+ Convey("with parse grace: skipRow", func() {
+ colSpecs, err = ParseTypedHeaders(headers, pgSkipRow)
+ So(colSpecs, ShouldResemble, []ColumnSpec{
+ {"zip", new(FieldStringParser), pgSkipRow, "string"},
+ {"number", new(FieldDoubleParser), pgSkipRow, "double"},
+ {"foo", new(FieldAutoParser), pgSkipRow, "auto"},
+ {"bar", &FieldDateParser{"January 2, (2006)"}, pgSkipRow, "date"},
+ })
+ So(err, ShouldBeNil)
+ })
+ })
+
+ Convey("Using various bad headers", t, func() {
+ var err error
+
+ Convey("with non-empty arguments for types that don't want them", func() {
+ _, err = ParseTypedHeader("zip.string(blah)", pgAutoCast)
+ So(err, ShouldNotBeNil)
+ _, err = ParseTypedHeader("zip.string(0)", pgAutoCast)
+ So(err, ShouldNotBeNil)
+ _, err = ParseTypedHeader("zip.int32(0)", pgAutoCast)
+ So(err, ShouldNotBeNil)
+ _, err = ParseTypedHeader("zip.int64(0)", pgAutoCast)
+ So(err, ShouldNotBeNil)
+ _, err = ParseTypedHeader("zip.double(0)", pgAutoCast)
+ So(err, ShouldNotBeNil)
+ _, err = ParseTypedHeader("zip.auto(0)", pgAutoCast)
+ So(err, ShouldNotBeNil)
+ })
+ Convey("with bad arguments for the binary type", func() {
+ _, err = ParseTypedHeader("zip.binary(blah)", pgAutoCast)
+ So(err, ShouldNotBeNil)
+ _, err = ParseTypedHeader("zip.binary(binary)", pgAutoCast)
+ So(err, ShouldNotBeNil)
+ _, err = ParseTypedHeader("zip.binary(decimal)", pgAutoCast)
+ So(err, ShouldNotBeNil)
+ })
+ })
+}
+
+func TestAutoHeaderParser(t *testing.T) {
+ Convey("Using 'zip,number'", t, func() {
+ var headers = []string{"zip", "number", "foo"}
+ var colSpecs = ParseAutoHeaders(headers)
+ So(colSpecs, ShouldResemble, []ColumnSpec{
+ {"zip", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"number", new(FieldAutoParser), pgAutoCast, "auto"},
+ {"foo", new(FieldAutoParser), pgAutoCast, "auto"},
+ })
+ })
+}
+
+func TestFieldParsers(t *testing.T) {
+ testutil.VerifyTestType(t, testutil.UnitTestType)
+
+ Convey("Using FieldAutoParser", t, func() {
+ var p, _ = NewFieldParser(ctAuto, "")
+ var value interface{}
+ var err error
+
+ Convey("parses integers when it can", func() {
+ value, err = p.Parse("2147483648")
+ So(value.(int64), ShouldEqual, int64(2147483648))
+ So(err, ShouldBeNil)
+ value, err = p.Parse("42")
+ So(value.(int32), ShouldEqual, 42)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("-2147483649")
+ So(value.(int64), ShouldEqual, int64(-2147483649))
+ })
+ Convey("parses decimals when it can", func() {
+ value, err = p.Parse("3.14159265")
+ So(value.(float64), ShouldEqual, 3.14159265)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("0.123123")
+ So(value.(float64), ShouldEqual, 0.123123)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("-123456.789")
+ So(value.(float64), ShouldEqual, -123456.789)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("-1.")
+ So(value.(float64), ShouldEqual, -1.0)
+ So(err, ShouldBeNil)
+ })
+ Convey("leaves everything else as a string", func() {
+ value, err = p.Parse("12345-6789")
+ So(value.(string), ShouldEqual, "12345-6789")
+ So(err, ShouldBeNil)
+ value, err = p.Parse("06/02/1997")
+ So(value.(string), ShouldEqual, "06/02/1997")
+ So(err, ShouldBeNil)
+ value, err = p.Parse("")
+ So(value.(string), ShouldEqual, "")
+ So(err, ShouldBeNil)
+ })
+ })
+
+ Convey("Using FieldBooleanParser", t, func() {
+ var p, _ = NewFieldParser(ctBoolean, "")
+ var value interface{}
+ var err error
+
+ Convey("parses representations of true correctly", func() {
+ value, err = p.Parse("true")
+ So(value.(bool), ShouldBeTrue)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("TrUe")
+ So(value.(bool), ShouldBeTrue)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("1")
+ So(value.(bool), ShouldBeTrue)
+ So(err, ShouldBeNil)
+ })
+ Convey("parses representations of false correctly", func() {
+ value, err = p.Parse("false")
+ So(value.(bool), ShouldBeFalse)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("FaLsE")
+ So(value.(bool), ShouldBeFalse)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("0")
+ So(value.(bool), ShouldBeFalse)
+ So(err, ShouldBeNil)
+ })
+ Convey("does not parse other boolean representations", func() {
+ _, err = p.Parse("")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("t")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("f")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("yes")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("no")
+ So(err, ShouldNotBeNil)
+ })
+ })
+
+ Convey("Using FieldBinaryParser", t, func() {
+ var value interface{}
+ var err error
+
+ Convey("using hex encoding", func() {
+ var p, _ = NewFieldParser(ctBinary, "hex")
+ Convey("parses valid hex values correctly", func() {
+ value, err = p.Parse("400a11")
+ So(value.([]byte), ShouldResemble, []byte{64, 10, 17})
+ So(err, ShouldBeNil)
+ value, err = p.Parse("400A11")
+ So(value.([]byte), ShouldResemble, []byte{64, 10, 17})
+ So(err, ShouldBeNil)
+ value, err = p.Parse("0b400A11")
+ So(value.([]byte), ShouldResemble, []byte{11, 64, 10, 17})
+ So(err, ShouldBeNil)
+ value, err = p.Parse("")
+ So(value.([]byte), ShouldResemble, []byte{})
+ So(err, ShouldBeNil)
+ })
+ })
+ Convey("using base32 encoding", func() {
+ var p, _ = NewFieldParser(ctBinary, "base32")
+ Convey("parses valid base32 values correctly", func() {
+ value, err = p.Parse("")
+ So(value.([]uint8), ShouldResemble, []uint8{})
+ So(err, ShouldBeNil)
+ value, err = p.Parse("MZXW6YTBOI======")
+ So(value.([]uint8), ShouldResemble, []uint8{102, 111, 111, 98, 97, 114})
+ So(err, ShouldBeNil)
+ })
+ })
+ Convey("using base64 encoding", func() {
+ var p, _ = NewFieldParser(ctBinary, "base64")
+ Convey("parses valid base64 values correctly", func() {
+ value, err = p.Parse("")
+ So(value.([]uint8), ShouldResemble, []uint8{})
+ So(err, ShouldBeNil)
+ value, err = p.Parse("Zm9vYmFy")
+ So(value.([]uint8), ShouldResemble, []uint8{102, 111, 111, 98, 97, 114})
+ So(err, ShouldBeNil)
+ })
+ })
+ })
+
+ Convey("Using FieldDateParser", t, func() {
+ var value interface{}
+ var err error
+
+ Convey("with Go's format", func() {
+ var p, _ = NewFieldParser(ctDateGo, "01/02/2006 3:04:05pm MST")
+ Convey("parses valid timestamps correctly", func() {
+ value, err = p.Parse("01/04/2000 5:38:10pm UTC")
+ So(value.(time.Time), ShouldResemble, time.Date(2000, 1, 4, 17, 38, 10, 0, time.UTC))
+ So(err, ShouldBeNil)
+ })
+ Convey("does not parse invalid dates", func() {
+ _, err = p.Parse("01/04/2000 5:38:10pm")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("01/04/2000 5:38:10 pm UTC")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("01/04/2000")
+ So(err, ShouldNotBeNil)
+ })
+ })
+ Convey("with MS's format", func() {
+ var p, _ = NewFieldParser(ctDateMS, "MM/dd/yyyy h:mm:sstt")
+ Convey("parses valid timestamps correctly", func() {
+ value, err = p.Parse("01/04/2000 5:38:10PM")
+ So(value.(time.Time), ShouldResemble, time.Date(2000, 1, 4, 17, 38, 10, 0, time.UTC))
+ So(err, ShouldBeNil)
+ })
+ Convey("does not parse invalid dates", func() {
+ _, err = p.Parse("01/04/2000 :) 05:38:10PM")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("01/04/2000 005:38:10PM")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("01/04/2000 5:38:10 PM")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("01/04/2000")
+ So(err, ShouldNotBeNil)
+ })
+ })
+ Convey("with Oracle's format", func() {
+ var p, _ = NewFieldParser(ctDateOracle, "mm/Dd/yYYy hh:MI:SsAm")
+ Convey("parses valid timestamps correctly", func() {
+ value, err = p.Parse("01/04/2000 05:38:10PM")
+ So(value.(time.Time), ShouldResemble, time.Date(2000, 1, 4, 17, 38, 10, 0, time.UTC))
+ So(err, ShouldBeNil)
+ })
+ Convey("does not parse invalid dates", func() {
+ _, err = p.Parse("01/04/2000 :) 05:38:10PM")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("01/04/2000 005:38:10PM")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("01/04/2000 5:38:10 PM")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("01/04/2000")
+ So(err, ShouldNotBeNil)
+ })
+ })
+ })
+
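+ // Double fields: standard decimal notation is accepted, including negatives
+ // and a trailing decimal point ("-1."); empty or malformed input must error.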
+ Convey("Using FieldDoubleParser", t, func() {
+ var p, _ = NewFieldParser(ctDouble, "")
+ var value interface{}
+ var err error
+
+ Convey("parses valid decimal values correctly", func() {
+ value, err = p.Parse("3.14159265")
+ So(value.(float64), ShouldEqual, 3.14159265)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("0.123123")
+ So(value.(float64), ShouldEqual, 0.123123)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("-123456.789")
+ So(value.(float64), ShouldEqual, -123456.789)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("-1.")
+ So(value.(float64), ShouldEqual, -1.0)
+ So(err, ShouldBeNil)
+ })
+ Convey("does not parse invalid numbers", func() {
+ _, err = p.Parse("")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("1.1.1")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("1-2.0")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("80-")
+ So(err, ShouldNotBeNil)
+ })
+ })
+
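+ // Int32 fields: values must be whole numbers within the signed 32-bit range;
+ // fractional, malformed, and out-of-range inputs are rejected below.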
+ Convey("Using FieldInt32Parser", t, func() {
+ var p, _ = NewFieldParser(ctInt32, "")
+ var value interface{}
+ var err error
+
+ Convey("parses valid integer values correctly", func() {
+ value, err = p.Parse("2147483647")
+ So(value.(int32), ShouldEqual, 2147483647)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("42")
+ So(value.(int32), ShouldEqual, 42)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("-2147483648")
+ So(value.(int32), ShouldEqual, -2147483648)
+ So(err, ShouldBeNil)
+ })
+ Convey("does not parse invalid numbers", func() {
+ _, err = p.Parse("")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("42.0")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("1-2")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("80-")
+ So(err, ShouldNotBeNil)
+ value, err = p.Parse("2147483648")
+ So(err, ShouldNotBeNil)
+ value, err = p.Parse("-2147483649")
+ So(err, ShouldNotBeNil)
+ })
+ })
+
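+ // Int64 fields: values outside the int32 range parse as int64; empty,
+ // fractional, and malformed inputs must error.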
+ Convey("Using FieldInt64Parser", t, func() {
+ var p, _ = NewFieldParser(ctInt64, "")
+ var value interface{}
+ var err error
+
+ Convey("parses valid integer values correctly", func() {
+ value, err = p.Parse("2147483648")
+ So(value.(int64), ShouldEqual, int64(2147483648))
+ So(err, ShouldBeNil)
+ value, err = p.Parse("42")
+ So(value.(int64), ShouldEqual, 42)
+ So(err, ShouldBeNil)
+ value, err = p.Parse("-2147483649")
+ So(value.(int64), ShouldEqual, int64(-2147483649))
+ So(err, ShouldBeNil)
+ })
+ Convey("does not parse invalid numbers", func() {
+ _, err = p.Parse("")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("42.0")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("1-2")
+ So(err, ShouldNotBeNil)
+ _, err = p.Parse("80-")
+ So(err, ShouldNotBeNil)
+ })
+ })
+
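+ // Decimal fields: parsed values are compared against bson.ParseDecimal128 on
+ // the same input, and strings that Decimal128 cannot parse must error.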
+ Convey("Using FieldDecimalParser", t, func() {
+ var p, _ = NewFieldParser(ctDecimal, "")
+ var err error
+
+ Convey("parses valid decimal values correctly", func() {
+ for _, ts := range []string{"12235.2355", "42", "0", "-124", "-124.55"} {
+ testVal, err := bson.ParseDecimal128(ts)
+ So(err, ShouldBeNil)
+ parsedValue, err := p.Parse(ts)
+ So(err, ShouldBeNil)
+
+ So(testVal, ShouldResemble, parsedValue.(bson.Decimal128))
+ }
+ })
+ Convey("does not parse invalid decimal values", func() {
+ for _, ts := range []string{"", "1-2", "abcd"} {
+ _, err = p.Parse(ts)
+ So(err, ShouldNotBeNil)
+ }
+ })
+ })
+
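+ // String fields: the input is returned unchanged; numeric-looking,
+ // boolean-looking, and empty strings all stay strings.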
+ Convey("Using FieldStringParser", t, func() {
+ var p, _ = NewFieldParser(ctString, "")
+ var value interface{}
+ var err error
+
+ Convey("parses strings as strings only", func() {
+ value, err = p.Parse("42")
+ So(value.(string), ShouldEqual, "42")
+ So(err, ShouldBeNil)
+ value, err = p.Parse("true")
+ So(value.(string), ShouldEqual, "true")
+ So(err, ShouldBeNil)
+ value, err = p.Parse("")
+ So(value.(string), ShouldEqual, "")
+ So(err, ShouldBeNil)
+ })
+ })
+
+}