diff options
Diffstat (limited to 'qpid/cpp/src/qpid/legacystore/jrnl/jcntl.h')
-rw-r--r-- | qpid/cpp/src/qpid/legacystore/jrnl/jcntl.h | 722 |
1 files changed, 722 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.h b/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.h new file mode 100644 index 0000000000..294e9ced05 --- /dev/null +++ b/qpid/cpp/src/qpid/legacystore/jrnl/jcntl.h @@ -0,0 +1,722 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * \file jcntl.h + * + * Qpid asynchronous store plugin library + * + * Messaging journal top-level control and interface class + * mrg::journal::jcntl. See class documentation for details. + * + * \author Kim van der Riet + */ + +#ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H +#define QPID_LEGACYSTORE_JRNL_JCNTL_H + +namespace mrg +{ +namespace journal +{ + class jcntl; +} +} + +#include <cstddef> +#include <deque> +#include "qpid/legacystore/jrnl/jdir.h" +#include "qpid/legacystore/jrnl/fcntl.h" +#include "qpid/legacystore/jrnl/lpmgr.h" +#include "qpid/legacystore/jrnl/rcvdat.h" +#include "qpid/legacystore/jrnl/slock.h" +#include "qpid/legacystore/jrnl/smutex.h" +#include "qpid/legacystore/jrnl/rmgr.h" +#include "qpid/legacystore/jrnl/wmgr.h" +#include "qpid/legacystore/jrnl/wrfc.h" + +namespace mrg +{ +namespace journal +{ + + /** + * \brief Access and control interface for the journal. This is the top-level class for the + * journal. + * + * This is the top-level journal class; one instance of this class controls one instance of the + * journal and all its files and associated control structures. Besides this class, the only + * other class that needs to be used at a higher level is the data_tok class, one instance of + * which is used per data block written to the journal, and is used to track its status through + * the AIO enqueue, read and dequeue process. + */ + class jcntl + { + protected: + /** + * \brief Journal ID + * + * This string uniquely identifies this journal instance. It will most likely be associated + * with the identity of the message queue with which it is associated. + */ + // TODO: This is not included in any files at present, add to file_hdr? + std::string _jid; + + /** + * \brief Journal directory + * + * This string stores the path to the journal directory. It may be absolute or relative, and + * should not end in a file separator character. (e.g. "/fastdisk/jdata" is correct, + * "/fastdisk/jdata/" is not.) + */ + jdir _jdir; + + /** + * \brief Base filename + * + * This string contains the base filename used for the journal files. The filenames will + * start with this base, and have various sections added to it to derive the final file names + * that will be written to disk. No file separator characters should be included here, but + * all other legal filename characters are valid. + */ + std::string _base_filename; + + /** + * \brief Initialized flag + * + * This flag starts out set to false, is set to true once this object has been initialized, + * either by calling initialize() or recover(). + */ + bool _init_flag; + + /** + * \brief Stopped flag + * + * This flag starts out false, and is set to true when stop() is called. At this point, the + * journal will no longer accept messages until either initialize() or recover() is called. + * There is no way other than through initialization to reset this flag. + */ + // TODO: It would be helpful to distinguish between states stopping and stopped. If stop(true) is called, + // then we are stopping, but must wait for all outstanding aios to return before being finally stopped. During + // this period, however, no new enqueue/dequeue/read requests may be accepted. + bool _stop_flag; + + /** + * \brief Read-only state flag used during recover. + * + * When true, this flag prevents journal write operations (enqueue and dequeue), but + * allows read to occur. It is used during recovery, and is reset when recovered() is + * called. + */ + bool _readonly_flag; + + /** + * \brief If set, calls stop() if the jouranl write pointer overruns dequeue low water + * marker. If not set, then attempts to write will throw exceptions until the journal + * file low water marker moves to the next journal file. + */ + bool _autostop; ///< Autostop flag - stops journal when overrun occurs + + // Journal control structures + u_int32_t _jfsize_sblks; ///< Journal file size in sblks + lpmgr _lpmgr; ///< LFID-PFID manager tracks inserted journal files + enq_map _emap; ///< Enqueue map for low water mark management + txn_map _tmap; ///< Transaction map open transactions + rrfc _rrfc; ///< Read journal rotating file controller + wrfc _wrfc; ///< Write journal rotating file controller + rmgr _rmgr; ///< Read page manager which manages AIO + wmgr _wmgr; ///< Write page manager which manages AIO + rcvdat _rcvdat; ///< Recovery data used for recovery + smutex _wr_mutex; ///< Mutex for journal writes + + public: + static timespec _aio_cmpl_timeout; ///< Timeout for blocking libaio returns + static timespec _final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing + + /** + * \brief Journal constructor. + * + * Constructor which sets the physical file location and base name. + * + * \param jid A unique identifier for this journal instance. + * \param jdir The directory which will contain the journal files. + * \param base_filename The string which will be used to start all journal filenames. + */ + jcntl(const std::string& jid, const std::string& jdir, const std::string& base_filename); + + /** + * \brief Destructor. + */ + virtual ~jcntl(); + + inline const std::string& id() const { return _jid; } + inline const std::string& jrnl_dir() const { return _jdir.dirname(); } + + /** + * \brief Initialize the journal for storing data. + * + * Initialize the journal by creating new journal data files and initializing internal + * control structures. When complete, the journal will be empty, and ready to store data. + * + * <b>NOTE: Any existing journal will be ignored by this operation.</b> To use recover + * the data from an existing journal, use recover(). + * + * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created + * and deleted.</b> + * + * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be + * used.</b> + * + * \param num_jfiles The number of journal files to be created. + * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically + * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then + * no files are added and an exception will be thrown if the journal runs out of file space. + * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the + * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If + * this number of files exist and the journal runs out of space, an exception will be thrown. This number + * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a + * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled. + * \param jfsize_sblks The size of each journal file expressed in softblocks. + * \param wcache_num_pages The number of write cache pages to create. + * \param wcache_pgsize_sblks The size in sblks of each write cache page. + * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL). + * + * \exception TODO + */ + void initialize(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks, + aio_callback* const cbp); + + /** + * /brief Initialize journal by recovering state from previously written journal. + * + * Initialize journal by recovering state from previously written journal. The journal files + * are analyzed, and all records that have not been dequeued and that remain in the journal + * will be available for reading. The journal is placed in a read-only state until + * recovered() is called; any calls to enqueue or dequeue will fail with an exception + * in this state. + * + * <b>NOTE: If <i>NULL</i> is passed to the deque pointers, they will be internally created + * and deleted.</b> + * + * <b>NOTE: If <i>NULL</i> is passed to the callbacks, internal default callbacks will be + * used.</b> + * + * \param num_jfiles The number of journal files to be created. + * \param auto_expand If true, allows journal file auto-expansion. In this mode, the journal will automatically + * add files to the journal if it runs out of space. No more than ae_max_jfiles may be added. If false, then + * no files are added and an exception will be thrown if the journal runs out of file space. + * \param ae_max_jfiles Upper limit of journal files for auto-expand mode. When auto_expand is true, this is the + * maximum total number of files allowed in the journal (original plus those added by auto-expand mode). If + * this number of files exist and the journal runs out of space, an exception will be thrown. This number + * must be greater than the num_jfiles parameter value but cannot exceed the maximum number of files for a + * single journal; if num_jfiles is already at its maximum value, then auto-expand will be disabled. + * \param jfsize_sblks The size of each journal file expressed in softblocks. + * \param wcache_num_pages The number of write cache pages to create. + * \param wcache_pgsize_sblks The size in sblks of each write cache page. + * \param cbp Pointer to object containing callback functions for read and write operations. May be 0 (NULL). + * \param prep_txn_list_ptr + * \param highest_rid Returns the highest rid found in the journal during recover + * + * \exception TODO + */ + void recover(const u_int16_t num_jfiles, const bool auto_expand, const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, const u_int16_t wcache_num_pages, const u_int32_t wcache_pgsize_sblks, + aio_callback* const cbp, const std::vector<std::string>* prep_txn_list_ptr, u_int64_t& highest_rid); + + /** + * \brief Notification to the journal that recovery is complete and that normal operation + * may resume. + * + * This call notifies the journal that recovery is complete and that normal operation + * may resume. The read pointers are reset so that all records read as a part of recover + * may be re-read during normal operation. The read-only flag is then reset, allowing + * enqueue and dequeue operations to resume. + * + * \exception TODO + */ + void recover_complete(); + + /** + * \brief Stops journal and deletes all journal files. + * + * Clear the journal directory of all journal files matching the base filename. + * + * \exception TODO + */ + void delete_jrnl_files(); + + /** + * \brief Enqueue data. + * + * Enqueue data or part thereof. If a large data block is being written, then it may be + * enqueued in parts by setting this_data_len to the size of the data being written in this + * call. The total data size must be known in advance, however, as this is written into the + * record header on the first record write. The state of the write (i.e. how much has been + * written so far) is maintained in the data token dtokp. Partial writes will return in state + * ENQ_PART. + * + * Note that a return value of anything other than RHM_IORES_SUCCESS implies that this write + * operation did not complete successfully or was partially completed. The action taken under + * these conditions depends on the value of the return. For example, RHM_IORES_AIO_WAIT + * implies that all pages in the write page cache are waiting for AIO operations to return, + * and that the call should be remade after waiting a bit. + * + * Example: If a write of 99 kB is divided into three equal parts, then the following states + * and returns would characterize a successful operation: + * <pre> + * dtok. dtok. dtok. + * Pperation Return wstate() dsize() written() Comment + * -----------------+--------+--------+-------+---------+------------------------------------ + * NONE 0 0 Value of dtok before op + * edr(99000, 33000) SUCCESS ENQ_PART 99000 33000 Enqueue part 1 + * edr(99000, 33000) AIO_WAIT ENQ_PART 99000 50000 Enqueue part 2, not completed + * edr(99000, 33000) SUCCESS ENQ_PART 99000 66000 Enqueue part 2 again + * edr(99000, 33000) SUCCESS ENQ 99000 99000 Enqueue part 3 + * </pre> + * + * \param data_buff Pointer to data to be enqueued for this enqueue operation. + * \param tot_data_len Total data length. + * \param this_data_len Amount to be written in this enqueue operation. + * \param dtokp Pointer to data token which contains the details of the enqueue operation. + * \param transient Flag indicating transient persistence (ie, ignored on recover). + * + * \exception TODO + */ + iores enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len, + const std::size_t this_data_len, data_tok* dtokp, const bool transient = false); + + iores enqueue_extern_data_record(const std::size_t tot_data_len, data_tok* dtokp, + const bool transient = false); + + /** + * \brief Enqueue data. + * + * \param data_buff Pointer to data to be enqueued for this enqueue operation. + * \param tot_data_len Total data length. + * \param this_data_len Amount to be written in this enqueue operation. + * \param dtokp Pointer to data token which contains the details of the enqueue operation. + * \param xid String containing xid. An empty string (i.e. length=0) will be considered + * non-transactional. + * \param transient Flag indicating transient persistence (ie, ignored on recover). + * + * \exception TODO + */ + iores enqueue_txn_data_record(const void* const data_buff, const std::size_t tot_data_len, + const std::size_t this_data_len, data_tok* dtokp, const std::string& xid, + const bool transient = false); + iores enqueue_extern_txn_data_record(const std::size_t tot_data_len, data_tok* dtokp, + const std::string& xid, const bool transient = false); + + /* TODO + ** + * \brief Retrieve details of next record to be read without consuming the record. + * + * Retrieve information about current read record. A pointer to the data is returned, along + * with the data size and available data size. Data is considered "available" when the AIO + * operations to fill page-cache pages from disk have returned, and is ready for consumption. + * + * If <i>dsize_avail</i> < <i>dsize</i>, then not all of the data is available or part of + * the data is in non-contiguous memory, and a subsequent call will update both the pointer + * and <i>dsize_avail</i> if more pages have returned from AIO. + * + * The <i>dsize_avail</i> parameter will return the amount of data from this record that is + * available in the page cache as contiguous memory, even if it spans page cache boundaries. + * However, if a record spans the end of the page cache and continues at the beginning, even + * if both parts are ready for consumption, then this must be divided into at least two + * get_data_record() operations, as the data is contained in at least two non-contiguous + * segments of the page cache. + * + * Once all the available data for a record is exposed, it can not be read again using + * this function. It must be consumed prior to getting the next record. This can be done by + * calling discard_data_record() or read_data_record(). However, if parameter + * <i>auto_discard</i> is set to <b><i>true</i></b>, then this record will be automatically + * consumed when the entire record has become available without having to explicitly call + * discard_next_data_record() or read_data_record(). + * + * If the current record is an open transactional record, then it cannot be read until it is + * committed. If it is aborted, it can never be read. Under this condition, get_data_record() + * will return RHM_IORES_TXPENDING, the data pointer will be set to NULL and all data + * lengths will be set to 0. + * + * Example: Read a record of 30k. Assume a read page cache of 10 pages of size 10k starting + * at address base_ptr (page0 = base_ptr, page1 = page_ptr+10k, etc.). The first 15k of + * the record falls at the end of the page cache, the remaining 15k folded to the beginning. + * The current page (page 8) containing 5k is available, the remaining pages which contain + * this record are pending AIO return: + * <pre> + * call dsize + * no. dsize avail data ptr Return Comment + * ----+-----+-----+------------+--------+-------------------------------------------------- + * 1 30k 5k base_ptr+85k SUCCESS Initial call, read first 5k + * 2 30k 0k base_ptr+90k AIO_WAIT AIO still pending; no further pages avail + * 3 30k 10k base_ptr+90k SUCCESS AIO now returned; now read till end of page cache + * 4 30k 15k base_ptr SUCCESS data_ptr now pointing to start of page cache + * </pre> + * + * \param rid Reference that returns the record ID (rid) + * \param dsize Reference that returns the total data size of the record data . + * \param dsize_avail Reference that returns the amount of the data that is available for + * consumption. + * \param data Pointer to data pointer which will point to the first byte of the next record + * data. + * \param auto_discard If <b><i>true</i></b>, automatically discard the record being read if + * the entire record is available (i.e. dsize == dsize_avail). Otherwise + * discard_next_data_record() must be explicitly called. + * + * \exception TODO + * + // *** NOT YET IMPLEMENTED *** + iores get_data_record(const u_int64_t& rid, const std::size_t& dsize, + const std::size_t& dsize_avail, const void** const data, bool auto_discard = false); + */ + + /* TODO + ** + * \brief Discard (skip) next record to be read without reading or retrieving it. + * + * \exception TODO + * + // *** NOT YET IMPLEMENTED *** + iores discard_data_record(data_tok* const dtokp); + */ + + /** + * \brief Reads data from the journal. It is the responsibility of the reader to free + * the memory that is allocated through this call - see below for details. + * + * Reads the next non-dequeued data record from the journal. + * + * <b>Note</b> that this call allocates memory into which the data and XID are copied. It + * is the responsibility of the caller to free this memory. The memory for the data and + * XID are allocated in a single call, and the XID precedes the data in the memory space. + * Thus, where an XID exists, freeing the XID pointer will free both the XID and data memory. + * However, if an XID does not exist for the message, the XID pointer xidpp is set to NULL, + * and it is the data pointer datapp that must be freed. Should neither an XID nor data be + * present (ie an empty record), then no memory is allocated, and both pointers will be NULL. + * In this case, there is no need to free memory. + * + * TODO: Fix this lousy interface. The caller should NOT be required to clean up these + * pointers! Rather use a struct, or better still, let the data token carry the data and + * xid pointers and lengths, and have the data token both allocate and delete. + * + * \param datapp Pointer to pointer that will be set to point to memory allocated and + * containing the data. Will be set to NULL if the call fails or there is no data + * in the record. + * \param dsize Ref that will be set to the size of the data. Will be set to 0 if the call + * fails or if there is no data in the record. + * \param xidpp Pointer to pointer that will be set to point to memory allocated and + * containing the XID. Will be set to NULL if the call fails or there is no XID attached + * to this record. + * \param xidsize Ref that will be set to the size of the XID. + * \param transient Ref that will be set true if record is transient. + * \param external Ref that will be set true if record is external. In this case, the data + * pointer datapp will be set to NULL, but dsize will contain the size of the data. + * NOTE: If there is an xid, then xidpp must be freed. + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param ignore_pending_txns When false (default), if the next record to be read is locked + * by a pending transaction, the read fails with RHM_IORES_TXPENDING. However, if set + * to true, then locks are ignored. This is required for reading of the Transaction + * Prepared List (TPL) which may have its entries locked, but may be read from + * time-to-time, and needs all its records (locked and unlocked) to be available. + * + * \exception TODO + */ + iores read_data_record(void** const datapp, std::size_t& dsize, void** const xidpp, + std::size_t& xidsize, bool& transient, bool& external, data_tok* const dtokp, + bool ignore_pending_txns = false); + + /** + * \brief Dequeues (marks as no longer needed) data record in journal. + * + * Dequeues (marks as no longer needed) data record in journal. Note that it is possible + * to use the same data token instance used to enqueue this data; it contains the record ID + * needed to correctly mark this data as dequeued in the journal. Otherwise the RID of the + * record to be dequeued and the write state of ENQ must be manually set in a new or reset + * instance of data_tok. + * + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing + * prepared XID list items, sets whether the complete() was called in commit or abort + * mode. + * + * \exception TODO + */ + iores dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit = false); + + /** + * \brief Dequeues (marks as no longer needed) data record in journal. + * + * Dequeues (marks as no longer needed) data record in journal as part of a transaction. + * Note that it is possible to use the same data token instance used to enqueue this data; + * it contains the RID needed to correctly mark this data as dequeued in the journal. + * Otherwise the RID of the record to be dequeued and the write state of ENQ must be + * manually set in a new or reset instance of data_tok. + * + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param xid String containing xid. An empty string (i.e. length=0) will be considered + * non-transactional. + * \param txn_coml_commit Only used for preparedXID journal. When used for dequeueing + * prepared XID list items, sets whether the complete() was called in commit or abort + * mode. + * + * \exception TODO + */ + iores dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false); + + /** + * \brief Abort the transaction for all records enqueued or dequeued with the matching xid. + * + * Abort the transaction for all records enqueued with the matching xid. All enqueued records + * are effectively deleted from the journal, and can not be read. All dequeued records remain + * as though they had never been dequeued. + * + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param xid String containing xid. + * + * \exception TODO + */ + iores txn_abort(data_tok* const dtokp, const std::string& xid); + + /** + * \brief Commit the transaction for all records enqueued or dequeued with the matching xid. + * + * Commit the transaction for all records enqueued with the matching xid. All enqueued + * records are effectively released for reading and dequeueing. All dequeued records are + * removed and can no longer be accessed. + * + * \param dtokp Pointer to data_tok instance for this data, used to track state of data + * through journal. + * \param xid String containing xid. + * + * \exception TODO + */ + iores txn_commit(data_tok* const dtokp, const std::string& xid); + + /** + * \brief Check whether all the enqueue records for the given xid have reached disk. + * + * \param xid String containing xid. + * + * \exception TODO + */ + bool is_txn_synced(const std::string& xid); + + /** + * \brief Forces a check for returned AIO write events. + * + * Forces a check for returned AIO write events. This is normally performed by enqueue() and + * dequeue() operations, but if these operations cease, then this call needs to be made to + * force the processing of any outstanding AIO operations. + */ + int32_t get_wr_events(timespec* const timeout); + + /** + * \brief Forces a check for returned AIO read events. + * + * Forces a check for returned AIO read events. This is normally performed by read_data() + * operations, but if these operations cease, then this call needs to be made to force the + * processing of any outstanding AIO operations. + */ + int32_t get_rd_events(timespec* const timeout); + + /** + * \brief Stop the journal from accepting any further requests to read or write data. + * + * This operation is used to stop the journal. This is the normal mechanism for bringing the + * journal to an orderly stop. Any outstanding AIO operations or partially written pages in + * the write page cache will by flushed and will complete. + * + * <b>Note:</b> The journal cannot be restarted without either initializing it or restoring + * it. + * + * \param block_till_aio_cmpl If true, will block the thread while waiting for all + * outstanding AIO operations to complete. + */ + void stop(const bool block_till_aio_cmpl = false); + + /** + * \brief Force a flush of the write page cache, creating a single AIO write operation. + */ + iores flush(const bool block_till_aio_cmpl = false); + + inline u_int32_t get_enq_cnt() const { return _emap.size(); } + + inline u_int32_t get_wr_aio_evt_rem() const { slock l(_wr_mutex); return _wmgr.get_aio_evt_rem(); } + + inline u_int32_t get_rd_aio_evt_rem() const { return _rmgr.get_aio_evt_rem(); } + + inline u_int32_t get_wr_outstanding_aio_dblks() const + { return _wrfc.aio_outstanding_dblks(); } + + inline u_int32_t get_wr_outstanding_aio_dblks(u_int16_t lfid) const + { return _lpmgr.get_fcntlp(lfid)->wr_aio_outstanding_dblks(); } + + inline u_int32_t get_rd_outstanding_aio_dblks() const + { return _rrfc.aio_outstanding_dblks(); } + + inline u_int32_t get_rd_outstanding_aio_dblks(u_int16_t lfid) const + { return _lpmgr.get_fcntlp(lfid)->rd_aio_outstanding_dblks(); } + + inline u_int16_t get_rd_fid() const { return _rrfc.index(); } + inline u_int16_t get_wr_fid() const { return _wrfc.index(); } + u_int16_t get_earliest_fid(); + + /** + * \brief Check if a particular rid is enqueued. Note that this function will return + * false if the rid is transactionally enqueued and is not committed, or if it is + * locked (i.e. transactionally dequeued, but the dequeue has not been committed). + */ + inline bool is_enqueued(const u_int64_t rid, bool ignore_lock = false) + { return _emap.is_enqueued(rid, ignore_lock); } + inline bool is_locked(const u_int64_t rid) + { if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK) return false; return _emap.is_locked(rid) == enq_map::EMAP_TRUE; } + inline void enq_rid_list(std::vector<u_int64_t>& rids) { _emap.rid_list(rids); } + inline void enq_xid_list(std::vector<std::string>& xids) { _tmap.xid_list(xids); } + inline u_int32_t get_open_txn_cnt() const { return _tmap.size(); } + // TODO Make this a const, but txn_map must support const first. + inline txn_map& get_txn_map() { return _tmap; } + + /** + * \brief Check if the journal is stopped. + * + * \return <b><i>true</i></b> if the jouranl is stopped; + * <b><i>false</i></b> otherwise. + */ + inline bool is_stopped() { return _stop_flag; } + + /** + * \brief Check if the journal is ready to read and write data. + * + * Checks if the journal is ready to read and write data. This function will return + * <b><i>true</i></b> if the journal has been either initialized or restored, and the stop() + * function has not been called since the initialization. + * + * Note that the journal may also be stopped if an internal error occurs (such as running out + * of data journal file space). + * + * \return <b><i>true</i></b> if the journal is ready to read and write data; + * <b><i>false</i></b> otherwise. + */ + inline bool is_ready() const { return _init_flag && !_stop_flag; } + + inline bool is_read_only() const { return _readonly_flag; } + + /** + * \brief Get the journal directory. + * + * This returns the journal directory as set during initialization. This is the directory + * into which the journal files will be written. + */ + inline const std::string& dirname() const { return _jdir.dirname(); } + + /** + * \brief Get the journal base filename. + * + * Get the journal base filename as set during initialization. This is the prefix used in all + * journal files of this instance. Note that if more than one instance of the journal shares + * the same directory, their base filenames <b>MUST</b> be different or else the instances + * will overwrite one another. + */ + inline const std::string& base_filename() const { return _base_filename; } + + inline u_int16_t num_jfiles() const { return _lpmgr.num_jfiles(); } + + inline fcntl* get_fcntlp(const u_int16_t lfid) const { return _lpmgr.get_fcntlp(lfid); } + + inline u_int32_t jfsize_sblks() const { return _jfsize_sblks; } + + // Logging + virtual void log(log_level level, const std::string& log_stmt) const; + virtual void log(log_level level, const char* const log_stmt) const; + + // FIXME these are _rmgr to _wmgr interactions, remove when _rmgr contains ref to _wmgr: + void chk_wr_frot(); + inline u_int32_t unflushed_dblks() { return _wmgr.unflushed_dblks(); } + void fhdr_wr_sync(const u_int16_t lid); + inline u_int32_t wr_subm_cnt_dblks(const u_int16_t lfid) const { return _lpmgr.get_fcntlp(lfid)->wr_subm_cnt_dblks(); } + + // Management instrumentation callbacks + inline virtual void instr_incr_outstanding_aio_cnt() {} + inline virtual void instr_decr_outstanding_aio_cnt() {} + + /** + * /brief Static function for creating new fcntl objects for use with obj_arr. + */ + static fcntl* new_fcntl(jcntl* const jcp, const u_int16_t lid, const u_int16_t fid, const rcvdat* const rdp); + + protected: + static bool _init; + static bool init_statics(); + + /** + * \brief Check status of journal before allowing write operations. + */ + void check_wstatus(const char* fn_name) const; + + /** + * \brief Check status of journal before allowing read operations. + */ + void check_rstatus(const char* fn_name) const; + + /** + * \brief Write info file <basefilename>.jinf to disk + */ + void write_infofile() const; + + /** + * \brief Call that blocks while waiting for all outstanding AIOs to complete + */ + void aio_cmpl_wait(); + + /** + * \brief Call that blocks until at least one message returns; used to wait for + * AIO wait conditions to clear. + */ + bool handle_aio_wait(const iores res, iores& resout, const data_tok* dtp); + + /** + * \brief Analyze journal for recovery. + */ + void rcvr_janalyze(rcvdat& rd, const std::vector<std::string>* prep_txn_list_ptr); + + bool rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd); + + bool decode(jrec& rec, u_int16_t& fid, std::ifstream* ifsp, std::size_t& cum_size_read, + rec_hdr& h, bool& lowi, rcvdat& rd, std::streampos& rec_offset); + + bool jfile_cycle(u_int16_t& fid, std::ifstream* ifsp, bool& lowi, rcvdat& rd, + const bool jump_fro); + + bool check_owi(const u_int16_t fid, rec_hdr& h, bool& lowi, rcvdat& rd, + std::streampos& read_pos); + + void check_journal_alignment(const u_int16_t fid, std::streampos& rec_offset, rcvdat& rd); + }; + +} // namespace journal +} // namespace mrg + +#endif // ifndef QPID_LEGACYSTORE_JRNL_JCNTL_H |