1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package db
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// BufferedBulkInserter implements a bufio.Writer-like design for queuing up
// write models and sending them in one bulk write once the configured doc
// limit is reached. Must be flushed at the end to ensure that all buffered
// documents are written.
//
// NOTE(review): earlier wording also mentioned a max message size. No field
// of this type tracks byte size, so only the buffered model count triggers a
// flush here — confirm whether size limits are enforced elsewhere.
type BufferedBulkInserter struct {
// collection is the collection every bulk write is issued against.
collection *mongo.Collection
// writeModels holds the models queued since the last flush.
writeModels []mongo.WriteModel
// docLimit is the number of queued models at which a flush is triggered.
docLimit int
// docCount is the number of models queued since the last flush.
docCount int
// bulkWriteOpts is passed to every bulk write (ordered, bypass validation).
bulkWriteOpts *options.BulkWriteOptions
// upsert is copied onto update and replace models when they are created.
upsert bool
}
// newBufferedBulkInserter builds a BufferedBulkInserter that writes to
// collection, flushes once docLimit models are buffered, and issues its bulk
// writes as ordered or unordered according to the ordered flag.
//
// The buffer is pre-sized using docLimit as a capacity hint. A negative
// docLimit is clamped to a zero capacity because passing a negative capacity
// to make panics. The docLimit field itself is stored unchanged, so a
// non-positive limit makes every added model trigger an immediate flush.
func newBufferedBulkInserter(collection *mongo.Collection, docLimit int, ordered bool) *BufferedBulkInserter {
	capacity := docLimit
	if capacity < 0 {
		capacity = 0
	}
	return &BufferedBulkInserter{
		collection:    collection,
		bulkWriteOpts: options.BulkWrite().SetOrdered(ordered),
		docLimit:      docLimit,
		writeModels:   make([]mongo.WriteModel, 0, capacity),
	}
}
// NewOrderedBufferedBulkInserter returns an initialized BufferedBulkInserter
// whose bulk writes are issued as ordered operations. docLimit is the number
// of buffered models that triggers an automatic flush.
func NewOrderedBufferedBulkInserter(collection *mongo.Collection, docLimit int) *BufferedBulkInserter {
	const ordered = true
	return newBufferedBulkInserter(collection, docLimit, ordered)
}
// NewUnorderedBufferedBulkInserter returns an initialized BufferedBulkInserter
// whose bulk writes are issued as unordered operations. docLimit is the number
// of buffered models that triggers an automatic flush.
func NewUnorderedBufferedBulkInserter(collection *mongo.Collection, docLimit int) *BufferedBulkInserter {
	return newBufferedBulkInserter(collection, docLimit, false)
}
// SetOrdered sets the ordered flag on the options passed to every subsequent
// bulk write. It returns the receiver so calls can be chained.
func (bb *BufferedBulkInserter) SetOrdered(ordered bool) *BufferedBulkInserter {
	opts := bb.bulkWriteOpts
	opts.SetOrdered(ordered)
	return bb
}
// SetBypassDocumentValidation sets the bypass-document-validation flag on the
// options passed to every subsequent bulk write. It returns the receiver so
// calls can be chained.
func (bb *BufferedBulkInserter) SetBypassDocumentValidation(bypass bool) *BufferedBulkInserter {
	opts := bb.bulkWriteOpts
	opts.SetBypassDocumentValidation(bypass)
	return bb
}
// SetUpsert records the upsert flag that Update and Replace copy onto the
// models they create. Models already buffered keep the value they were built
// with. It returns the receiver so calls can be chained.
func (bb *BufferedBulkInserter) SetUpsert(upsert bool) *BufferedBulkInserter {
	bb.upsert = upsert
	return bb
}
// resetBulk empties the buffer: the document counter goes back to zero and the
// model slice is truncated to length zero, keeping its backing array for the
// next batch.
func (bb *BufferedBulkInserter) resetBulk() {
	bb.docCount = 0
	bb.writeModels = bb.writeModels[:0]
}
// Insert marshals doc to BSON and adds it to the buffer for bulk insertion. If
// the buffer becomes full, the bulk write is performed, returning any error
// that occurs.
//
// The result is nil unless a flush was triggered. A marshalling failure is
// returned as an error that wraps the underlying cause, so callers can inspect
// it with errors.Is or errors.As; nothing is buffered in that case.
func (bb *BufferedBulkInserter) Insert(doc interface{}) (*mongo.BulkWriteResult, error) {
	rawBytes, err := bson.Marshal(doc)
	if err != nil {
		return nil, fmt.Errorf("bson encoding error: %w", err)
	}
	return bb.InsertRaw(rawBytes)
}
// Update queues an update-one model built from selector (the filter) and
// update, using the inserter's current upsert setting. If the buffer becomes
// full, the bulk write is performed, returning any error that occurs. The
// result is nil unless a flush was triggered.
func (bb *BufferedBulkInserter) Update(selector, update bson.D) (*mongo.BulkWriteResult, error) {
	model := mongo.NewUpdateOneModel().
		SetFilter(selector).
		SetUpdate(update).
		SetUpsert(bb.upsert)
	return bb.addModel(model)
}
// Replace queues a replace-one model built from selector (the filter) and
// replacement, using the inserter's current upsert setting. If the buffer
// becomes full, the bulk write is performed, returning any error that occurs.
// The result is nil unless a flush was triggered.
func (bb *BufferedBulkInserter) Replace(selector, replacement bson.D) (*mongo.BulkWriteResult, error) {
	model := mongo.NewReplaceOneModel().
		SetFilter(selector).
		SetReplacement(replacement).
		SetUpsert(bb.upsert)
	return bb.addModel(model)
}
// InsertRaw queues a document, given as raw BSON bytes, for bulk insertion. If
// the buffer becomes full, the bulk write is performed, returning any error
// that occurs. The result is nil unless a flush was triggered.
func (bb *BufferedBulkInserter) InsertRaw(rawBytes []byte) (*mongo.BulkWriteResult, error) {
	model := mongo.NewInsertOneModel().SetDocument(rawBytes)
	return bb.addModel(model)
}
// addModel appends model to the buffer and bumps the document counter. Once the
// counter reaches docLimit the buffer is flushed and the flush's result and
// error are returned; otherwise it returns (nil, nil).
func (bb *BufferedBulkInserter) addModel(model mongo.WriteModel) (*mongo.BulkWriteResult, error) {
	bb.writeModels = append(bb.writeModels, model)
	bb.docCount++

	// Still room in the buffer: nothing to send yet.
	if bb.docCount < bb.docLimit {
		return nil, nil
	}
	return bb.Flush()
}
// Flush sends every buffered model in a single bulk write and then resets the
// buffer. It returns (nil, nil) without contacting the collection when nothing
// is buffered. The buffer is reset whether or not the write succeeds. The
// write uses context.Background(), so it carries no deadline or cancellation.
func (bb *BufferedBulkInserter) Flush() (*mongo.BulkWriteResult, error) {
	if bb.docCount == 0 {
		return nil, nil
	}

	// Deferred so the buffer is cleared only after BulkWrite has consumed it.
	defer bb.resetBulk()

	ctx := context.Background()
	return bb.collection.BulkWrite(ctx, bb.writeModels, bb.bulkWriteOpts)
}
|