/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "qpid/legacystore/JournalImpl.h"

#include "qpid/legacystore/jrnl/jerrno.h"
#include "qpid/legacystore/jrnl/jexception.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/legacystore/ArgsJournalExpand.h"
#include "qmf/org/apache/qpid/legacystore/EventCreated.h"
#include "qmf/org/apache/qpid/legacystore/EventEnqThresholdExceeded.h"
#include "qmf/org/apache/qpid/legacystore/EventFull.h"
#include "qmf/org/apache/qpid/legacystore/EventRecovered.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
#include "qpid/legacystore/StoreException.h"

using namespace mrg::msgstore;
using namespace mrg::journal;
using qpid::management::ManagementAgent;
namespace _qmf = qmf::org::apache::qpid::legacystore;

// Timer task fired after a period of write inactivity; triggers a journal flush
// via the parent JournalImpl (see JournalImpl::flushFire()).
InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
    qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p)
{}

void
InactivityFireEvent::fire()
{
    qpid::sys::Mutex::ScopedLock sl(_ife_lock);
    if (_parent)
        _parent->flushFire();
}

// Timer task fired when outstanding AIO write events may need harvesting;
// delegates to JournalImpl::getEventsFire().
GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
    qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p)
{}

void
GetEventsFireEvent::fire()
{
    qpid::sys::Mutex::ScopedLock sl(_gefe_lock);
    if (_parent)
        _parent->getEventsFire();
}

// Constructs the journal controller, registers the inactivity-flush timer task
// and (if an agent is supplied) the QMF management object.
JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
                         const std::string& journalId,
                         const std::string& journalDirectory,
                         const std::string& journalBaseFilename,
                         const qpid::sys::Duration getEventsTimeout,
                         const qpid::sys::Duration flushTimeout,
                         qpid::management::ManagementAgent* a,
                         DeleteCallback onDelete):
                         jcntl(journalId, journalDirectory, journalBaseFilename),
                         timer(timer_),
                         getEventsTimerSetFlag(false),
                         lastReadRid(0),
                         writeActivityFlag(false),
                         flushTriggeredFlag(true),
                         _xidp(0),
                         _datap(0),
                         _dlen(0),
                         _dtok(),
                         _external(false),
                         deleteCallback(onDelete)
{
    getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
    inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
    {
        timer.start();
        timer.add(inactivityFireEventPtr);
    }

    initManagement(a);

    log(LOG_NOTICE, "Created");
    std::ostringstream oss;
    oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\"";
    log(LOG_DEBUG, oss.str());
}

// Invokes the registered delete callback, stops the journal if it was started
// but not yet stopped, cancels timer tasks, frees read buffers and destroys
// the management object.
JournalImpl::~JournalImpl()
{
    if (deleteCallback) deleteCallback(*this);
    if (_init_flag && !_stop_flag) {
        try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
        catch (const jexception& e) { log(LOG_ERROR, e.what()); }
    }
    getEventsFireEventsPtr->cancel();
    inactivityFireEventPtr->cancel();
    free_read_buffers();

    if (_mgmtObject.get() != 0) {
        _mgmtObject->resourceDestroy();
        _mgmtObject.reset();
    }

    log(LOG_NOTICE, "Destroyed");
}

// Creates and registers the QMF management object for this journal. Counters
// that are only known after initialize() are set to 0 here because, being
// properties, they must have a value from the moment the object is published.
void
JournalImpl::initManagement(qpid::management::ManagementAgent* a)
{
    _agent = a;
    if (_agent != 0)
    {
        _mgmtObject = _qmf::Journal::shared_ptr (
            new _qmf::Journal(_agent, this));

        _mgmtObject->set_name(_jid);
        _mgmtObject->set_directory(_jdir.dirname());
        _mgmtObject->set_baseFileName(_base_filename);
        _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
        _mgmtObject->set_readPages(JRNL_RMGR_PAGES);

        // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
        _mgmtObject->set_initialFileCount(0);
        _mgmtObject->set_dataFileSize(0);
        _mgmtObject->set_currentFileCount(0);
        _mgmtObject->set_writePageSize(0);
        _mgmtObject->set_writePages(0);

        _agent->addObject(_mgmtObject, 0, true);
    }
}

// Initializes a fresh (empty) journal, then publishes the resulting geometry
// to the management object and raises an EventCreated QMF event.
void
JournalImpl::initialize(const u_int16_t num_jfiles,
                        const bool auto_expand,
                        const u_int16_t ae_max_jfiles,
                        const u_int32_t jfsize_sblks,
                        const u_int16_t wcache_num_pages,
                        const u_int32_t wcache_pgsize_sblks,
                        mrg::journal::aio_callback* const cbp)
{
    std::ostringstream oss;
    oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
    oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
    oss << " wcache_num_pages=" << wcache_num_pages;
    log(LOG_DEBUG, oss.str());

    jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
            cbp);

    log(LOG_DEBUG, "Initialization complete");

    if (_mgmtObject.get() != 0)
    {
        _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
        _mgmtObject->set_autoExpand(_lpmgr.is_ae());
        _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
        _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
        _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
        _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
        _mgmtObject->set_writePages(wcache_num_pages);
    }
    if (_agent != 0)
        _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventCreated(_jid,
                                                                             _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE,
                                                                             _lpmgr.num_jfiles()),
                           qpid::management::ManagementAgent::SEV_NOTE);
}

// Recovery phase 1: reads the journal files back into the enqueue/transaction
// maps. If a list of prepared transactions is supplied, their xids are passed
// to the journal recovery so in-doubt records are preserved, and afterwards
// each PreparedTransaction's enqueue/dequeue lists are populated from _tmap.
// On return the journal is read-only until recover_complete() is called.
void
JournalImpl::recover(const u_int16_t num_jfiles,
                     const bool auto_expand,
                     const u_int16_t ae_max_jfiles,
                     const u_int32_t jfsize_sblks,
                     const u_int16_t wcache_num_pages,
                     const u_int32_t wcache_pgsize_sblks,
                     mrg::journal::aio_callback* const cbp,
                     boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
                     u_int64_t& highest_rid,
                     u_int64_t queue_id)
{
    std::ostringstream oss1;
    oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
    oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec;
    oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
    oss1 << " wcache_num_pages=" << wcache_num_pages;
    log(LOG_DEBUG, oss1.str());

    if (_mgmtObject.get() != 0)
    {
        _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
        _mgmtObject->set_autoExpand(_lpmgr.is_ae());
        _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
        _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
        _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
        _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
        _mgmtObject->set_writePages(wcache_num_pages);
    }

    if (prep_tx_list_ptr) {
        // Create list of prepared xids
        std::vector<std::string> prep_xid_list;
        for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
            prep_xid_list.push_back(i->xid);
        }

        jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
                cbp, &prep_xid_list, highest_rid);
    } else {
        jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
                cbp, 0, highest_rid);
    }

    // Populate PreparedTransaction lists from _tmap
    if (prep_tx_list_ptr) {
        for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
            txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
            for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
                if (tdl_itr->_enq_flag) { // enqueue op
                    i->enqueues->add(queue_id, tdl_itr->_rid);
                } else { // dequeue op
                    i->dequeues->add(queue_id, tdl_itr->_drid);
                }
            }
        }
    }

    std::ostringstream oss2;
    oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
    oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
    oss2 << "; journal now read-only.";
    log(LOG_DEBUG, oss2.str());

    if (_mgmtObject.get() != 0)
    {
        _mgmtObject->inc_recordDepth(_emap.size());
        _mgmtObject->inc_enqueues(_emap.size());
        _mgmtObject->inc_txn(_tmap.size());
        _mgmtObject->inc_txnEnqueues(_tmap.enq_cnt());
        _mgmtObject->inc_txnDequeues(_tmap.deq_cnt());
    }
}

// Recovery phase 2: switches the journal from read-only back to writable and
// raises an EventRecovered QMF event with the recovered record/txn counts.
void
JournalImpl::recover_complete()
{
    jcntl::recover_complete();
    log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
    if (_agent != 0)
        _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventRecovered(_jid,
                                                                               _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE,
                                                                               _lpmgr.num_jfiles(),
                                                                               _emap.size(),
                                                                               _tmap.size(),
                                                                               _tmap.enq_cnt(),
                                                                               _tmap.deq_cnt()),
                           qpid::management::ManagementAgent::SEV_NOTE);
}

//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
//#define AIO_SLEEP_TIME_US 10 // 0.01 ms

// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
// Throw exception for all errors.
bool JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset) { qpid::sys::Mutex::ScopedLock sl(_read_lock); if (_dtok.rid() != rid) { // Free any previous msg free_read_buffers(); // Last read encountered out-of-order rids, check if this rid is in that list bool oooFlag = false; for (std::vector::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) { if (*i == rid) { oooFlag = true; } } // TODO: This is a brutal approach - very inefficient and slow. Rather introduce a system of remembering // jumpover points and allow the read to jump back to the first known jumpover point - but this needs // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a // combination of lid/offset. // NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing. if (oooFlag || rid < lastReadRid) { _rmgr.invalidate(); oooRidList.clear(); } _dlen = 0; _dtok.reset(); _dtok.set_wstate(DataTokenImpl::ENQ); _dtok.set_rid(0); _external = false; size_t xlen = 0; bool transient = false; bool done = false; bool rid_found = false; while (!done) { iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok); switch (res) { case mrg::journal::RHM_IORES_SUCCESS: if (_dtok.rid() != rid) { // Check if this is an out-of-order rid that may impact next read if (_dtok.rid() > rid) oooRidList.push_back(_dtok.rid()); free_read_buffers(); // Reset data token for next read _dlen = 0; _dtok.reset(); _dtok.set_wstate(DataTokenImpl::ENQ); _dtok.set_rid(0); } else { rid_found = _dtok.rid() == rid; lastReadRid = rid; done = true; } break; case mrg::journal::RHM_IORES_PAGE_AIOWAIT: if (get_wr_events(&_aio_cmpl_timeout) == journal::jerrno::AIO_TIMEOUT) { std::stringstream ss; ss << "read_data_record() returned " << mrg::journal::iores_str(res); ss << "; timed out waiting for page to be processed."; throw jexception(mrg::journal::jerrno::JERR__TIMEOUT, 
ss.str().c_str(), "JournalImpl", "loadMsgContent"); } break; default: std::stringstream ss; ss << "read_data_record() returned " << mrg::journal::iores_str(res); throw jexception(mrg::journal::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl", "loadMsgContent"); } } if (!rid_found) { std::stringstream ss; ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec; ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec; ss << " (" << _dtok.rid() << ")"; throw jexception(mrg::journal::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent"); } } if (_external) return false; u_int32_t hdr_offs = qpid::framing::Buffer(static_cast(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t); if (hdr_offs + offset + length > _dlen) { data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset); } else { data.append((const char*)_datap + hdr_offs + offset, length); } return true; } void JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len, const size_t this_data_len, data_tok* dtokp, const bool transient) { handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient)); if (_mgmtObject.get() != 0) { _mgmtObject->inc_enqueues(); _mgmtObject->inc_recordDepth(); } } void JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, const bool transient) { handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)); if (_mgmtObject.get() != 0) { _mgmtObject->inc_enqueues(); _mgmtObject->inc_recordDepth(); } } void JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient) { bool txn_incr = _mgmtObject.get() != 0 ? 
_tmap.in_map(xid) : false; handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient)); if (_mgmtObject.get() != 0) { if (!txn_incr) // If this xid was not in _tmap, it will be now... _mgmtObject->inc_txn(); _mgmtObject->inc_enqueues(); _mgmtObject->inc_txnEnqueues(); _mgmtObject->inc_recordDepth(); } } void JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp, const std::string& xid, const bool transient) { bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient)); if (_mgmtObject.get() != 0) { if (!txn_incr) // If this xid was not in _tmap, it will be now... _mgmtObject->inc_txn(); _mgmtObject->inc_enqueues(); _mgmtObject->inc_txnEnqueues(); _mgmtObject->inc_recordDepth(); } } void JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit) { handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit)); if (_mgmtObject.get() != 0) { _mgmtObject->inc_dequeues(); _mgmtObject->inc_txnDequeues(); _mgmtObject->dec_recordDepth(); } } void JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit) { bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit)); if (_mgmtObject.get() != 0) { if (!txn_incr) // If this xid was not in _tmap, it will be now... 
_mgmtObject->inc_txn(); _mgmtObject->inc_dequeues(); _mgmtObject->inc_txnDequeues(); _mgmtObject->dec_recordDepth(); } } void JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid) { handleIoResult(jcntl::txn_abort(dtokp, xid)); if (_mgmtObject.get() != 0) { _mgmtObject->dec_txn(); _mgmtObject->inc_txnAborts(); } } void JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid) { handleIoResult(jcntl::txn_commit(dtokp, xid)); if (_mgmtObject.get() != 0) { _mgmtObject->dec_txn(); _mgmtObject->inc_txnCommits(); } } void JournalImpl::stop(bool block_till_aio_cmpl) { InactivityFireEvent* ifep = dynamic_cast(inactivityFireEventPtr.get()); assert(ifep); // dynamic_cast can return null if the cast fails ifep->cancel(); jcntl::stop(block_till_aio_cmpl); if (_mgmtObject.get() != 0) { _mgmtObject->resourceDestroy(); _mgmtObject.reset(); } } iores JournalImpl::flush(const bool block_till_aio_cmpl) { const iores res = jcntl::flush(block_till_aio_cmpl); { qpid::sys::Mutex::ScopedLock sl(_getf_lock); if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); } } return res; } void JournalImpl::log(mrg::journal::log_level ll, const std::string& log_stmt) const { log(ll, log_stmt.c_str()); } void JournalImpl::log(mrg::journal::log_level ll, const char* const log_stmt) const { switch (ll) { case LOG_TRACE: QPID_LOG(trace, "Journal \"" << _jid << "\": " << log_stmt); break; case LOG_DEBUG: QPID_LOG(debug, "Journal \"" << _jid << "\": " << log_stmt); break; case LOG_INFO: QPID_LOG(info, "Journal \"" << _jid << "\": " << log_stmt); break; case LOG_NOTICE: QPID_LOG(notice, "Journal \"" << _jid << "\": " << log_stmt); break; case LOG_WARN: QPID_LOG(warning, "Journal \"" << _jid << "\": " << log_stmt); break; case LOG_ERROR: QPID_LOG(error, "Journal \"" << _jid << "\": " << log_stmt); break; case LOG_CRITICAL: QPID_LOG(critical, "Journal \"" << _jid << "\": " << log_stmt); break; } } void JournalImpl::getEventsFire() { 
qpid::sys::Mutex::ScopedLock sl(_getf_lock); getEventsTimerSetFlag = false; if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); } if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); } } void JournalImpl::flushFire() { if (writeActivityFlag) { writeActivityFlag = false; flushTriggeredFlag = false; } else { if (!flushTriggeredFlag) { flush(); flushTriggeredFlag = true; } } inactivityFireEventPtr->setupNextFire(); { timer.add(inactivityFireEventPtr); } } void JournalImpl::wr_aio_cb(std::vector& dtokl) { for (std::vector::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++) { DataTokenImpl* dtokp = static_cast(*i); if (/*!is_stopped() &&*/ dtokp->getSourceMessage()) { switch (dtokp->wstate()) { case data_tok::ENQ: dtokp->getSourceMessage()->enqueueComplete(); break; case data_tok::DEQ: /* Don't need to signal until we have a way to ack completion of dequeue in AMQP dtokp->getSourceMessage()->dequeueComplete(); if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue dtokp->getSourceMessage()->setPersistenceId(0); */ break; default: ; } } dtokp->release(); } } void JournalImpl::rd_aio_cb(std::vector& /*pil*/) {} void JournalImpl::free_read_buffers() { if (_xidp) { ::free(_xidp); _xidp = 0; _datap = 0; } else if (_datap) { ::free(_datap); _datap = 0; } } void JournalImpl::handleIoResult(const iores r) { writeActivityFlag = true; switch (r) { case mrg::journal::RHM_IORES_SUCCESS: return; case mrg::journal::RHM_IORES_ENQCAPTHRESH: { std::ostringstream oss; oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\"."; log(LOG_WARN, oss.str()); if (_agent != 0) _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"), qpid::management::ManagementAgent::SEV_WARN); THROW_STORE_FULL_EXCEPTION(oss.str()); } case mrg::journal::RHM_IORES_FULL: { std::ostringstream oss; oss << "Journal full on queue \"" << _jid << "\"."; log(LOG_CRITICAL, oss.str()); if (_agent 
!= 0) _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR); THROW_STORE_FULL_EXCEPTION(oss.str()); } default: { std::ostringstream oss; oss << "Unexpected I/O response (" << mrg::journal::iores_str(r) << ") on queue " << _jid << "\"."; log(LOG_ERROR, oss.str()); THROW_STORE_FULL_EXCEPTION(oss.str()); } } } qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId, qpid::management::Args& /*args*/, std::string& /*text*/) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; switch (methodId) { case _qmf::Journal::METHOD_EXPAND : //_qmf::ArgsJournalExpand& eArgs = (_qmf::ArgsJournalExpand&) args; // Implement "expand" using eArgs.i_by (expand-by argument) status = Manageable::STATUS_NOT_IMPLEMENTED; break; } return status; }