summaryrefslogtreecommitdiff
path: root/src/mongo/executor/network_interface_mock.h
blob: f87b18c27b3f0c908b872547bd6b7de51a75e5a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568

/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <memory>
#include <queue>
#include <utility>
#include <vector>

#include "mongo/base/disallow_copying.h"
#include "mongo/executor/network_interface.h"
#include "mongo/rpc/metadata/metadata_hook.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/stdx/unordered_set.h"
#include "mongo/util/clock_source.h"
#include "mongo/util/time_support.h"

namespace mongo {

// Forward declaration: this header only refers to BSONObj by const reference.
class BSONObj;

namespace executor {

// Shorthand for the executor's response type used by the mock's scheduling methods.
using ResponseStatus = TaskExecutor::ResponseStatus;
// Forward declaration: this header only holds the hook through std::unique_ptr.
class NetworkConnectionHook;

/**
 * Mock network implementation for use in unit tests.
 *
 * To use, construct a new instance on the heap, and keep a pointer to it.  Pass
 * the pointer to the instance into the TaskExecutor constructor, transferring
 * ownership.  Start the executor's run() method in a separate thread, schedule the
 * work you want to test into the executor, then while the test is still going, iterate
 * through the ready network requests, servicing them and advancing time as needed.
 *
 * The mock has a fully virtualized notion of time and the network.  When the
 * executor under test schedules a network operation, the startCommand
 * method of this class adds an entry to the _unscheduled queue for immediate consideration.
 * The test driver loop, when it examines the request, may schedule a response, ask the
 * interface to redeliver the request at a later virtual time, or to swallow the virtual
 * request until the end of the simulation.  The test driver loop can also instruct the
 * interface to run forward through virtual time until there are operations ready to
 * consider, via runUntil.
 *
 * The thread acting as the "network" and the executor run thread are highly synchronized
 * by this code, allowing for deterministic control of operation interleaving.
 */
class NetworkInterfaceMock : public NetworkInterface {
public:
    class NetworkOperation;
    // Container type used for every operation queue kept by this mock.
    using NetworkOperationList = stdx::list<NetworkOperation>;
    // Handle to a single operation inside one of the queues; this is what the test driver
    // receives from the getNext*/getNth* methods and passes back to schedule*/blackHole/requeueAt.
    using NetworkOperationIterator = NetworkOperationList::iterator;

    NetworkInterfaceMock();
    virtual ~NetworkInterfaceMock();
    virtual void appendConnectionStats(ConnectionPoolStats* stats) const;
    virtual std::string getDiagnosticString();
    /**
     * Returns a default-constructed Counters object.
     */
    Counters getCounters() const override {
        return Counters();
    }

    /**
     * Logs the contents of the queues for diagnostics.
     */
    virtual void logQueues();

    ////////////////////////////////////////////////////////////////////////////////
    //
    // NetworkInterface methods
    //
    ////////////////////////////////////////////////////////////////////////////////

    virtual void startup();
    virtual void shutdown();
    virtual bool inShutdown() const;
    virtual void waitForWork();
    virtual void waitForWorkUntil(Date_t when);
    virtual void setConnectionHook(std::unique_ptr<NetworkConnectionHook> hook);
    virtual void setEgressMetadataHook(std::unique_ptr<rpc::EgressMetadataHook> metadataHook);
    virtual void signalWorkAvailable();
    virtual Date_t now();
    virtual std::string getHostName();
    virtual Status startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                RemoteCommandRequest& request,
                                const RemoteCommandCompletionFn& onFinish,
                                const transport::BatonHandle& baton = nullptr);

    /**
     * If the network operation is in the _unscheduled or _processing queues, moves the operation
     * into the _scheduled queue with ErrorCodes::CallbackCanceled. If the operation is already in
     * the _scheduled queue, does nothing. The latter simulates the case where cancelCommand() is
     * called after the task has already completed, but its callback has not yet been run.
     */
    virtual void cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
                               const transport::BatonHandle& baton = nullptr);

    /**
     * Requests that "action" be run at virtual time "when".
     *
     * NOTE(review): this comment previously read "Not implemented.", but the class keeps an
     * _alarms heap of AlarmInfo entries and NetworkInterfaceMockClockSource::setAlarm forwards
     * here, so the method presumably is implemented -- confirm against the .cpp.
     */
    virtual Status setAlarm(Date_t when,
                            const stdx::function<void()>& action,
                            const transport::BatonHandle& baton = nullptr);

    virtual bool onNetworkThread();

    // Intentionally a no-op: the body is empty.
    void dropConnections(const HostAndPort&) override {}


    ////////////////////////////////////////////////////////////////////////////////
    //
    // Methods for simulating network operations and the passage of time.
    //
    // Methods in this section are to be called by the thread currently simulating
    // the network.
    //
    ////////////////////////////////////////////////////////////////////////////////

    /**
     * RAII-style class for entering and exiting network.
     */
    class InNetworkGuard;

    /**
     * Causes the currently running (non-executor) thread to assume the mantle of the network
     * simulation thread.
     *
     * Call this before calling any of the other methods in this section.
     */
    void enterNetwork();

    /**
     * Causes the currently running thread to drop the mantle of "network simulation thread".
     *
     * Call this before calling any methods that might block waiting for the
     * executor thread.
     *
     * It is safe to call exitNetwork() even if enterNetwork() has not been called - it will just
     * be a no-op.
     */
    void exitNetwork();

    /**
     * Returns true if there are unscheduled network requests to be processed.
     */
    bool hasReadyRequests();

    /**
     * Gets the next unscheduled request to process, blocking until one is available.
     *
     * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
     */
    NetworkOperationIterator getNextReadyRequest();

    /**
     * Gets the first unscheduled request. There must be at least one unscheduled request in the
     * queue. Equivalent to getNthUnscheduledRequest(0).
     */
    NetworkOperationIterator getFrontOfUnscheduledQueue();

    /**
     * Get the nth (starting at 0) unscheduled request. Assumes there are at least n+1 unscheduled
     * requests in the queue.
     */
    NetworkOperationIterator getNthUnscheduledRequest(size_t n);

    /**
     * Schedules "response" in response to "noi" at virtual time "when".
     */
    void scheduleResponse(NetworkOperationIterator noi,
                          Date_t when,
                          const ResponseStatus& response);

    /**
     * Schedules a successful "response" to "noi" at virtual time "when".
     * "noi" defaults to next ready request.
     * "when" defaults to now().
     * Returns the "request" that the response was scheduled for.
     */
    RemoteCommandRequest scheduleSuccessfulResponse(const BSONObj& response);
    RemoteCommandRequest scheduleSuccessfulResponse(const RemoteCommandResponse& response);
    RemoteCommandRequest scheduleSuccessfulResponse(NetworkOperationIterator noi,
                                                    const RemoteCommandResponse& response);
    RemoteCommandRequest scheduleSuccessfulResponse(NetworkOperationIterator noi,
                                                    Date_t when,
                                                    const RemoteCommandResponse& response);

    /**
     * Schedules an error "response" to "noi" at virtual time "when".
     * "noi" defaults to next ready request.
     * "when" defaults to now().
     * Returns a RemoteCommandRequest -- presumably the request the response was scheduled for,
     * as with scheduleSuccessfulResponse(); confirm against the .cpp.
     */
    RemoteCommandRequest scheduleErrorResponse(const Status& response);
    RemoteCommandRequest scheduleErrorResponse(const ResponseStatus response);
    RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
                                               const Status& response);
    RemoteCommandRequest scheduleErrorResponse(NetworkOperationIterator noi,
                                               Date_t when,
                                               const Status& response);


    /**
     * Swallows "noi", causing the network interface to not respond to it until
     * shutdown() is called.
     */
    void blackHole(NetworkOperationIterator noi);

    /**
     * Defers decision making on "noi" until virtual time "dontAskUntil".  Use
     * this when getNextReadyRequest() returns a request you want to deal with
     * after looking at other requests.
     */
    void requeueAt(NetworkOperationIterator noi, Date_t dontAskUntil);

    /**
     * Runs the simulator forward until now() == until or hasReadyRequests() is true.
     * Returns now().
     *
     * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
     */
    Date_t runUntil(Date_t until);

    /**
     * Processes all ready, scheduled network operations.
     *
     * Will not return until the executor thread is blocked in waitForWorkUntil or waitForWork.
     */
    void runReadyNetworkOperations();

    /**
     * Sets the reply of the 'isMaster' handshake for a specific host. This reply will only
     * be given to the 'validateHost' method of the ConnectionHook set on this object - NOT
     * to the completion handlers of any 'isMaster' commands scheduled with 'startCommand'.
     *
     * This reply will persist until it is changed again using this method.
     *
     * If the NetworkInterfaceMock conducts a handshake with a simulated host which has not
     * had a handshake reply set, a default constructed RemoteCommandResponse will be passed
     * to validateHost if a hook is set.
     */
    void setHandshakeReplyForHost(const HostAndPort& host, RemoteCommandResponse&& reply);

    /**
     * Deliver the response to the callback handle if the handle is present in queuesToCheck.
     * This represents interrupting the regular flow with, for example, a NetworkTimeout or
     * CallbackCanceled error.
     *
     * NOTE(review): the "_inlock" suffix suggests the caller must already hold _mutex, yet the
     * method is declared public while _mutex is private -- confirm the intended callers.
     */
    void _interruptWithResponse_inlock(const TaskExecutor::CallbackHandle& cbHandle,
                                       const std::vector<NetworkOperationList*> queuesToCheck,
                                       const ResponseStatus& response);

private:
    /**
     * Information describing a scheduled alarm.
     */
    struct AlarmInfo {
        using AlarmAction = stdx::function<void()>;
        AlarmInfo(Date_t inWhen, AlarmAction inAction)
            : when(inWhen), action(std::move(inAction)) {}
        // Orders alarms by due time only; used by the std::greater comparator of the _alarms
        // priority queue.
        bool operator>(const AlarmInfo& rhs) const {
            return when > rhs.when;
        }

        Date_t when;
        AlarmAction action;
    };

    /**
     * Type used to identify which thread (network mock or executor) is currently executing.
     *
     * Values are used in a bitmask, as well.
     */
    enum ThreadType { kNoThread = 0, kExecutorThread = 1, kNetworkThread = 2 };

    /**
     * Implementation of startup behavior.
     */
    void _startup_inlock();

    /**
     * Returns information about the state of this mock for diagnostic purposes.
     */
    std::string _getDiagnosticString_inlock() const;

    /**
     * Logs the contents of the queues for diagnostics.
     */
    void _logQueues_inlock() const;
    /**
     * Returns the current virtualized time.
     */
    Date_t _now_inlock() const {
        return _now;
    }

    /**
     * Implementation of waitForWork*.
     */
    void _waitForWork_inlock(stdx::unique_lock<stdx::mutex>* lk);

    /**
     * Returns true if there are ready requests for the network thread to service.
     */
    bool _hasReadyRequests_inlock();

    /**
     * Returns true if the network thread could run right now.
     */
    bool _isNetworkThreadRunnable_inlock();

    /**
     * Returns true if the executor thread could run right now.
     */
    bool _isExecutorThreadRunnable_inlock();

    /**
     * Enqueues a network operation to run in order of 'consideration date'.
     */
    void _enqueueOperation_inlock(NetworkOperation&& op);

    /**
     * "Connects" to a remote host, and then enqueues the provided operation.
     */
    void _connectThenEnqueueOperation_inlock(const HostAndPort& target, NetworkOperation&& op);

    /**
     * Runs all ready network operations, called while holding "lk".  May drop and
     * reacquire "lk" several times, but will not return until the executor has blocked
     * in waitFor*.
     */
    void _runReadyNetworkOperations_inlock(stdx::unique_lock<stdx::mutex>* lk);

    // Mutex that synchronizes access to mutable data in this class and its subclasses.
    // Fields guarded by the mutex are labeled (M), below, and those that are read-only
    // in multi-threaded execution, and so unsynchronized, are labeled (R).
    stdx::mutex _mutex;

    // Condition signaled to indicate that the network processing thread should wake up.
    stdx::condition_variable _shouldWakeNetworkCondition;  // (M)

    // Condition signaled to indicate that the executor run thread should wake up.
    stdx::condition_variable _shouldWakeExecutorCondition;  // (M)

    // Bitmask indicating which threads are runnable.
    int _waitingToRunMask;  // (M)

    // Indicator of which thread, if any, is currently running.
    ThreadType _currentlyRunning;  // (M)

    // The current time reported by this instance of NetworkInterfaceMock.
    Date_t _now;  // (M)

    // Set to true by "startup()"
    bool _hasStarted;  // (M)

    // Set to true by "shutdown()".
    // NOTE(review): declared as AtomicWord<bool> yet labeled (M); presumably it may also be read
    // without holding _mutex (e.g. from inShutdown() const) -- confirm against the .cpp.
    AtomicWord<bool> _inShutdown;  // (M)

    // Next date that the executor expects to wake up at (due to a scheduleWorkAt() call).
    Date_t _executorNextWakeupDate;  // (M)

    // List of network operations whose responses haven't been scheduled or blackholed.  This is
    // where network requests are first queued.  It is sorted by
    // NetworkOperation::_nextConsiderationDate, which is set to now() when startCommand() is
    // called, and adjusted by requeueAt().
    NetworkOperationList _unscheduled;  // (M)

    // List of network operations that have been returned by getNextReadyRequest() but not
    // yet scheduled, black-holed or requeued.
    NetworkOperationList _processing;  // (M)

    // List of network operations whose responses have been scheduled but not delivered, sorted
    // by NetworkOperation::_responseDate.  These operations will have their responses delivered
    // when now() == getResponseDate().
    NetworkOperationList _scheduled;  // (M)

    // List of network operations that will not be responded to until shutdown() is called.
    NetworkOperationList _blackHoled;  // (M)

    // Heap of alarms, with the next alarm always on top.
    std::priority_queue<AlarmInfo, std::vector<AlarmInfo>, std::greater<AlarmInfo>> _alarms;  // (M)

    // The connection hook.
    std::unique_ptr<NetworkConnectionHook> _hook;  // (R)

    // The metadata hook.
    std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;  // (R)

    // The set of hosts we have seen so far. If we see a new host, we will execute the
    // ConnectionHook's validation and post-connection logic.
    //
    // TODO: provide a way to simulate disconnections.
    stdx::unordered_set<HostAndPort> _connections;  // (M)

    // The handshake replies set for each host.
    stdx::unordered_map<HostAndPort, RemoteCommandResponse> _handshakeReplies;  // (M)
};

/**
 * One network operation tracked by NetworkInterfaceMock while it is in progress: the
 * originating request and callback handle, its virtual timestamps, and the response
 * once one has been set.
 */
class NetworkInterfaceMock::NetworkOperation {
public:
    NetworkOperation();
    NetworkOperation(const TaskExecutor::CallbackHandle& cbHandle,
                     const RemoteCommandRequest& theRequest,
                     Date_t theRequestDate,
                     const RemoteCommandCompletionFn& onFinish);
    ~NetworkOperation();

    //
    // Mutators.
    //

    /**
     * Updates the virtual time at which the test harness should look at this entry again.
     */
    void setNextConsiderationDate(Date_t nextConsiderationDate);

    /**
     * Stores the response together with the virtual time at which it is to be delivered.
     */
    void setResponse(Date_t responseDate, const ResponseStatus& response);

    /**
     * Delivers the response by invoking the onFinish callback that was passed to the
     * constructor.
     */
    void finishResponse();

    //
    // Accessors.
    //

    /**
     * The executor's callback handle for this operation.
     */
    const TaskExecutor::CallbackHandle& getCallbackHandle() const {
        return _cbHandle;
    }

    /**
     * Search predicate for lists of NetworkOperations: true when "cbHandle" compares equal
     * to the executor's handle for this operation.
     */
    bool isForCallback(const TaskExecutor::CallbackHandle& cbHandle) const {
        return cbHandle == getCallbackHandle();
    }

    /**
     * The request that started this operation.
     */
    const RemoteCommandRequest& getRequest() const {
        return _request;
    }

    /**
     * Virtual time at which the operation was started.
     */
    Date_t getRequestDate() const {
        return _requestDate;
    }

    /**
     * Virtual time at which the test harness should next decide what to do with this
     * request.
     */
    Date_t getNextConsiderationDate() const {
        return _nextConsiderationDate;
    }

    /**
     * Virtual time at which the response should be delivered; meaningful only after
     * setResponse() has been called.
     */
    Date_t getResponseDate() const {
        return _responseDate;
    }

    /**
     * Produces a printable description of this operation for diagnostics.
     */
    std::string getDiagnosticString() const;

private:
    // NOTE: the declaration order of these fields is unchanged on purpose, since member
    // initialization follows declaration order.
    Date_t _requestDate;
    Date_t _nextConsiderationDate;
    Date_t _responseDate;
    TaskExecutor::CallbackHandle _cbHandle;
    RemoteCommandRequest _request;
    ResponseStatus _response;
    RemoteCommandCompletionFn _onFinish;
};

/**
 * Scoped helper that brackets a region of code with enterNetwork()/exitNetwork().
 *
 * enterNetwork is called when the guard is constructed; exitNetwork is called when it is
 * destroyed, unless dismiss() was called first.
 *
 * Not thread-safe.
 */
class NetworkInterfaceMock::InNetworkGuard {
    MONGO_DISALLOW_COPYING(InNetworkGuard);

public:
    /**
     * Enters the network on "net" by calling enterNetwork.
     */
    explicit InNetworkGuard(NetworkInterfaceMock* net);

    /**
     * Calls exitNetwork, except when dismiss() has already been called.
     */
    ~InNetworkGuard();

    /**
     * Calls exitNetwork right away and prevents the destructor from calling it again.
     */
    void dismiss();

    /**
     * Gives access to the guarded network interface mock pointer.
     */
    NetworkInterfaceMock* operator->() const;

private:
    NetworkInterfaceMock* _net;
    // Whether the destructor should still call exitNetwork.
    bool _callExitNetwork = true;
};

/**
 * ClockSource implementation that forwards to a NetworkInterfaceMock, so that now() and
 * setAlarm() are answered by the mock.
 */
class NetworkInterfaceMockClockSource : public ClockSource {
public:
    explicit NetworkInterfaceMockClockSource(NetworkInterfaceMock* net);

    /**
     * Reports the time returned by the underlying mock's now().
     */
    Date_t now() override {
        const Date_t mockTime = _net->now();
        return mockTime;
    }

    /**
     * Reports a fixed precision of one millisecond.
     */
    Milliseconds getPrecision() override {
        const Milliseconds oneMillisecond{1};
        return oneMillisecond;
    }

    /**
     * Forwards the alarm request to the underlying mock and returns its Status.
     */
    Status setAlarm(Date_t when, stdx::function<void()> action) override {
        Status alarmStatus = _net->setAlarm(when, action);
        return alarmStatus;
    }

private:
    // The mock that supplies the time and receives alarm requests.
    NetworkInterfaceMock* _net;
};

}  // namespace executor
}  // namespace mongo