diff options
Diffstat (limited to 'qpid/cpp/src/qpid/legacystore/JournalImpl.h')
-rw-r--r-- | qpid/cpp/src/qpid/legacystore/JournalImpl.h | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/legacystore/JournalImpl.h b/qpid/cpp/src/qpid/legacystore/JournalImpl.h new file mode 100644 index 0000000000..7227b2ffd4 --- /dev/null +++ b/qpid/cpp/src/qpid/legacystore/JournalImpl.h @@ -0,0 +1,265 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef QPID_LEGACYSTORE_JOURNALIMPL_H +#define QPID_LEGACYSTORE_JOURNALIMPL_H + +#include <set> +#include "qpid/legacystore/jrnl/enums.h" +#include "qpid/legacystore/jrnl/jcntl.h" +#include "qpid/legacystore/DataTokenImpl.h" +#include "qpid/legacystore/PreparedTransaction.h" +#include "qpid/broker/PersistableQueue.h" +#include "qpid/sys/Timer.h" +#include "qpid/sys/Time.h" +#include <boost/ptr_container/ptr_list.hpp> +#include <boost/intrusive_ptr.hpp> +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/legacystore/Journal.h" + +namespace qpid { namespace sys { +class Timer; +}} + +namespace mrg { +namespace msgstore { + +class JournalImpl; + +class InactivityFireEvent : public qpid::sys::TimerTask +{ + JournalImpl* _parent; + qpid::sys::Mutex _ife_lock; + + public: + InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout); + virtual ~InactivityFireEvent() {} + void fire(); + inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; } +}; + +class GetEventsFireEvent : public qpid::sys::TimerTask +{ + JournalImpl* _parent; + qpid::sys::Mutex _gefe_lock; + + public: + GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout); + virtual ~GetEventsFireEvent() {} + void fire(); + inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; } +}; + +class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback +{ + public: + typedef boost::function<void (JournalImpl&)> DeleteCallback; + + private: +// static qpid::sys::Mutex _static_lock; +// static u_int32_t cnt; + + qpid::sys::Timer& timer; + bool getEventsTimerSetFlag; + boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr; + qpid::sys::Mutex _getf_lock; + qpid::sys::Mutex _read_lock; + + u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests + std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence + + bool writeActivityFlag; + bool flushTriggeredFlag; + boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr; + + // temp local vars for loadMsgContent below + void* _xidp; + void* _datap; + size_t _dlen; + mrg::journal::data_tok _dtok; + bool _external; + + qpid::management::ManagementAgent* _agent; + qmf::org::apache::qpid::legacystore::Journal::shared_ptr _mgmtObject; + DeleteCallback deleteCallback; + + public: + + JournalImpl(qpid::sys::Timer& timer, + const std::string& journalId, + const std::string& journalDirectory, + const std::string& journalBaseFilename, + const qpid::sys::Duration getEventsTimeout, + const qpid::sys::Duration flushTimeout, + qpid::management::ManagementAgent* agent, + DeleteCallback deleteCallback=DeleteCallback() ); + + virtual ~JournalImpl(); + + void initManagement(qpid::management::ManagementAgent* agent); + + void initialize(const u_int16_t num_jfiles, + const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks, + mrg::journal::aio_callback* const cbp); + + inline void initialize(const u_int16_t num_jfiles, + const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks) { + initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + this); + } + + void recover(const u_int16_t num_jfiles, + const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks, + mrg::journal::aio_callback* const cbp, + boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr, + u_int64_t& highest_rid, + u_int64_t queue_id); + + inline void recover(const u_int16_t num_jfiles, + const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks, + boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr, + u_int64_t& highest_rid, + u_int64_t queue_id) { + recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + this, prep_tx_list_ptr, highest_rid, queue_id); + } + + void recover_complete(); + + // Temporary fn to read and save last msg read from journal so it can be assigned + // in chunks. To be replaced when coding to do this direct from the journal is ready. + // Returns true if the record is extern, false if local. + bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0); + + // Overrides for write inactivity timer + void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, + const size_t this_data_len, mrg::journal::data_tok* dtokp, + const bool transient = false); + + void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp, + const bool transient = false); + + void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, + const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid, + const bool transient = false); + + void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp, + const std::string& xid, const bool transient = false); + + void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false); + + void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false); + + void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid); + + void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid); + + void stop(bool block_till_aio_cmpl = false); + + // Logging + void log(mrg::journal::log_level level, const std::string& log_stmt) const; + void log(mrg::journal::log_level level, const char* const log_stmt) const; + + // Overrides for get_events timer + mrg::journal::iores flush(const bool block_till_aio_cmpl = false); + + // TimerTask callback + void getEventsFire(); + void flushFire(); + + // AIO callbacks + virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl); + virtual void rd_aio_cb(std::vector<u_int16_t>& pil); + + qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const + { return _mgmtObject; } + + qpid::management::Manageable::status_t ManagementMethod (uint32_t, + qpid::management::Args&, + std::string&); + + void resetDeleteCallback() { deleteCallback = DeleteCallback(); } + + private: + void free_read_buffers(); + + inline void setGetEventTimer() + { + getEventsFireEventsPtr->setupNextFire(); + timer.add(getEventsFireEventsPtr); + getEventsTimerSetFlag = true; + } + void handleIoResult(const mrg::journal::iores r); + + // Management instrumentation callbacks overridden from jcntl + inline void instr_incr_outstanding_aio_cnt() { + if (_mgmtObject.get() != 0) _mgmtObject->inc_outstandingAIOs(); + } + inline void instr_decr_outstanding_aio_cnt() { + if (_mgmtObject.get() != 0) _mgmtObject->dec_outstandingAIOs(); + } + +}; // class JournalImpl + +class TplJournalImpl : public JournalImpl +{ + public: + TplJournalImpl(qpid::sys::Timer& timer, + const std::string& journalId, + const std::string& journalDirectory, + const std::string& journalBaseFilename, + const qpid::sys::Duration getEventsTimeout, + const qpid::sys::Duration flushTimeout, + qpid::management::ManagementAgent* agent) : + JournalImpl(timer, journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent) + {} + + virtual ~TplJournalImpl() {} + + // Special version of read_data_record that ignores transactions - needed when reading the TPL + inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize, + void** const xidpp, std::size_t& xidsize, bool& transient, bool& external, + mrg::journal::data_tok* const dtokp) { + return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true); + } + inline void read_reset() { _rmgr.invalidate(); } +}; // class TplJournalImpl + +} // namespace msgstore +} // namespace mrg + +#endif // ifndef QPID_LEGACYSTORE_JOURNALIMPL_H |