summaryrefslogtreecommitdiff
path: root/src/mongo/gotools/src/github.com/mongodb/mongo-tools/vendor/github.com/mongodb/mongo-tools-common/db/bson_stream.go
blob: e1579da6028c4ff67d4eabc0fab05ebba0497fab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package db

import (
	"encoding/binary"
	"fmt"
	"io"

	"go.mongodb.org/mongo-driver/bson"
)

// BSONSource reads a stream of concatenated BSON documents from the
// underlying io.ReadCloser, Stream, returning one document per call to
// LoadNext.
type BSONSource struct {
	// reusableBuf is the I/O buffer shared by successive LoadNext calls.
	// When nil (as set up by NewBufferlessBSONSource), LoadNext allocates a
	// fresh slice for every document instead.
	reusableBuf []byte
	// Stream is the source of raw BSON bytes; Close closes it.
	Stream      io.ReadCloser
	// err holds the error, if any, recorded by the most recent LoadNext call.
	err         error
	// MaxBSONSize is the largest declared document size, in bytes, that
	// LoadNext accepts; larger sizes are reported as errors.
	MaxBSONSize int32
}

// DecodedBSONSource wraps a RawDocSource and unmarshals each raw document it
// yields into a caller-supplied value (see Next). LoadNext and Close are
// promoted from the embedded RawDocSource, while Err is overridden so that
// decode errors are reported as well.
type DecodedBSONSource struct {
	RawDocSource
	// err holds the bson.Unmarshal error recorded by Next; Err reports it in
	// preference to the embedded source's error.
	err error
}

// RawDocSource wraps basic functions for reading a BSON source file.
//
// LoadNext returns the next raw BSON document, or nil when no document is
// available. Callers such as DecodedBSONSource stop on nil and consult Err to
// tell a clean end of input (BSONSource reports nil there) from a failure.
// Close releases the underlying resource.
type RawDocSource interface {
	LoadNext() []byte
	Close() error
	Err() error
}

// NewBSONSource returns a BSONSource that reads from in and reuses one
// MaxBSONSize-byte I/O buffer across calls to LoadNext, so each returned
// document is a slice of that shared buffer.
func NewBSONSource(in io.ReadCloser) *BSONSource {
	buf := make([]byte, MaxBSONSize)
	return &BSONSource{
		reusableBuf: buf,
		Stream:      in,
		MaxBSONSize: MaxBSONSize,
	}
}

// NewBufferlessBSONSource returns a BSONSource that reads from in without a
// shared I/O buffer: LoadNext allocates a new slice for every document.
func NewBufferlessBSONSource(in io.ReadCloser) *BSONSource {
	src := &BSONSource{Stream: in}
	src.MaxBSONSize = MaxBSONSize
	return src
}

// Close closes the underlying Stream, after which the BSONSource can no
// longer be read from. It returns whatever error Stream.Close reports.
func (bs *BSONSource) Close() error {
	stream := bs.Stream
	return stream.Close()
}

// NewDecodedBSONSource returns a DecodedBSONSource that pulls raw documents
// from ds and unmarshals them on each call to Next.
func NewDecodedBSONSource(ds RawDocSource) *DecodedBSONSource {
	// Keyed fields keep this constructor correct if the struct gains fields.
	return &DecodedBSONSource{RawDocSource: ds}
}

// Err reports the error relevant to the DecodedBSONSource. A decode error
// recorded by Next takes precedence; otherwise the wrapped RawDocSource's
// error is returned.
func (dbs *DecodedBSONSource) Err() error {
	decodeErr := dbs.err
	if decodeErr == nil {
		return dbs.RawDocSource.Err()
	}
	return decodeErr
}

// Next unmarshals the next BSON document from the wrapped RawDocSource into
// result using the official Go driver. It returns true if a document was
// read and decoded successfully, and false otherwise. False is returned both
// at the end of the input and on error, so callers must check Err to tell the
// two apart. This function does NOT zero out result before writing to it.
func (dbs *DecodedBSONSource) Next(result interface{}) bool {
	doc := dbs.LoadNext()
	if doc == nil {
		// Drop any decode error left over from an earlier call so that Err
		// reflects this call: the raw source's error, or nil at a clean end.
		dbs.err = nil
		return false
	}
	if err := bson.Unmarshal(doc, result); err != nil {
		dbs.err = err
		return false
	}
	dbs.err = nil
	return true
}

// LoadNext reads and returns the next BSON document in the stream. If the
// BSONSource was created with NewBSONSource then each returned []byte will be
// a slice of a single reused I/O buffer, so it is only valid until the next
// call. If the BSONSource was created with NewBufferlessBSONSource then each
// returned []byte will be individually allocated.
//
// LoadNext returns nil at the end of the stream or on failure. Err returns nil
// in the former case and the failure in the latter. A document is reported as
// an error if its declared size exceeds MaxBSONSize or is below the 5-byte
// minimum, or if the stream ends before the document is complete.
func (bs *BSONSource) LoadNext() []byte {
	var into []byte
	if bs.reusableBuf == nil {
		into = make([]byte, 4)
	} else {
		into = bs.reusableBuf
	}

	// Read the document's total size: a little-endian int32 that includes
	// its own 4 bytes. io.ReadFull reports io.EOF only when no bytes at all
	// were read, which is the clean end of the stream.
	if _, err := io.ReadFull(bs.Stream, into[:4]); err != nil {
		if err == io.EOF {
			bs.err = nil
		} else {
			bs.err = err
		}
		return nil
	}
	bsonSize := int32(binary.LittleEndian.Uint32(into[:4]))

	// Reject sizes that cannot be valid: larger than the configured maximum
	// (which also bounds the allocation below) or smaller than the minimal
	// document (4-byte length plus 1-byte terminator).
	if bsonSize > bs.MaxBSONSize || bsonSize < 5 {
		bs.err = fmt.Errorf("invalid BSONSize: %v bytes", bsonSize)
		return nil
	}

	// Grow the buffer if the document does not fit, keeping the size prefix
	// already read. In buffered mode the larger buffer replaces the reusable
	// one so later documents can use it too.
	if int(bsonSize) > cap(into) {
		bigInto := make([]byte, bsonSize)
		copy(bigInto, into[:4])
		into = bigInto
		if bs.reusableBuf != nil {
			bs.reusableBuf = bigInto
		}
	}
	into = into[:int(bsonSize)]

	// Read the remainder of the document after the size prefix.
	if _, err := io.ReadFull(bs.Stream, into[4:]); err != nil {
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			// The stream ended inside a document (with zero or some of its
			// body bytes read), so the document is truncated.
			bs.err = fmt.Errorf("invalid bson: %v", err)
		} else {
			bs.err = err
		}
		return nil
	}

	bs.err = nil
	return into
}

// Err returns the error, if any, from the most recent call to LoadNext. It is
// nil after a successful read and at a clean end of the stream.
func (bs *BSONSource) Err() error {
	return bs.err
}

// SetMaxBSONSize sets the largest declared document size, in bytes, that
// LoadNext will accept. It does not resize an existing reusable buffer;
// LoadNext grows the buffer on demand when a larger document arrives.
func (bs *BSONSource) SetMaxBSONSize(size int32) {
	bs.MaxBSONSize = size
}