blob: fb9a64e774da6675bef5ab05cf2c6aae6d030d63 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
/*-
* Copyright (c) 2014-2015 MongoDB, Inc.
* Copyright (c) 2008-2014 WiredTiger, Inc.
* All rights reserved.
*
* See the file LICENSE for redistribution information.
*/
#define MAX_ASYNC_SLEEP_USECS 100000 /* Maximum sleep waiting for work */
#define MAX_ASYNC_YIELD 200 /* Maximum number of yields for work */
#define O2C(op) ((WT_CONNECTION_IMPL *)(op)->iface.connection)
#define O2S(op) \
(((WT_CONNECTION_IMPL *)(op)->iface.connection)->default_session)
/*
* WT_ASYNC_FORMAT --
* The URI/config/format cache.
*/
struct __wt_async_format {
TAILQ_ENTRY(__wt_async_format) q;
const char *config;
uint64_t cfg_hash; /* Config hash */
const char *uri;
uint64_t uri_hash; /* URI hash */
const char *key_format;
const char *value_format;
};
/*
* WT_ASYNC_OP_IMPL --
* Implementation of the WT_ASYNC_OP.
*/
struct __wt_async_op_impl {
WT_ASYNC_OP iface;
WT_ASYNC_CALLBACK *cb;
uint32_t internal_id; /* Array position id. */
uint64_t unique_id; /* Unique identifier. */
WT_ASYNC_FORMAT *format; /* Format structure */
#define WT_ASYNCOP_ENQUEUED 0 /* Placed on the work queue */
#define WT_ASYNCOP_FREE 1 /* Able to be allocated to user */
#define WT_ASYNCOP_READY 2 /* Allocated, ready for user to use */
#define WT_ASYNCOP_WORKING 3 /* Operation in progress by worker */
uint32_t state;
WT_ASYNC_OPTYPE optype; /* Operation type */
};
/*
* Definition of the async subsystem.
*/
struct __wt_async {
/*
* Ops array protected by the ops_lock.
*/
WT_SPINLOCK ops_lock; /* Locked: ops array */
WT_ASYNC_OP_IMPL *async_ops; /* Async ops */
#define OPS_INVALID_INDEX 0xffffffff
uint32_t ops_index; /* Active slot index */
uint64_t op_id; /* Unique ID counter */
WT_ASYNC_OP_IMPL **async_queue; /* Async ops work queue */
uint32_t async_qsize; /* Async work queue size */
/*
* We need to have two head and tail values. All but one is
* maintained as an ever increasing value to ease wrap around.
*
* alloc_head: the next one to allocate for producers.
* head: the current head visible to consumers.
* head is always <= alloc_head.
* alloc_tail: the next slot for consumers to dequeue.
* alloc_tail is always <= head.
* tail_slot: the last slot consumed.
* A producer may need wait for tail_slot to advance.
*/
uint64_t alloc_head; /* Next slot to enqueue */
uint64_t head; /* Head visible to worker */
uint64_t alloc_tail; /* Next slot to dequeue */
uint64_t tail_slot; /* Worker slot consumed */
TAILQ_HEAD(__wt_async_format_qh, __wt_async_format) formatqh;
uint32_t cur_queue; /* Currently enqueued */
uint32_t max_queue; /* Maximum enqueued */
#define WT_ASYNC_FLUSH_NONE 0 /* No flush in progress */
#define WT_ASYNC_FLUSH_COMPLETE 1 /* Notify flush caller done */
#define WT_ASYNC_FLUSH_IN_PROGRESS 2 /* Prevent other callers */
#define WT_ASYNC_FLUSHING 3 /* Notify workers */
uint32_t flush_state;
/* Notify any waiting threads when flushing is done. */
WT_CONDVAR *flush_cond;
WT_ASYNC_OP_IMPL flush_op; /* Special flush op */
uint32_t flush_count; /* Worker count */
uint64_t flush_gen; /* Flush generation number */
#define WT_ASYNC_MAX_WORKERS 20
WT_SESSION_IMPL *worker_sessions[WT_ASYNC_MAX_WORKERS];
/* Async worker threads */
wt_thread_t worker_tids[WT_ASYNC_MAX_WORKERS];
uint32_t flags; /* Currently unused. */
};
/*
* WT_ASYNC_CURSOR --
* Async container for a cursor. Each async worker thread
* has a cache of async cursors to reuse for operations.
*/
struct __wt_async_cursor {
TAILQ_ENTRY(__wt_async_cursor) q; /* Worker cache */
uint64_t cfg_hash; /* Config hash */
uint64_t uri_hash; /* URI hash */
WT_CURSOR *c; /* WT cursor */
};
/*
* WT_ASYNC_WORKER_STATE --
* State for an async worker thread.
*/
struct __wt_async_worker_state {
uint32_t id;
TAILQ_HEAD(__wt_cursor_qh, __wt_async_cursor) cursorqh;
uint32_t num_cursors;
};
|