summaryrefslogtreecommitdiff
path: root/src/mongo/gotools/mongoimport/json.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/gotools/mongoimport/json.go')
-rw-r--r--src/mongo/gotools/mongoimport/json.go239
1 files changed, 239 insertions, 0 deletions
diff --git a/src/mongo/gotools/mongoimport/json.go b/src/mongo/gotools/mongoimport/json.go
new file mode 100644
index 00000000000..caa0dce3121
--- /dev/null
+++ b/src/mongo/gotools/mongoimport/json.go
@@ -0,0 +1,239 @@
+package mongoimport
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/mongodb/mongo-tools/common/bsonutil"
+ "github.com/mongodb/mongo-tools/common/json"
+ "github.com/mongodb/mongo-tools/common/log"
+ "gopkg.in/mgo.v2/bson"
+)
+
+// JSONInputReader is an implementation of InputReader that reads documents
+// in JSON format.
+type JSONInputReader struct {
+ // isArray indicates if the JSON import is an array of JSON documents
+ // or not
+ isArray bool
+
+ // decoder is used to read the next valid JSON documents from the input source
+ decoder *json.Decoder
+
+ // numProcessed indicates the number of JSON documents processed
+ numProcessed uint64
+
+ // readOpeningBracket indicates if the underlying io.Reader has consumed
+ // an opening bracket from the input source. Used to prevent errors when
+ // a JSON input source contains just '[]'
+ readOpeningBracket bool
+
+ // expectedByte is used to store the next expected valid character for JSON
+ // array imports
+ expectedByte byte
+
+ // bytesFromReader is used to store the next byte read from the Reader for
+ // JSON array imports
+ bytesFromReader []byte
+
+ // separatorReader is used for JSON arrays to look for a valid array
+ // separator. It is a reader consisting of the decoder's buffer and the
+ // underlying reader
+ separatorReader io.Reader
+
+ // embedded sizeTracker exposes the Size() method to check the number of bytes read so far
+ sizeTracker
+
+ // numDecoders is the number of concurrent goroutines to use for decoding
+ numDecoders int
+}
+
+// JSONConverter implements the Converter interface for JSON input.
+type JSONConverter struct {
+ data []byte
+ index uint64
+}
+
+var (
+ // ErrNoOpeningBracket means that the input source did not contain any
+ // opening brace - returned only if --jsonArray is passed in.
+ ErrNoOpeningBracket = errors.New("bad JSON array format - found no " +
+ "opening bracket '[' in input source")
+
+ // ErrNoClosingBracket means that the input source did not contain any
+ // closing brace - returned only if --jsonArray is passed in.
+ ErrNoClosingBracket = errors.New("bad JSON array format - found no " +
+ "closing bracket ']' in input source")
+)
+
+// NewJSONInputReader creates a new JSONInputReader in array mode if specified,
+// configured to read data to the given io.Reader.
+func NewJSONInputReader(isArray bool, in io.Reader, numDecoders int) *JSONInputReader {
+ szCount := newSizeTrackingReader(newBomDiscardingReader(in))
+ return &JSONInputReader{
+ isArray: isArray,
+ sizeTracker: szCount,
+ decoder: json.NewDecoder(szCount),
+ readOpeningBracket: false,
+ bytesFromReader: make([]byte, 1),
+ numDecoders: numDecoders,
+ }
+}
+
+// ReadAndValidateHeader is a no-op for JSON imports; always returns nil.
+func (r *JSONInputReader) ReadAndValidateHeader() error {
+ return nil
+}
+
+// ReadAndValidateTypedHeader is a no-op for JSON imports; always returns nil.
+func (r *JSONInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) error {
+ return nil
+}
+
+// StreamDocument takes a boolean indicating if the documents should be streamed
+// in read order and a channel on which to stream the documents processed from
+// the underlying reader. Returns a non-nil error if encountered
+func (r *JSONInputReader) StreamDocument(ordered bool, readChan chan bson.D) (retErr error) {
+ rawChan := make(chan Converter, r.numDecoders)
+ jsonErrChan := make(chan error)
+
+ // begin reading from source
+ go func() {
+ var err error
+ for {
+ if r.isArray {
+ if err = r.readJSONArraySeparator(); err != nil {
+ close(rawChan)
+ if err == io.EOF {
+ jsonErrChan <- nil
+ } else {
+ r.numProcessed++
+ jsonErrChan <- fmt.Errorf("error reading separator after document #%v: %v", r.numProcessed, err)
+ }
+ return
+ }
+ }
+ rawBytes, err := r.decoder.ScanObject()
+ if err != nil {
+ close(rawChan)
+ if err == io.EOF {
+ jsonErrChan <- nil
+ } else {
+ r.numProcessed++
+ jsonErrChan <- fmt.Errorf("error processing document #%v: %v", r.numProcessed, err)
+ }
+ return
+ }
+ rawChan <- JSONConverter{
+ data: rawBytes,
+ index: r.numProcessed,
+ }
+ r.numProcessed++
+ }
+ }()
+
+ // begin processing read bytes
+ go func() {
+ jsonErrChan <- streamDocuments(ordered, r.numDecoders, rawChan, readChan)
+ }()
+
+ return channelQuorumError(jsonErrChan, 2)
+}
+
+// Convert implements the Converter interface for JSON input. It converts a
+// JSONConverter struct to a BSON document.
+func (c JSONConverter) Convert() (bson.D, error) {
+ document, err := json.UnmarshalBsonD(c.data)
+ if err != nil {
+ return nil, fmt.Errorf("error unmarshaling bytes on document #%v: %v", c.index, err)
+ }
+ log.Logvf(log.DebugHigh, "got line: %v", document)
+
+ bsonD, err := bsonutil.GetExtendedBsonD(document)
+ if err != nil {
+ return nil, fmt.Errorf("error getting extended BSON for document #%v: %v", c.index, err)
+ }
+ log.Logvf(log.DebugHigh, "got extended line: %#v", bsonD)
+ return bsonD, nil
+}
+
+// readJSONArraySeparator is a helper method used to process JSON arrays. It is
+// used to read any of the valid separators for a JSON array and flag invalid
+// characters.
+//
+// It will read a byte at a time until it finds an expected character after
+// which it returns control to the caller.
+//
+// It will also return immediately if it finds any error (including EOF). If it
+// reads a JSON_ARRAY_END byte, as a validity check it will continue to scan the
+// input source until it hits an error (including EOF) to ensure the entire
+// input source content is a valid JSON array
+func (r *JSONInputReader) readJSONArraySeparator() error {
+ r.expectedByte = json.ArraySep
+ if r.numProcessed == 0 {
+ r.expectedByte = json.ArrayStart
+ }
+
+ var readByte byte
+ scanp := 0
+
+ separatorReader := io.MultiReader(
+ r.decoder.Buffered(),
+ r.decoder.R,
+ )
+ for readByte != r.expectedByte {
+ n, err := separatorReader.Read(r.bytesFromReader)
+ scanp += n
+ if n == 0 || err != nil {
+ if err == io.EOF {
+ return ErrNoClosingBracket
+ }
+ return err
+ }
+ readByte = r.bytesFromReader[0]
+
+ if readByte == json.ArrayEnd {
+ // if we read the end of the JSON array, ensure we have no other
+ // non-whitespace characters at the end of the array
+ for {
+ _, err = separatorReader.Read(r.bytesFromReader)
+ if err != nil {
+ // takes care of the '[]' case
+ if !r.readOpeningBracket {
+ return ErrNoOpeningBracket
+ }
+ return err
+ }
+ readString := string(r.bytesFromReader[0])
+ if strings.TrimSpace(readString) != "" {
+ return fmt.Errorf("bad JSON array format - found '%v' "+
+ "after '%v' in input source", readString,
+ string(json.ArrayEnd))
+ }
+ }
+ }
+
+ // this will catch any invalid inter JSON object byte that occurs in the
+ // input source
+ if !(readByte == json.ArraySep ||
+ strings.TrimSpace(string(readByte)) == "" ||
+ readByte == json.ArrayStart ||
+ readByte == json.ArrayEnd) {
+ if r.expectedByte == json.ArrayStart {
+ return ErrNoOpeningBracket
+ }
+ return fmt.Errorf("bad JSON array format - found '%v' outside "+
+ "JSON object/array in input source", string(readByte))
+ }
+ }
+ // adjust the buffer to account for read bytes
+ if scanp < len(r.decoder.Buf) {
+ r.decoder.Buf = r.decoder.Buf[scanp:]
+ } else {
+ r.decoder.Buf = []byte{}
+ }
+ r.readOpeningBracket = true
+ return nil
+}