path: root/src/mongo/db/s/collection_sharding_state.h
/**
 *    Copyright (C) 2015 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
 */

#pragma once

#include <memory>
#include <string>

#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/s/metadata_manager.h"
#include "mongo/util/concurrency/notification.h"

namespace mongo {

// How long to wait before starting cleanup of an emigrated chunk range.
extern AtomicInt32 orphanCleanupDelaySecs;

class BalancerConfiguration;
class BSONObj;
struct ChunkVersion;
class CollectionMetadata;
class MigrationSourceManager;
class OperationContext;

/**
 * Contains all sharding-related runtime state for a given collection. One such object is assigned
 * to each sharded collection known on a mongod instance. A set of these objects is linked off the
 * instance's sharding state.
 *
 * Synchronization rules: In order to look up this object in the instance's sharding map, one must
 * have some lock on the respective collection.
 */
class CollectionShardingState {
    MONGO_DISALLOW_COPYING(CollectionShardingState);

public:
    using CleanupNotification = CollectionRangeDeleter::DeleteNotification;

    /**
     * Instantiates a new per-collection sharding state as unsharded.
     */
    CollectionShardingState(ServiceContext* sc, NamespaceString nss);
    ~CollectionShardingState();

    /**
     * Details of documents being removed from a sharded collection.
     */
    struct DeleteState {
        // Contains the fields of the document that are in the collection's shard key, and "_id".
        BSONObj documentKey;

        // True if the document being deleted belongs to a chunk which, while still in the shard,
        // is being migrated out. (Not to be confused with "fromMigrate", which tags operations
        // that are steps in performing the migration.)
        bool isMigrating;
    };

    DeleteState makeDeleteState(BSONObj const& doc);
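
    // A minimal sketch of how a delete path might pair makeDeleteState()/onDeleteOp(); 'opCtx',
    // 'nss', and 'doc' are assumed to come from the surrounding delete operation:
    //
    //     auto* css = CollectionShardingState::get(opCtx, nss);
    //     auto deleteState = css->makeDeleteState(doc);  // capture shard key fields and _id first
    //     // ... remove the document ...
    //     css->onDeleteOp(opCtx, deleteState);           // then report the deletion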

    /**
     * Obtains the sharding state for the specified collection. If it does not exist, it will be
     * created and will remain active until the collection is dropped or unsharded.
     *
     * Must be called with some lock held on the specific collection being looked up and the
     * returned pointer should never be stored.
     */
    static CollectionShardingState* get(OperationContext* opCtx, const NamespaceString& nss);
    static CollectionShardingState* get(OperationContext* opCtx, const std::string& ns);
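
    // A minimal sketch of the intended lookup pattern; AutoGetCollection, MODE_IS, the
    // 'shardKeyValue' variable, and the keyBelongsToMe() call are assumptions about the calling
    // code, not declarations in this header:
    //
    //     AutoGetCollection autoColl(opCtx, nss, MODE_IS);       // some collection lock is held
    //     auto* css = CollectionShardingState::get(opCtx, nss);  // never store this pointer
    //     ScopedCollectionMetadata metadata = css->getMetadata();
    //     if (metadata) {
    //         bool owned = metadata->keyBelongsToMe(shardKeyValue);
    //     }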

    /**
     * Returns the chunk metadata for the collection. The metadata it represents lives as long as
     * the object itself, and the collection, exist. After dropping the collection lock, the
     * collection may no longer exist, but it is still safe to destroy the object.
     */
    ScopedCollectionMetadata getMetadata();

    /**
     * Appends a BSON representation of the pending metadata to the given BSONArrayBuilder.
     */
    void toBSONPending(BSONArrayBuilder& bb) const {
        _metadataManager->toBSONPending(bb);
    }

    /**
     * Updates the metadata based on changes received from the config server and also resolves the
     * pending receives map in case some of these pending receives have completed or have been
     * abandoned. If newMetadata is null, the collection is marked as unsharded.
     *
     * Must always be called with an exclusive collection lock.
     */
    void refreshMetadata(OperationContext* opCtx, std::unique_ptr<CollectionMetadata> newMetadata);

    /**
     * Marks the collection as not sharded at stepdown time so that no filtering will occur for
     * slaveOk queries.
     */
    void markNotShardedAtStepdown();

    /**
     * If no running queries can depend on documents in `range`, schedules them for immediate
     * cleanup and adds the range to the list of pending ranges; otherwise, returns a notification
     * that yields a bad status immediately.  Does not block.  Call waitStatus(opCtx) on the result
     * to wait for the deletion to complete or fail.  After that, call waitForClean to ensure no
     * other deletions are pending for the range.
     */
    auto beginReceive(ChunkRange const& range) -> CleanupNotification;

    /**
     * Removes `range` from the list of pending ranges, and schedules any documents in the range for
     * immediate cleanup.  Does not block.
     */
    void forgetReceive(const ChunkRange& range);
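
    // A minimal sketch of how a migration destination might pair these calls; error handling and
    // the rest of the migration protocol are omitted:
    //
    //     auto notification = css->beginReceive(range);   // 'range' becomes a pending range
    //     if (!notification.waitStatus(opCtx).isOK()) {
    //         // could not clean up pre-existing orphans in 'range'; abort receiving this chunk
    //     }
    //     // ... if the migration later fails:
    //     css->forgetReceive(range);                      // un-pend and clean up received docs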

    /**
     * Schedules documents in `range` for cleanup after any running queries that may depend on them
     * have terminated. Does not block. Fails if range overlaps any current local shard chunk.
     * If kDelayed is passed, an additional delay (configured via the server parameter
     * orphanCleanupDelaySecs) is added to permit (most) dependent queries on secondaries to
     * complete, too.
     *
     * Call result.waitStatus(opCtx) to wait for the deletion to complete or fail. If that succeeds,
     * waitForClean can be called to ensure no other deletions are pending for the range. Call
     * result.abandon(), instead of waitStatus, to ignore the outcome.
     */
    enum CleanWhen { kNow, kDelayed };
    auto cleanUpRange(ChunkRange const& range, CleanWhen) -> CleanupNotification;
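
    // A minimal sketch of scheduling cleanup for a range this shard no longer owns; 'css' and
    // 'range' are supplied by the caller:
    //
    //     auto notification = css->cleanUpRange(range, CollectionShardingState::kDelayed);
    //     Status status = notification.waitStatus(opCtx);  // block until deletion completes/fails
    //     if (status.isOK()) {
    //         // optionally call waitForClean() to cover any other deletions pending in 'range'
    //     }
    //     // ...or call notification.abandon() instead of waitStatus() to ignore the outcome.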

    /**
     * Returns a vector of ScopedCollectionMetadata objects representing metadata instances in use
     * by running queries that overlap the argument range, suitable for identifying and invalidating
     * those queries.
     */
    auto overlappingMetadata(ChunkRange const& range) const
        -> std::vector<ScopedCollectionMetadata>;

    /**
     * Returns the active migration source manager, if one is available.
     */
    MigrationSourceManager* getMigrationSourceManager();

    /**
     * Attaches a migration source manager to this collection's sharding state. Must be called with
     * collection X lock. May not be called if there is a migration source manager already
     * installed. Must be followed by a call to clearMigrationSourceManager.
     */
    void setMigrationSourceManager(OperationContext* opCtx, MigrationSourceManager* sourceMgr);

    /**
     * Removes a migration source manager from this collection's sharding state. Must be called with
     * collection X lock. May not be called if there isn't a migration source manager installed
     * already through a previous call to setMigrationSourceManager.
     */
    void clearMigrationSourceManager(OperationContext* opCtx);
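
    // A minimal sketch of the required pairing, with the collection X lock held around each call;
    // constructing the MigrationSourceManager itself is outside the scope of this class:
    //
    //     css->setMigrationSourceManager(opCtx, &sourceMgr);  // install, X lock held
    //     // ... run the chunk migration ...
    //     css->clearMigrationSourceManager(opCtx);            // uninstall, X lock held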

    /**
     * Checks whether the shard version in the context is compatible with the shard version of the
     * collection locally and if not throws StaleConfigException populated with the expected and
     * actual versions.
     *
     * Because StaleConfigException has special semantics in terms of how a sharded command's
     * response is constructed, this function should be the only means of checking for shard version
     * match.
     */
    void checkShardVersionOrThrow(OperationContext* opCtx);
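
    // A minimal sketch of checking the shard version at the start of a sharded read; the
    // AutoGetCollection lock helper is an assumption about the calling code:
    //
    //     AutoGetCollection autoColl(opCtx, nss, MODE_IS);
    //     CollectionShardingState::get(opCtx, nss)->checkShardVersionOrThrow(opCtx);
    //     // throws StaleConfigException if the operation's expected version does not match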

    /**
     * Returns whether this collection is sharded. Valid only if this mongod is the primary.
     * TODO SERVER-24960: This method may return a false positive until SERVER-24960 is fixed.
     */
    bool collectionIsSharded();

    /**
     * Waits for the deletion of any documents within the range to complete. Throws if the
     * collection is dropped while it waits. Call this with the collection unlocked.
     */
    static Status waitForClean(OperationContext*, NamespaceString, OID const& epoch, ChunkRange);

    /**
     * Reports whether any range still scheduled for deletion overlaps the argument range. If so,
     * it returns a notification n such that n->get(opCtx) will wake when the newest overlapping
     * range's deletion (possibly the one of interest) completes or fails. This should be called
     * again after each wakeup until it returns boost::none, because there can be more than one
     * range scheduled for deletion that overlaps its argument.
     */
    auto trackOrphanedDataCleanup(ChunkRange const& range) -> boost::optional<CleanupNotification>;
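
    // A minimal sketch of the retry loop described above; 'css' and 'range' come from the caller:
    //
    //     while (auto notification = css->trackOrphanedDataCleanup(range)) {
    //         (*notification)->get(opCtx);  // wake when the newest overlapping deletion finishes
    //     }
    //     // boost::none: nothing still scheduled for deletion overlaps 'range'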

    /**
     * Returns a range _not_ owned by this shard that starts no lower than the specified
     * startingFrom key value, if any, or boost::none if there is no such range.
     */
    boost::optional<KeyRange> getNextOrphanRange(BSONObj const& startingFrom);
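
    // A minimal sketch of enumerating orphan ranges; the 'minKey'/'maxKey' fields of KeyRange and
    // the starting key are assumptions about the surrounding code:
    //
    //     BSONObj lookupKey = startingKey;  // e.g. the shard key pattern's global minimum
    //     while (auto range = css->getNextOrphanRange(lookupKey)) {
    //         // ... act on the orphan range [range->minKey, range->maxKey) ...
    //         lookupKey = range->maxKey;    // advance past the range just returned
    //     }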

    /**
     * Replication oplog OpObserver hooks. Informs the sharding system of changes that may be
     * relevant to ongoing operations.
     *
     * The global exclusive lock is expected to be held by the caller of any of these functions.
     */
    void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc);
    void onUpdateOp(OperationContext* opCtx,
                    const BSONObj& query,
                    const BSONObj& update,
                    const BSONObj& updatedDoc);
    void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState);
    void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName);

private:
    /**
     * Registers a task on the opCtx -- to run after writes from the oplog are committed and visible
     * to reads -- to notify the catalog cache loader of a new collection version. The catalog
     * cache's routing table for the collection will also be invalidated at that time so that the
     * next caller to the catalog cache for routing information will provoke a routing table
     * refresh.
     *
     * This only runs on secondaries, and only when 'lastRefreshedCollectionVersion' is in 'update',
     * meaning a chunk metadata refresh finished being applied to the collection's locally persisted
     * metadata store.
     *
     * query - BSON with an _id that identifies which collections entry is being updated.
     * update - the update being applied to the collections entry.
     * updatedDoc - the document identified by 'query' with the 'update' applied.
     *
     * The global exclusive lock is expected to be held by the caller.
     */
    void _onConfigRefreshCompleteInvalidateCachedMetadataAndNotify(OperationContext* opCtx,
                                                                   const BSONObj& query,
                                                                   const BSONObj& update,
                                                                   const BSONObj& updatedDoc);

    /**
     * Registers a task on the opCtx -- to run after writes from the oplog are committed and visible
     * to reads -- to notify the catalog cache loader of a new collection version. The catalog
     * cache's routing table for the collection will also be invalidated at that time so that the
     * next caller to the catalog cache for routing information will provoke a routing table
     * refresh.
     *
     * This only runs on secondaries.
     *
     * query - BSON with an _id field that identifies which collections entry is being updated.
     *
     * The global exclusive lock is expected to be held by the caller.
     */
    void _onConfigDeleteInvalidateCachedMetadataAndNotify(OperationContext* opCtx,
                                                          const BSONObj& query);

    /**
     * Checks whether the shard version of the operation matches that of the collection.
     *
     * opCtx - Operation context from which to retrieve the operation's expected version.
     * errmsg (out) - On false return contains an explanatory error message.
     * expectedShardVersion (out) - On false return contains the expected collection version on this
     *  shard. Obtained from the operation sharding state.
     * actualShardVersion (out) - On false return contains the actual collection version on this
     *  shard. Obtained from the collection sharding state.
     *
     * Returns true if the expected collection version on the shard matches its actual version on
     * the shard and false otherwise. Upon false return, the output parameters will be set.
     */
    bool _checkShardVersionOk(OperationContext* opCtx,
                              std::string* errmsg,
                              ChunkVersion* expectedShardVersion,
                              ChunkVersion* actualShardVersion);

    /**
     * If the collection is sharded, finds the chunk that contains the specified document, and
     * increments the size tracked for that chunk by the specified amount of data written, in
     * bytes. Returns the total number of bytes on that chunk after the data is written.
     */
    uint64_t _incrementChunkOnInsertOrUpdate(OperationContext* opCtx,
                                             const BSONObj& document,
                                             long dataWritten);

    /**
     * Returns true if the total number of bytes on the specified chunk nears the max size of
     * a shard.
     */
    bool _shouldSplitChunk(OperationContext* opCtx,
                           const ShardKeyPattern& shardKeyPattern,
                           const Chunk& chunk);

    // Namespace this state belongs to.
    const NamespaceString _nss;

    // Contains all the metadata associated with this collection.
    std::shared_ptr<MetadataManager> _metadataManager;

    // If this collection is serving as a source shard for chunk migration, this value will be
    // non-null. To write this value there needs to be X-lock on the collection in order to
    // synchronize with other callers, which read it.
    //
    // NOTE: The value is not owned by this class.
    MigrationSourceManager* _sourceMgr{nullptr};

    // for access to _metadataManager
    friend auto CollectionRangeDeleter::cleanUpNextRange(OperationContext*,
                                                         NamespaceString const&,
                                                         OID const& epoch,
                                                         int maxToDelete,
                                                         CollectionRangeDeleter*)
        -> boost::optional<Date_t>;
};

}  // namespace mongo