summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/legacystore/jrnl/jcntl.h
blob: 294e9ced05e2c24be14c8b42a65aa6733aabbe5f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

/**
 * \file jcntl.h
 *
 * Qpid asynchronous store plugin library
 *
 * Messaging journal top-level control and interface class
 * mrg::journal::jcntl. See class documentation for details.
 *
 * \author Kim van der Riet
 */

#ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H
#define QPID_LEGACYSTORE_JRNL_JCNTL_H

// Forward declaration of mrg::journal::jcntl, placed ahead of the #include block below.
// NOTE(review): presumably this lets the journal headers included below refer to jcntl
// before its full definition is seen - confirm which header actually needs it.
namespace mrg
{
namespace journal
{
    class jcntl;
}
}

#include <cstddef>
#include <deque>
#include "qpid/legacystore/jrnl/jdir.h"
#include "qpid/legacystore/jrnl/fcntl.h"
#include "qpid/legacystore/jrnl/lpmgr.h"
#include "qpid/legacystore/jrnl/rcvdat.h"
#include "qpid/legacystore/jrnl/slock.h"
#include "qpid/legacystore/jrnl/smutex.h"
#include "qpid/legacystore/jrnl/rmgr.h"
#include "qpid/legacystore/jrnl/wmgr.h"
#include "qpid/legacystore/jrnl/wrfc.h"

namespace mrg
{
namespace journal
{

    /**
    * \brief Access and control interface for the journal. This is the top-level class for the
    *     journal.
    *
    * This is the top-level journal class; one instance of this class controls one instance of the
    * journal and all its files and associated control structures. Besides this class, the only
    * other class that needs to be used at a higher level is the data_tok class, one instance of
    * which is used per data block written to the journal, and is used to track its status through
    * the AIO enqueue, read and dequeue process.
    */
    class jcntl
    {
    protected:
        /**
        * \brief Journal ID
        *
        * This string uniquely identifies this journal instance. It will most likely be associated
        * with the identity of the message queue with which it is associated.
        */
        // TODO: This is not included in any files at present, add to file_hdr?
        std::string _jid;

        /**
        * \brief Journal directory
        *
        * This string stores the path to the journal directory. It may be absolute or relative, and
        * should not end in a file separator character. (e.g. "/fastdisk/jdata" is correct,
        * "/fastdisk/jdata/" is not.)
        */
        jdir _jdir;

        /**
        * \brief Base filename
        *
        * This string contains the base filename used for the journal files. The filenames will
        * start with this base, and have various sections added to it to derive the final file names
        * that will be written to disk. No file separator characters should be included here, but
        * all other legal filename characters are valid.
        */
        std::string _base_filename;

        /**
        * \brief Initialized flag
        *
        * This flag starts out set to false, is set to true once this object has been initialized,
        * either by calling initialize() or recover().
        */
        bool _init_flag;

        /**
        * \brief Stopped flag
        *
        * This flag starts out false, and is set to true when stop() is called. At this point, the
        * journal will no longer accept messages until either initialize() or recover() is called.
        * There is no way other than through initialization to reset this flag.
        */
        // TODO: It would be helpful to distinguish between states stopping and stopped. If stop(true) is called,
        // then we are stopping, but must wait for all outstanding aios to return before being finally stopped. During
        // this period, however, no new enqueue/dequeue/read requests may be accepted.
        bool _stop_flag;

        /**
        * \brief Read-only state flag used during recover.
        *
        * When true, this flag prevents journal write operations (enqueue and dequeue), but
        * allows read to occur. It is used during recovery, and is reset when recover_complete()
        * is called.
        */
        bool _readonly_flag;

        /**
        * \brief If set, calls stop() if the journal write pointer overruns dequeue low water
        *     marker. If not set, then attempts to write will throw exceptions until the journal
        *     file low water marker moves to the next journal file.
        */
        bool _autostop;             ///< Autostop flag - stops journal when overrun occurs

        // Journal control structures
        u_int32_t _jfsize_sblks;    ///< Journal file size in sblks
        lpmgr _lpmgr;               ///< LFID-PFID manager tracks inserted journal files
        enq_map _emap;              ///< Enqueue map for low water mark management
        txn_map _tmap;              ///< Transaction map open transactions
        rrfc _rrfc;                 ///< Read journal rotating file controller
        wrfc _wrfc;                 ///< Write journal rotating file controller
        rmgr _rmgr;                 ///< Read page manager which manages AIO
        wmgr _wmgr;                 ///< Write page manager which manages AIO
        rcvdat _rcvdat;             ///< Recovery data used for recovery
        smutex _wr_mutex;           ///< Mutex for journal writes

    public:
        static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns
        static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing

        /**
        * \brief Journal constructor.
        *
        * Constructor which sets the physical file location and base name.
        *
        * \param jid A unique identifier for this journal instance.
        * \param jdir The directory which will contain the journal files.
        * \param base_filename The string which will be used to start all journal filenames.
        */
        jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename);

        /**
        * \brief Destructor.
        */
        virtual ~jcntl();

        /**
        * \brief Accessor for the journal ID.
        *
        * \return Const reference to the _jid member of this instance.
        */
        inline const std::string& id() const
        {
            return _jid;
        }
        /**
        * \brief Accessor for the journal directory name.
        *
        * \return Const reference to the result of calling dirname() on the _jdir member.
        */
        inline const std::string& jrnl_dir() const
        {
            return _jdir.dirname();
        }

        /**
        * \brief Initialize the journal for storing data.
        *
        * Initialize the journal by creating new journal data files and initializing internal
        * control structures. When complete, the journal will be empty, and ready to store data.
        *
        * <b>NOTE: Any existing journal will be ignored by this operation.</b> To recover
        * the data from an existing journal, use recover().
        *
        * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
        * and deleted.</b>
        *
        * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
        * used.</b>
        *
        * \param num_jfiles The number of journal files to be created.
        * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically
        *     add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then
        *     no files are added and an exception will be thrown if the journal runs out of file space.
        * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the
        *     maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If
        *     this number of files exist and the journal runs out of space, an exception will be thrown. This number
        *     must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a
        *     single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled.
        * \param jfsize_sblks The size of each journal file expressed in softblocks.
        * \param wcache_num_pages The number of write cache pages to create.
        * \param wcache_pgsize_sblks The size in sblks of each write cache page.
        * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL).
        *
        * \exception TODO
        */
        void initialize(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles,
                const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
                aio_callback* const cbp);

        /**
        * \brief Initialize journal by recovering state from previously written journal.
        *
        * Initialize journal by recovering state from previously written journal. The journal files
        * are analyzed, and all records that have not been dequeued and that remain in the journal
        * will be available for reading. The journal is placed in a read-only state until
        * recover_complete() is called; any calls to enqueue or dequeue will fail with an exception
        * in this state.
        *
        * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created
        * and deleted.</b>
        *
        * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be
        * used.</b>
        *
        * \param num_jfiles The number of journal files to be created.
        * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically
        *     add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then
        *     no files are added and an exception will be thrown if the journal runs out of file space.
        * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the
        *     maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If
        *     this number of files exist and the journal runs out of space, an exception will be thrown. This number
        *     must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a
        *     single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled.
        * \param jfsize_sblks The size of each journal file expressed in softblocks.
        * \param wcache_num_pages The number of write cache pages to create.
        * \param wcache_pgsize_sblks The size in sblks of each write cache page.
        * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL).
        * \param prep_txn_list_ptr
        * \param highest_rid Returns the highest rid found in the journal during recover
        *
        * \exception TODO
        */
        void recover(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles,
                const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks,
                aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, u_int64_t& highest_rid);

        /**
        * \brief Notification to the journal that recovery is complete and that normal operation
        *     may resume.
        *
        * This call notifies the journal that recovery is complete and that normal operation
        * may resume. The read pointers are reset so that all records read as a part of recover
        * may be re-read during normal operation. The read-only flag is then reset, allowing
        * enqueue and dequeue operations to resume.
        *
        * \exception TODO
        */
        void recover_complete();

        /**
        * \brief Stops journal and deletes all journal files.
        *
        * Clear the journal directory of all journal files matching the base filename.
        *
        * \exception TODO
        */
        void delete_jrnl_files();

        /**
        * \brief Enqueue data.
        *
        * Enqueue data or part thereof. If a large data block is being written, then it may be
        * enqueued in parts by setting this_data_len to the size of the data being written in this
        * call. The total data size must be known in advance, however, as this is written into the
        * record header on the first record write. The state of the write (i.e. how much has been
        * written so far) is maintained in the data token dtokp. Partial writes will return in state
        * ENQ_PART.
        *
        * Note that a return value of anything other than RHM_IORES_SUCCESS implies that this write
        * operation did not complete successfully or was partially completed. The action taken under
        * these conditions depends on the value of the return. For example, RHM_IORES_AIO_WAIT
        * implies that all pages in the write page cache are waiting for AIO operations to return,
        * and that the call should be remade after waiting a bit.
        *
        * Example: If a write of 99 kB is divided into three equal parts, then the following states
        * and returns would characterize a successful operation:
        * <pre>
        *                            dtok.    dtok.   dtok.
        * Operation         Return   wstate() dsize() written() Comment
        * -----------------+--------+--------+-------+---------+------------------------------------
        *                            NONE     0       0         Value of dtok before op
        * edr(99000, 33000) SUCCESS  ENQ_PART 99000   33000     Enqueue part 1
        * edr(99000, 33000) AIO_WAIT ENQ_PART 99000   50000     Enqueue part 2, not completed
        * edr(99000, 33000) SUCCESS  ENQ_PART 99000   66000     Enqueue part 2 again
        * edr(99000, 33000) SUCCESS  ENQ      99000   99000     Enqueue part 3
        * </pre>
        *
        * \param data_buff Pointer to data to be enqueued for this enqueue operation.
        * \param tot_data_len Total data length.
        * \param this_data_len Amount to be written in this enqueue operation.
        * \param dtokp Pointer to data token which contains the details of the enqueue operation.
        * \param transient Flag indicating transient persistence (ie, ignored on recover).
        *
        * \exception TODO
        */
        iores enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
                const std::size_t this_data_len, data_tok* dtokp, const bool transient = false);

        /**
        * \brief Enqueue a record whose data is external to the journal.
        *
        * Variant of enqueue_data_record() that takes no data buffer, only the total data length.
        * NOTE(review): presumably only a record header carrying tot_data_len is written and the
        * payload itself is held outside the journal - confirm against the implementation.
        *
        * \param tot_data_len Total data length.
        * \param dtokp Pointer to data token which contains the details of the enqueue operation.
        * \param transient Flag indicating transient persistence (ie, ignored on recover).
        *
        * \exception TODO
        */
        iores enqueue_extern_data_record(const std::size_t tot_data_len, data_tok* dtokp,
                const bool transient = false);

        /**
        * \brief Enqueue data.
        *
        * \param data_buff Pointer to data to be enqueued for this enqueue operation.
        * \param tot_data_len Total data length.
        * \param this_data_len Amount to be written in this enqueue operation.
        * \param dtokp Pointer to data token which contains the details of the enqueue operation.
        * \param xid String containing xid. An empty string (i.e. length=0) will be considered
        *     non-transactional.
        * \param transient Flag indicating transient persistence (ie, ignored on recover).
        *
        * \exception TODO
        */
        iores enqueue_txn_data_record(const void* const data_buff, const std::size_t tot_data_len,
                const std::size_t this_data_len, data_tok* dtokp, const std::string& xid,
                const bool transient = false);
        /**
        * \brief Enqueue a record whose data is external to the journal, with an xid.
        *
        * Variant of enqueue_txn_data_record() that takes no data buffer, only the total data
        * length. NOTE(review): presumably only a record header carrying tot_data_len and the xid
        * is written, with the payload held outside the journal - confirm against the
        * implementation.
        *
        * \param tot_data_len Total data length.
        * \param dtokp Pointer to data token which contains the details of the enqueue operation.
        * \param xid String containing xid.
        * \param transient Flag indicating transient persistence (ie, ignored on recover).
        *
        * \exception TODO
        */
        iores enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp,
                const std::string& xid, const bool transient = false);

        /* TODO
        **
        * \brief Retrieve details of next record to be read without consuming the record.
        *
        * Retrieve information about current read record. A pointer to the data is returned, along
        * with the data size and available data size. Data is considered "available" when the AIO
        * operations to fill page-cache pages from disk have returned, and is ready for consumption.
        *
        * If <i>dsize_avail</i> &lt; <i>dsize</i>, then not all of the data is available or part of
        * the data is in non-contiguous memory, and a subsequent call will update both the pointer
        * and <i>dsize_avail</i> if more pages have returned from AIO.
        *
        * The <i>dsize_avail</i> parameter will return the amount of data from this record that is
        * available in the page cache as contiguous memory, even if it spans page cache boundaries.
        * However, if a record spans the end of the page cache and continues at the beginning, even
        * if both parts are ready for consumption, then this must be divided into at least two
        * get_data_record() operations, as the data is contained in at least two non-contiguous
        * segments of the page cache.
        *
        * Once all the available data for a record is exposed, it can not be read again using
        * this function. It must be consumed prior to getting the next record. This can be done by
        * calling discard_data_record() or read_data_record(). However, if parameter
        * <i>auto_discard</i> is set to <b><i>true</i></b>, then this record will be automatically
        * consumed when the entire record has become available without having to explicitly call
        * discard_next_data_record() or read_data_record().
        *
        * If the current record is an open transactional record, then it cannot be read until it is
        * committed. If it is aborted, it can never be read. Under this condition, get_data_record()
        * will return RHM_IORES_TXPENDING, the data pointer will be set to NULL and all data
        * lengths will be set to 0.
        *
        * Example: Read a record of 30k. Assume a read page cache of 10 pages of size 10k starting
        * at address base_ptr (page0 = base_ptr, page1 = base_ptr+10k, etc.). The first 15k of
        * the record falls at the end of the page cache, the remaining 15k folded to the beginning.
        * The current page (page 8) containing 5k is available, the remaining pages which contain
        * this record are pending AIO return:
        * <pre>
        * call       dsize
        * no.  dsize avail data ptr     Return   Comment
        * ----+-----+-----+------------+--------+--------------------------------------------------
        * 1    30k   5k    base_ptr+85k SUCCESS  Initial call, read first 5k
        * 2    30k   0k    base_ptr+90k AIO_WAIT AIO still pending; no further pages avail
        * 3    30k   10k   base_ptr+90k SUCCESS  AIO now returned; now read till end of page cache
        * 4    30k   15k   base_ptr     SUCCESS  data_ptr now pointing to start of page cache
        * </pre>
        *
        * \param rid Reference that returns the record ID (rid)
        * \param dsize Reference that returns the total data size of the record data.
        * \param dsize_avail Reference that returns the amount of the data that is available for
        *     consumption.
        * \param data Pointer to data pointer which will point to the first byte of the next record
        *     data.
        * \param auto_discard If <b><i>true</i></b>, automatically discard the record being read if
        *     the entire record is available (i.e. dsize == dsize_avail). Otherwise
        *     discard_next_data_record() must be explicitly called.
        *
        * \exception TODO
        *
        // *** NOT YET IMPLEMENTED ***
        iores get_data_record(const u_int64_t& rid, const std::size_t& dsize,
                const std::size_t& dsize_avail, const void** const data, bool auto_discard = false);
        */

        /* TODO
        **
        * \brief Discard (skip) next record to be read without reading or retrieving it.
        *
        * \exception TODO
        *
        // *** NOT YET IMPLEMENTED ***
        iores discard_data_record(data_tok* const dtokp);
        */

        /**
        * \brief Reads data from the journal. It is the responsibility of the reader to free
        *     the memory that is allocated through this call - see below for details.
        *
        * Reads the next non-dequeued data record from the journal.
        *
        * <b>Note</b> that this call allocates memory into which the data and XID are copied. It
        * is the responsibility of the caller to free this memory. The memory for the data and
        * XID are allocated in a single call, and the XID precedes the data in the memory space.
        * Thus, where an XID exists, freeing the XID pointer will free both the XID and data memory.
        * However, if an XID does not exist for the message, the XID pointer xidpp is set to NULL,
        * and it is the data pointer datapp that must be freed. Should neither an XID nor data be
        * present (ie an empty record), then no memory is allocated, and both pointers will be NULL.
        * In this case, there is no need to free memory.
        *
        * TODO: Fix this lousy interface. The caller should NOT be required to clean up these
        * pointers! Rather use a struct, or better still, let the data token carry the data and
        * xid pointers and lengths, and have the data token both allocate and delete.
        *
        * \param datapp Pointer to pointer that will be set to point to memory allocated and
        *     containing the data. Will be set to NULL if the call fails or there is no data
        *     in the record.
        * \param dsize Ref that will be set to the size of the data. Will be set to 0 if the call
        *     fails or if there is no data in the record.
        * \param xidpp Pointer to pointer that will be set to point to memory allocated and
        *     containing the XID. Will be set to NULL if the call fails or there is no XID attached
        *     to this record.
        * \param xidsize Ref that will be set to the size of the XID.
        * \param transient Ref that will be set true if record is transient.
        * \param external Ref that will be set true if record is external. In this case, the data
        *     pointer datapp will be set to NULL, but dsize will contain the size of the data.
        *     NOTE: If there is an xid, then xidpp must be freed.
        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
        *     through journal.
        * \param ignore_pending_txns When false (default), if the next record to be read is locked
        *     by a pending transaction, the read fails with RHM_IORES_TXPENDING. However, if set
        *     to true, then locks are ignored. This is required for reading of the Transaction
        *     Prepared List (TPL) which may have its entries locked, but may be read from
        *     time-to-time, and needs all its records (locked and unlocked) to be available.
        *
        * \exception TODO
        */
        iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp,
                std::size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp,
                bool ignore_pending_txns = false);

        /**
        * \brief Dequeues (marks as no longer needed) data record in journal.
        *
        * Dequeues (marks as no longer needed) data record in journal. Note that it is possible
        * to use the same data token instance used to enqueue this data; it contains the record ID
        * needed to correctly mark this data as dequeued in the journal. Otherwise the RID of the
        * record to be dequeued and the write state of ENQ must be manually set in a new or reset
        * instance of data_tok.
        *
        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
        *     through journal.
        * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
        *     prepared XID list items, sets whether the complete() was called in commit or abort
        *     mode.
        *
        * \exception TODO
        */
        iores dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit = false);

        /**
        * \brief Dequeues (marks as no longer needed) data record in journal.
        *
        * Dequeues (marks as no longer needed) data record in journal as part of a transaction.
        * Note that it is possible to use the same data token instance used to enqueue this data;
        * it contains the RID needed to correctly mark this data as dequeued in the journal.
        * Otherwise the RID of the record to be dequeued and the write state of ENQ must be
        * manually set in a new or reset instance of data_tok.
        *
        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
        *     through journal.
        * \param xid String containing xid. An empty string (i.e. length=0) will be considered
        *     non-transactional.
        * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing
        *     prepared XID list items, sets whether the complete() was called in commit or abort
        *     mode.
        *
        * \exception TODO
        */
        iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);

        /**
        * \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
        *
        * Abort the transaction for all records enqueued with the matching xid. All enqueued records
        * are effectively deleted from the journal, and can not be read. All dequeued records remain
        * as though they had never been dequeued.
        *
        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
        *     through journal.
        * \param xid String containing xid.
        *
        * \exception TODO
        */
        iores txn_abort(data_tok* const dtokp, const std::string& xid);

        /**
        * \brief Commit the transaction for all records enqueued or dequeued with the matching xid.
        *
        * Commit the transaction for all records enqueued with the matching xid. All enqueued
        * records are effectively released for reading and dequeueing. All dequeued records are
        * removed and can no longer be accessed.
        *
        * \param dtokp Pointer to data_tok instance for this data, used to track state of data
        *     through journal.
        * \param xid String containing xid.
        *
        * \exception TODO
        */
        iores txn_commit(data_tok* const dtokp, const std::string& xid);

        /**
        * \brief Check whether all the enqueue records for the given xid have reached disk.
        *
        * \param xid String containing xid.
        *
        * \exception TODO
        */
        bool is_txn_synced(const std::string& xid);

        /**
        * \brief Forces a check for returned AIO write events.
        *
        * Forces a check for returned AIO write events. This is normally performed by enqueue() and
        * dequeue() operations, but if these operations cease, then this call needs to be made to
        * force the processing of any outstanding AIO operations.
        */
        int32_t get_wr_events(timespec* const timeout);

        /**
        * \brief Forces a check for returned AIO read events.
        *
        * Forces a check for returned AIO read events. This is normally performed by read_data()
        * operations, but if these operations cease, then this call needs to be made to force the
        * processing of any outstanding AIO operations.
        */
        int32_t get_rd_events(timespec* const timeout);

        /**
        * \brief Stop the journal from accepting any further requests to read or write data.
        *
        * This operation is used to stop the journal. This is the normal mechanism for bringing the
        * journal to an orderly stop. Any outstanding AIO operations or partially written pages in
        * the write page cache will be flushed and will complete.
        *
        * <b>Note:</b> The journal cannot be restarted without either initializing it or restoring
        *     it.
        *
        * \param block_till_aio_cmpl If true, will block the thread while waiting for all
        *     outstanding AIO operations to complete.
        */
        void stop(const bool block_till_aio_cmpl = false);

        /**
        * \brief Force a flush of the write page cache, creating a single AIO write operation.
        *
        * \param block_till_aio_cmpl Defaults to false. NOTE(review): presumably has the same
        *     meaning as the parameter of the same name on stop() (block the calling thread until
        *     outstanding AIO operations complete) - confirm against the implementation.
        *
        * \return An iores value. NOTE(review): the possible values are not visible in this part
        *     of the header - confirm.
        */
        iores flush(const bool block_till_aio_cmpl = false);

        /**
        * \brief Get the current size of the enqueue map (_emap).
        */
        inline u_int32_t get_enq_cnt() const
        {
            return _emap.size();
        }

        /**
        * \brief Get the value reported by _wmgr.get_aio_evt_rem().
        *
        * An slock is constructed on _wr_mutex before _wmgr is queried.
        */
        inline u_int32_t get_wr_aio_evt_rem() const
        {
            slock wr_guard(_wr_mutex);
            return _wmgr.get_aio_evt_rem();
        }

        /**
        * \brief Get the value reported by _rmgr.get_aio_evt_rem().
        */
        inline u_int32_t get_rd_aio_evt_rem() const
        {
            return _rmgr.get_aio_evt_rem();
        }

        /**
        * \brief Get the outstanding AIO dblk count reported by _wrfc.
        */
        inline u_int32_t get_wr_outstanding_aio_dblks() const
        {
            return _wrfc.aio_outstanding_dblks();
        }

        inline u_int32_t get_wr_outstanding_aio_dblks(u_int16_t lfid) const
                { return _lpmgr.get_fcntlp(lfid)->wr_aio_outstanding_dblks(); }

        /**
        * \brief Get the outstanding AIO dblk count reported by _rrfc.
        */
        inline u_int32_t get_rd_outstanding_aio_dblks() const
        {
            return _rrfc.aio_outstanding_dblks();
        }

        inline u_int32_t get_rd_outstanding_aio_dblks(u_int16_t lfid) const
                { return _lpmgr.get_fcntlp(lfid)->rd_aio_outstanding_dblks(); }

        /**
        * \brief Get the index reported by _rrfc.
        */
        inline u_int16_t get_rd_fid() const
        {
            return _rrfc.index();
        }
        /**
        * \brief Get the index reported by _wrfc.
        */
        inline u_int16_t get_wr_fid() const
        {
            return _wrfc.index();
        }
        /**
        * \brief Get the earliest file id. Declared here only; defined outside this header section.
        *
        * NOTE(review): presumably the id of the journal file holding the oldest record that is
        * still enqueued - confirm against the implementation.
        */
        u_int16_t get_earliest_fid();

        /**
        * \brief Query the enqueue map to find out whether a given rid is enqueued.
        *
        * A rid that is transactionally enqueued but not yet committed is reported as not
        * enqueued. The same applies to a locked rid, i.e. one that has been transactionally
        * dequeued where the dequeue has not yet been committed.
        *
        * \param rid Record id to look up.
        * \param ignore_lock Forwarded unchanged to the enqueue map lookup; defaults to false.
        */
        inline bool is_enqueued(const u_int64_t rid, bool ignore_lock = false)
        {
            return _emap.is_enqueued(rid, ignore_lock);
        }
        /**
        * \brief Check whether the enqueue map reports a rid as locked.
        *
        * \param rid Record id to look up.
        *
        * \return <b><i>true</i></b> only when _emap.is_locked() yields enq_map::EMAP_TRUE;
        *     <b><i>false</i></b> otherwise.
        */
        inline bool is_locked(const u_int64_t rid)
        {
            // NOTE(review): this guard compares the result of is_enqueued() against EMAP_OK. If
            // enq_map::is_enqueued() returns bool, the comparison may never be true - confirm.
            const bool below_ok = _emap.is_enqueued(rid, true) < enq_map::EMAP_OK;
            if (below_ok)
                return false;
            return _emap.is_locked(rid) == enq_map::EMAP_TRUE;
        }
        /**
        * \brief Ask the enqueue map to fill in its rid list.
        *
        * \param rids Vector handed to _emap.rid_list().
        */
        inline void enq_rid_list(std::vector<u_int64_t>& rids)
        {
            _emap.rid_list(rids);
        }
        /**
        * \brief Ask the transaction map to fill in its xid list.
        *
        * \param xids Vector handed to _tmap.xid_list().
        */
        inline void enq_xid_list(std::vector<std::string>& xids)
        {
            _tmap.xid_list(xids);
        }
        /**
        * \brief Get the current size of the transaction map (_tmap).
        */
        inline u_int32_t get_open_txn_cnt() const
        {
            return _tmap.size();
        }
        /**
        * \brief Get a mutable reference to the transaction map (_tmap).
        */
        // TODO Make this a const, but txn_map must support const first.
        inline txn_map& get_txn_map()
        {
            return _tmap;
        }

        /**
        * \brief Check if the journal is stopped.
        *
        * \return <b><i>true</i></b> if the jouranl is stopped;
        *     <b><i>false</i></b> otherwise.
        */
        inline bool is_stopped() { return _stop_flag; }

        /**
        * \brief Report whether the journal can currently read and write data.
        *
        * The journal is ready once it has been initialized or restored and stop() has not been
        * called since. An internal error (for example running out of data journal file space)
        * may also stop the journal, in which case it is no longer ready.
        *
        * \return <b><i>true</i></b> if _init_flag is set and _stop_flag is not set;
        *     <b><i>false</i></b> otherwise.
        */
        inline bool is_ready() const
        {
            if (!_init_flag)
                return false;
            return !_stop_flag;
        }

        /**
        * \brief Get the current value of _readonly_flag.
        */
        inline bool is_read_only() const
        {
            return _readonly_flag;
        }

        /**
        * \brief Get the directory in which this journal keeps its files.
        *
        * The value is the journal directory that was set during initialization, as reported by
        * _jdir.dirname(). The journal files are written into this directory.
        */
        inline const std::string& dirname() const
        {
            return _jdir.dirname();
        }

        /**
        * \brief Get the filename prefix shared by all journal files of this instance.
        *
        * The base filename is set during initialization. If several journal instances share a
        * directory, each of them <b>MUST</b> use a different base filename, otherwise the
        * instances will overwrite one another.
        */
        inline const std::string& base_filename() const
        {
            return _base_filename;
        }

        /**
        * \brief Get the number of journal files reported by _lpmgr.
        */
        inline u_int16_t num_jfiles() const
        {
            return _lpmgr.num_jfiles();
        }

        /**
        * \brief Get the file controller pointer that _lpmgr holds for a file id.
        *
        * \param lfid File id passed to _lpmgr.get_fcntlp().
        */
        inline fcntl* get_fcntlp(const u_int16_t lfid) const
        {
            return _lpmgr.get_fcntlp(lfid);
        }

        /**
        * \brief Get the stored journal file size value (_jfsize_sblks).
        */
        inline u_int32_t jfsize_sblks() const
        {
            return _jfsize_sblks;
        }

        // Logging
        // Two virtual overloads that differ only in how the statement text is passed
        // (std::string or C string). Both take a log_level and are const.
        // NOTE(review): being virtual, these are presumably meant to be overridden by a subclass
        // that routes journal messages to its own logger - confirm.
        virtual void log(log_level level, const std::string& log_stmt) const;
        virtual void log(log_level level, const char* const log_stmt) const;

        // FIXME these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr:
        // NOTE(review): declaration only; the behavior is not visible in this header. "frot" is
        // presumably short for "first record offset" - confirm against the implementation.
        void chk_wr_frot();
        /**
        * \brief Get the unflushed dblk count reported by _wmgr.
        */
        inline u_int32_t unflushed_dblks()
        {
            return _wmgr.unflushed_dblks();
        }
        // NOTE(review): declaration only; presumably waits until the file header write for the
        // file identified by lid has completed - confirm against the implementation.
        void fhdr_wr_sync(const u_int16_t lid);
        inline u_int32_t wr_subm_cnt_dblks(const u_int16_t lfid) const { return _lpmgr.get_fcntlp(lfid)->wr_subm_cnt_dblks(); }

        // Management instrumentation callbacks
        /**
        * \brief Virtual instrumentation hook; this default implementation does nothing.
        */
        inline virtual void instr_incr_outstanding_aio_cnt()
        {
        }
        /**
        * \brief Virtual instrumentation hook; this default implementation does nothing.
        */
        inline virtual void instr_decr_outstanding_aio_cnt()
        {
        }

        /**
        * \brief Static function for creating new fcntl objects for use with obj_arr.
        *
        * \param jcp Pointer to a jcntl instance. NOTE(review): presumably the journal that will
        *     own the new fcntl - confirm.
        * \param lid NOTE(review): presumably the logical file id - confirm.
        * \param fid NOTE(review): presumably the physical file id - confirm.
        * \param rdp Pointer to recovery data (rcvdat). NOTE(review): whether null is permitted is
        *     not visible here - confirm.
        *
        * \return Pointer to an fcntl object. NOTE(review): ownership of the returned object is
        *     not visible in this header - confirm.
        */
        static fcntl* new_fcntl(jcntl* const jcp, const u_int16_t lid, const u_int16_t fid, const rcvdat* const rdp);

    protected:
        // Static flag and static initializer function.
        // NOTE(review): _init is presumably assigned from init_statics() so that one-time static
        // setup runs at load time - confirm against the implementation file.
        static bool _init;
        static bool init_statics();

        /**
        * \brief Check status of journal before allowing write operations.
        *
        * \param fn_name C string. NOTE(review): presumably the name of the calling function,
        *     used when reporting a failed check - confirm.
        */
        void check_wstatus(const char* fn_name) const;

        /**
        * \brief Check status of journal before allowing read operations.
        *
        * \param fn_name C string. NOTE(review): presumably the name of the calling function,
        *     used when reporting a failed check - confirm.
        */
        void check_rstatus(const char* fn_name) const;

        /**
        * \brief Write info file &lt;basefilename&gt;.jinf to disk
        *
        * Declared const, so it does not modify the journal object.
        * NOTE(review): the target directory is presumably the journal directory - confirm.
        */
        void write_infofile() const;

        /**
        * \brief Call that blocks while waiting for all outstanding AIOs to complete
        *
        * NOTE(review): timeout and error behavior are not visible in this header - confirm.
        */
        void aio_cmpl_wait();

        /**
        * \brief Call that blocks until at least one message returns; used to wait for
        *     AIO wait conditions to clear.
        *
        * \param res Incoming iores value. NOTE(review): presumably the result of the operation
        *     that reported the wait condition - confirm.
        * \param resout iores passed by non-const reference, so it may be updated by the call.
        * \param dtp Pointer to a const data_tok. NOTE(review): presumably the token of the
        *     operation being waited on - confirm.
        *
        * \return NOTE(review): meaning of the boolean result is not visible in this header -
        *     confirm.
        */
        bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp);

        /**
        * \brief Analyze journal for recovery.
        *
        * \param rd Recovery data (rcvdat) passed by non-const reference, so it may be updated by
        *     the analysis.
        * \param prep_txn_list_ptr Pointer to a const vector of strings. NOTE(review): presumably
        *     the xids of prepared transactions; whether null is permitted is not visible here -
        *     confirm.
        */
        void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr);

        // NOTE(review): declaration only; presumably reads the next record from the journal file
        // stream during recovery. fid, lowi and rd are non-const references and may be updated.
        // The meaning of the boolean result is not visible in this header - confirm.
        bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd);

        // NOTE(review): declaration only; presumably decodes one record (rec) from the journal
        // file stream during recovery. All reference parameters are non-const and may be updated.
        // The meaning of the boolean result is not visible in this header - confirm.
        bool decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read,
                rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& rec_offset);

        // NOTE(review): declaration only; presumably advances recovery reading to the next
        // journal file. fid, lowi and rd are non-const references and may be updated. The meaning
        // of jump_fro and of the boolean result is not visible in this header - confirm.
        bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd,
                const bool jump_fro);

        // NOTE(review): declaration only; "owi" is presumably the overwrite indicator carried in
        // record headers, checked here against lowi during recovery. h, lowi, rd and read_pos are
        // non-const references and may be updated - confirm against the implementation.
        bool check_owi(const u_int16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd,
                std::streampos& read_pos);

        // NOTE(review): declaration only; presumably verifies the alignment of rec_offset within
        // the journal file identified by fid during recovery. rec_offset and rd are non-const
        // references and may be updated - confirm against the implementation.
        void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset, rcvdat& rd);
    };

} // namespace journal
} // namespace mrg

#endif // ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H