summaryrefslogtreecommitdiff
path: root/src/mongo/s/client/shard.h
blob: f0d25f997555b2d25e386c8081c78281944eff3b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <boost/optional.hpp>

#include "mongo/bson/bsonobj.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/logical_time.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/executor/remote_command_response.h"
#include "mongo/s/shard_id.h"

namespace mongo {

// Forward declarations for types this header only names in declarations, so their complete
// definitions are not required here.
class OperationContext;
class RemoteCommandTargeter;
class BatchedCommandRequest;
class BatchedCommandResponse;

/**
 * Presents an interface for talking to shards, regardless of whether that shard is remote or is
 * the current (local) shard.
 */
class Shard {
public:
    struct CommandResponse {
        CommandResponse(boost::optional<HostAndPort> hostAndPort,
                        BSONObj response,
                        Status commandStatus,
                        Status writeConcernStatus)
            : hostAndPort(std::move(hostAndPort)),
              response(std::move(response)),
              commandStatus(std::move(commandStatus)),
              writeConcernStatus(std::move(writeConcernStatus)) {}

        /**
         * Takes the response from running a batch write command and writes the appropriate response
         * into batchResponse, while also returning the Status of the operation.
         */
        static Status processBatchWriteResponse(StatusWith<CommandResponse> response,
                                                BatchedCommandResponse* batchResponse);

        /**
         * Returns an error status if either commandStatus or writeConcernStatus has an error.
         */
        static Status getEffectiveStatus(const StatusWith<CommandResponse>& swResponse);

        boost::optional<HostAndPort> hostAndPort;
        BSONObj response;
        Status commandStatus;
        Status writeConcernStatus;
    };

    struct QueryResponse {
        std::vector<BSONObj> docs;
        repl::OpTime opTime;
    };

    enum class RetryPolicy {
        kIdempotent,
        kNotIdempotent,
        kNoRetry,
    };

    virtual ~Shard() = default;

    const ShardId& getId() const {
        return _id;
    }

    /**
     * Returns true if this shard object represents the config server.
     */
    bool isConfig() const;

    /**
     * Returns the current connection string for the shard.
     */
    virtual const ConnectionString getConnString() const = 0;

    /**
     * Returns the connection string that was used to create the Shard from the ShardFactory.  The
     * current connection string may be different.
     * NOTE: Chances are this isn't the method you want.  When in doubt, prefer to use
     * getConnString() instead.
     */
    virtual const ConnectionString originalConnString() const = 0;

    /**
     * Returns the RemoteCommandTargeter for the hosts in this shard.
     *
     * This is only valid to call on ShardRemote instances.
     */
    virtual std::shared_ptr<RemoteCommandTargeter> getTargeter() const = 0;

    /**
     * Notifies the RemoteCommandTargeter owned by the shard of a particular mode of failure for
     * the specified host.
     *
     * This is only valid to call on ShardRemote instances.
     */
    virtual void updateReplSetMonitor(const HostAndPort& remoteHost,
                                      const Status& remoteCommandStatus) = 0;

    /**
     * Returns a string description of this shard entry.
     */
    virtual std::string toString() const = 0;

    /**
     * Returns whether a server operation which failed with the given error code should be retried
     * (i.e. is safe to retry and has the potential to succeed next time).  The 'options' argument
     * describes whether the operation that generated the given code was idempotent, which affects
     * which codes are safe to retry on.
     */
    virtual bool isRetriableError(ErrorCodes::Error code, RetryPolicy options) = 0;

    /**
     * Runs the specified command returns the BSON command response plus parsed out Status of this
     * response and write concern error (if present). Retries failed operations according to the
     * given "retryPolicy".  Retries indefinitely until/unless a non-retriable error is encountered,
     * the maxTimeMs on the OperationContext expires, or the operation is interrupted.
     */
    StatusWith<CommandResponse> runCommand(OperationContext* opCtx,
                                           const ReadPreferenceSetting& readPref,
                                           const std::string& dbName,
                                           const BSONObj& cmdObj,
                                           RetryPolicy retryPolicy);

    /**
     * Same as the other variant of runCommand, but allows the operation timeout to be overriden.
     * Runs for the lesser of the remaining time on the operation context or the specified maxTimeMS
     * override.
     */
    StatusWith<CommandResponse> runCommand(OperationContext* opCtx,
                                           const ReadPreferenceSetting& readPref,
                                           const std::string& dbName,
                                           const BSONObj& cmdObj,
                                           Milliseconds maxTimeMSOverride,
                                           RetryPolicy retryPolicy);

    /**
     * Same as runCommand, but will only retry failed operations up to 3 times, regardless of
     * the retryPolicy or the remaining maxTimeMs.
     * Wherever possible this method should be avoided in favor of runCommand.
     */
    StatusWith<CommandResponse> runCommandWithFixedRetryAttempts(
        OperationContext* opCtx,
        const ReadPreferenceSetting& readPref,
        const std::string& dbName,
        const BSONObj& cmdObj,
        RetryPolicy retryPolicy);

    /**
     * Same as runCommand, but will only retry failed operations up to 3 times, regardless of
     * the retryPolicy or the remaining maxTimeMs.
     * Wherever possible this method should be avoided in favor of runCommand.
     */
    StatusWith<CommandResponse> runCommandWithFixedRetryAttempts(
        OperationContext* opCtx,
        const ReadPreferenceSetting& readPref,
        const std::string& dbName,
        const BSONObj& cmdObj,
        Milliseconds maxTimeMSOverride,
        RetryPolicy retryPolicy);

    /**
     * Schedules the command to be sent to the shard asynchronously. Does not provide any guarantee
     * on whether the command is actually sent or even scheduled successfully.
     */
    virtual void runFireAndForgetCommand(OperationContext* opCtx,
                                         const ReadPreferenceSetting& readPref,
                                         const std::string& dbName,
                                         const BSONObj& cmdObj) = 0;

    /**
    * Runs a cursor command, exhausts the cursor, and pulls all data into memory. Performs retries
    * if the command fails in accordance with the kIdempotent RetryPolicy.
    */
    StatusWith<QueryResponse> runExhaustiveCursorCommand(OperationContext* opCtx,
                                                         const ReadPreferenceSetting& readPref,
                                                         const std::string& dbName,
                                                         const BSONObj& cmdObj,
                                                         Milliseconds maxTimeMSOverride);

    /**
     * Runs a write command against a shard. This is separate from runCommand, because write
     * commands return errors in a different format than regular commands do, so checking for
     * retriable errors must be done differently.
     */
    BatchedCommandResponse runBatchWriteCommand(OperationContext* opCtx,
                                                const Milliseconds maxTimeMS,
                                                const BatchedCommandRequest& batchRequest,
                                                RetryPolicy retryPolicy);

    /**
    * Warning: This method exhausts the cursor and pulls all data into memory.
    * Do not use other than for very small (i.e., admin or metadata) collections.
    * Performs retries if the query fails in accordance with the kIdempotent RetryPolicy.
    *
    * ShardRemote instances expect "readConcernLevel" to always be kMajorityReadConcern, whereas
    * ShardLocal instances expect either kLocalReadConcern or kMajorityReadConcern.
    */
    StatusWith<QueryResponse> exhaustiveFindOnConfig(OperationContext* opCtx,
                                                     const ReadPreferenceSetting& readPref,
                                                     const repl::ReadConcernLevel& readConcernLevel,
                                                     const NamespaceString& nss,
                                                     const BSONObj& query,
                                                     const BSONObj& sort,
                                                     const boost::optional<long long> limit);

    /**
     * Builds an index on a config server collection.
     * Creates the collection if it doesn't yet exist.  Does not error if the index already exists,
     * so long as the options are the same.
     * NOTE: Currently only supported for LocalShard.
     */
    virtual Status createIndexOnConfig(OperationContext* opCtx,
                                       const NamespaceString& ns,
                                       const BSONObj& keys,
                                       bool unique) = 0;

    // This timeout will be used by default in operations against the config server, unless
    // explicitly overridden
    static const Milliseconds kDefaultConfigCommandTimeout;

    /**
     * Returns false if the error is a retriable error and/or causes a replset monitor update. These
     * errors, if from a remote call, should not be further propagated back to another server
     * because that server will interpret them as orignating on this server rather than the one this
     * server called.
     */
    static bool shouldErrorBePropagated(ErrorCodes::Error code);

    /**
     * Updates this shard's lastCommittedOpTime timestamp, if the given value is greater than the
     * currently stored value.
     *
     * This is only valid to call on ShardRemote instances.
     */
    virtual void updateLastCommittedOpTime(LogicalTime lastCommittedOpTime) = 0;

    /**
     * Returns the latest lastCommittedOpTime timestamp returned by the underlying shard. This
     * represents the latest opTime timestamp known to be in this shard's majority committed
     * snapshot.
     *
     * This is only valid to call on ShardRemote instances.
     */
    virtual LogicalTime getLastCommittedOpTime() const = 0;

protected:
    Shard(const ShardId& id);

private:
    /**
     * Runs the specified command against the shard backed by this object with a timeout set to the
     * minimum of maxTimeMSOverride or the timeout of the OperationContext.
     *
     * The return value exposes RemoteShard's host for calls to updateReplSetMonitor.
     *
     * NOTE: LocalShard implementation will not return a valid host and so should be ignored.
     */
    virtual StatusWith<CommandResponse> _runCommand(OperationContext* opCtx,
                                                    const ReadPreferenceSetting& readPref,
                                                    const std::string& dbname,
                                                    Milliseconds maxTimeMSOverride,
                                                    const BSONObj& cmdObj) = 0;

    virtual StatusWith<QueryResponse> _runExhaustiveCursorCommand(
        OperationContext* opCtx,
        const ReadPreferenceSetting& readPref,
        const std::string& dbName,
        Milliseconds maxTimeMSOverride,
        const BSONObj& cmdObj) = 0;

    virtual StatusWith<QueryResponse> _exhaustiveFindOnConfig(
        OperationContext* opCtx,
        const ReadPreferenceSetting& readPref,
        const repl::ReadConcernLevel& readConcernLevel,
        const NamespaceString& nss,
        const BSONObj& query,
        const BSONObj& sort,
        boost::optional<long long> limit) = 0;

    /**
     * Identifier of the shard as obtained from the configuration data (i.e. shard0000).
     */
    const ShardId _id;
};

}  // namespace mongo