1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
|
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include "mongo/db/catalog/collection.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/platform/mutex.h"
namespace mongo {
class AutoGetCollection;
class ThreadPool;
/**
* Provides an interface for asynchronously adding to a collection.
*
* Allows writes to a collection in a context without appropriate locks by buffering them in memory
* and asynchronously writing them to the backing collection. Useful when an operation with e.g. a
* global MODE_S lock needs to write, but doesn't care that the write shows up immediately.
* Motivated by the local health log. For obvious reasons, cannot provide strong durability
* guarantees, and cannot report whether the insert succeeded--in other words, this class provides
* eventual "best effort" inserts.
*
* Because this class is motivated by the health log and errors cannot be cleanly reported to the
* caller, it cannot report most errors to the client; it instead periodically logs any errors to
* the system log.
*
* Instances of this class are unconditionally thread-safe, and cannot cause deadlock barring
* improper use of the ctor and of the `shutdown` method (which also flushes the buffer) below.
*/
class DeferredWriter {
    // Non-copyable: copy construction and copy assignment are deleted.
    DeferredWriter(const DeferredWriter&) = delete;
    DeferredWriter& operator=(const DeferredWriter&) = delete;

public:
    /**
     * Create a new DeferredWriter for writing to a given collection.
     *
     * Will not begin writing to the backing collection until `startup` is called.
     *
     * @param nss The namespace of the backing collection.
     * @param opts The options to use when creating the backing collection if it doesn't exist.
     * @param maxSize The maximum number of bytes to store in the buffer.
     */
    DeferredWriter(NamespaceString nss, CollectionOptions opts, int64_t maxSize);

    /**
     * Start the background worker thread writing to the given collection.
     *
     * @param workerName The name of the client associated with the worker thread.
     */
    void startup(std::string workerName);

    /**
     * Flush the buffer and `join` the worker thread.
     *
     * IMPORTANT: Must be called before destruction if `startup` has been called.
     *
     * Blocks until buffered writes complete.  Must not be called repeatedly.
     */
    void shutdown();

    /**
     * Cleans up the writer.
     *
     * Does not clean up the worker thread; call `shutdown` for that.  Instead, if the worker
     * thread is still running calls std::terminate, which crashes the server.
     */
    ~DeferredWriter();

    /**
     * Deferred-insert the given object.
     *
     * Returns whether the object was successfully pushed onto the in-memory buffer (*not* whether
     * it was successfully added to the underlying collection).  Creates the backing collection if
     * it doesn't exist.
     *
     * @param obj The document to buffer for insertion.
     */
    bool insertDocument(BSONObj obj);

    /**
     * Get the number of writes dropped due to a full buffer since the last time the drop count
     * was logged.
     */
    int64_t getDroppedEntries();

private:
    /**
     * Log failure, but only if a certain interval has passed since the last log.
     *
     * @param status The failure to report.
     */
    void _logFailure(const Status& status);

    /**
     * Log number of entries dropped because of a full buffer.  Rate limited and
     * each successful log resets the counter.
     */
    void _logDroppedEntry();

    /**
     * Create the backing collection if it doesn't exist.
     *
     * Returns a Status describing whether creation succeeded.
     */
    Status _makeCollection(OperationContext* opCtx);

    /**
     * Ensure that the backing collection exists, and pass back a lock and handle to it.
     */
    StatusWith<std::unique_ptr<AutoGetCollection>> _getCollection(OperationContext* opCtx);

    /**
     * The method that the worker thread will run.
     */
    Status _worker(InsertStatement stmt) noexcept;

    /**
     * The options for the collection, in case we need to create it.
     */
    const CollectionOptions _collectionOptions;

    /**
     * The size limit of the in-memory buffer.
     */
    const int64_t _maxNumBytes;

    /**
     * The name of the backing collection.
     */
    const NamespaceString _nss;

    /**
     * Thread pool backing the worker thread.
     *
     * NOTE(review): presumably created by `startup` and torn down by `shutdown`; confirm against
     * the implementation file.
     */
    std::unique_ptr<ThreadPool> _pool;

    /**
     * Guards all non-const, non-thread-safe members.
     */
    Mutex _mutex = MONGO_MAKE_LATCH("DeferredWriter::_mutex");

    /**
     * The number of bytes currently in the in-memory buffer.
     *
     * Zero-initialized here so the counter is never indeterminate, whichever members a
     * constructor's initializer list happens to set.
     */
    int64_t _numBytes = 0;

    /**
     * The number of deferred entries that have been dropped.  Resets when the
     * rate-limited system log is written out.
     *
     * Zero-initialized here for the same reason as `_numBytes`.
     */
    int64_t _droppedEntries = 0;

    /**
     * Clock type used for the rate-limiting timestamps below.
     */
    using TimePoint = stdx::chrono::time_point<stdx::chrono::system_clock>;

    /**
     * Time we last logged that we can't write to the underlying collection.
     *
     * Ensures we don't flood the log with such entries.
     */
    TimePoint _lastLogged;

    /**
     * Time we last logged the dropped-entry count.
     *
     * NOTE(review): presumably the rate-limit timestamp for `_logDroppedEntry`; confirm against
     * the implementation file.
     */
    TimePoint _lastLoggedDrop;
};
} // namespace mongo
|