diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/legacystore/JournalImpl.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/legacystore/JournalImpl.cpp')
-rw-r--r-- | cpp/src/qpid/legacystore/JournalImpl.cpp | 633 |
1 files changed, 633 insertions, 0 deletions
diff --git a/cpp/src/qpid/legacystore/JournalImpl.cpp b/cpp/src/qpid/legacystore/JournalImpl.cpp new file mode 100644 index 0000000000..ba3f2aecae --- /dev/null +++ b/cpp/src/qpid/legacystore/JournalImpl.cpp @@ -0,0 +1,633 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/legacystore/JournalImpl.h" + +#include "qpid/legacystore/jrnl/jerrno.h" +#include "qpid/legacystore/jrnl/jexception.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qmf/org/apache/qpid/legacystore/ArgsJournalExpand.h" +#include "qmf/org/apache/qpid/legacystore/EventCreated.h" +#include "qmf/org/apache/qpid/legacystore/EventEnqThresholdExceeded.h" +#include "qmf/org/apache/qpid/legacystore/EventFull.h" +#include "qmf/org/apache/qpid/legacystore/EventRecovered.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Timer.h" +#include "qpid/legacystore/StoreException.h" + +using namespace mrg::msgstore; +using namespace mrg::journal; +using qpid::management::ManagementAgent; +namespace _qmf = qmf::org::apache::qpid::legacystore; + +InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): + qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {} + +void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); } + +GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): + qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) {} + +void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); } + +JournalImpl::JournalImpl(qpid::sys::Timer& timer_, + const std::string& journalId, + const std::string& journalDirectory, + const std::string& journalBaseFilename, + const qpid::sys::Duration getEventsTimeout, + const qpid::sys::Duration flushTimeout, + qpid::management::ManagementAgent* a, + DeleteCallback onDelete): + jcntl(journalId, journalDirectory, journalBaseFilename), + timer(timer_), + getEventsTimerSetFlag(false), + lastReadRid(0), + writeActivityFlag(false), + flushTriggeredFlag(true), + _xidp(0), + _datap(0), + _dlen(0), + _dtok(), + _external(false), + deleteCallback(onDelete) +{ + getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); + inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout); + { + timer.start(); + timer.add(inactivityFireEventPtr); + } + + initManagement(a); + + log(LOG_NOTICE, "Created"); + std::ostringstream oss; + oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\""; + log(LOG_DEBUG, oss.str()); +} + +JournalImpl::~JournalImpl() +{ + if (deleteCallback) deleteCallback(*this); + if (_init_flag && !_stop_flag){ + try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete! + catch (const jexception& e) { log(LOG_ERROR, e.what()); } + } + getEventsFireEventsPtr->cancel(); + inactivityFireEventPtr->cancel(); + free_read_buffers(); + + if (_mgmtObject.get() != 0) { + _mgmtObject->resourceDestroy(); + _mgmtObject.reset(); + } + + log(LOG_NOTICE, "Destroyed"); +} + +void +JournalImpl::initManagement(qpid::management::ManagementAgent* a) +{ + _agent = a; + if (_agent != 0) + { + _mgmtObject = _qmf::Journal::shared_ptr ( + new _qmf::Journal(_agent, this)); + + _mgmtObject->set_name(_jid); + _mgmtObject->set_directory(_jdir.dirname()); + _mgmtObject->set_baseFileName(_base_filename); + _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + _mgmtObject->set_readPages(JRNL_RMGR_PAGES); + + // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime + _mgmtObject->set_initialFileCount(0); + _mgmtObject->set_dataFileSize(0); + _mgmtObject->set_currentFileCount(0); + _mgmtObject->set_writePageSize(0); + _mgmtObject->set_writePages(0); + + _agent->addObject(_mgmtObject, 0, true); + } +} + + +void +JournalImpl::initialize(const u_int16_t num_jfiles, + const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks, + mrg::journal::aio_callback* const cbp) +{ + std::ostringstream oss; + oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; + oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; + oss << " wcache_num_pages=" << wcache_num_pages; + log(LOG_DEBUG, oss.str()); + jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, cbp); + log(LOG_DEBUG, "Initialization complete"); + + if (_mgmtObject.get() != 0) + { + _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles()); + _mgmtObject->set_autoExpand(_lpmgr.is_ae()); + _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles()); + _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles()); + _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + _mgmtObject->set_writePages(wcache_num_pages); + } + if (_agent != 0) + _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles()), + qpid::management::ManagementAgent::SEV_NOTE); +} + +void +JournalImpl::recover(const u_int16_t num_jfiles, + const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks, + mrg::journal::aio_callback* const cbp, + boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr, + u_int64_t& highest_rid, + u_int64_t queue_id) +{ + std::ostringstream oss1; + oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks; + oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec; + oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; + oss1 << " wcache_num_pages=" << wcache_num_pages; + log(LOG_DEBUG, oss1.str()); + + if (_mgmtObject.get() != 0) + { + _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles()); + _mgmtObject->set_autoExpand(_lpmgr.is_ae()); + _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles()); + _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles()); + _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE); + _mgmtObject->set_writePages(wcache_num_pages); + } + + if (prep_tx_list_ptr) { + // Create list of prepared xids + std::vector<std::string> prep_xid_list; + for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) { + prep_xid_list.push_back(i->xid); + } + + jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + cbp, &prep_xid_list, highest_rid); + } else { + jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + cbp, 0, highest_rid); + } + + // Populate PreparedTransaction lists from _tmap + if (prep_tx_list_ptr) + { + for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) { + txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found + for (tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { + if (tdl_itr->_enq_flag) { // enqueue op + i->enqueues->add(queue_id, tdl_itr->_rid); + } else { // dequeue op + i->dequeues->add(queue_id, tdl_itr->_drid); + } + } + } + } + std::ostringstream oss2; + oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid; + oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size(); + oss2 << "; journal now read-only."; + log(LOG_DEBUG, oss2.str()); + + if (_mgmtObject.get() != 0) + { + _mgmtObject->inc_recordDepth(_emap.size()); + _mgmtObject->inc_enqueues(_emap.size()); + _mgmtObject->inc_txn(_tmap.size()); + _mgmtObject->inc_txnEnqueues(_tmap.enq_cnt()); + _mgmtObject->inc_txnDequeues(_tmap.deq_cnt()); + } +} + +void +JournalImpl::recover_complete() +{ + jcntl::recover_complete(); + log(LOG_DEBUG, "Recover phase 2 complete; journal now writable."); + if (_agent != 0) + _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles(), + _emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE); +} + +//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec +//#define AIO_SLEEP_TIME_US 10 // 0.01 ms +// Return true if content is recovered from store; false if content is external and must be recovered from an external store. +// Throw exception for all errors. +bool +JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset) +{ + qpid::sys::Mutex::ScopedLock sl(_read_lock); + if (_dtok.rid() != rid) + { + // Free any previous msg + free_read_buffers(); + + // Last read encountered out-of-order rids, check if this rid is in that list + bool oooFlag = false; + for (std::vector<u_int64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) { + if (*i == rid) { + oooFlag = true; + } + } + + // TODO: This is a brutal approach - very inefficient and slow. Rather introduce a system of remembering + // jumpover points and allow the read to jump back to the first known jumpover point - but this needs + // a mechanism in rrfc to accomplish it. Also helpful is a struct containing a journal address - a + // combination of lid/offset. + // NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing. + if (oooFlag || rid < lastReadRid) { + _rmgr.invalidate(); + oooRidList.clear(); + } + _dlen = 0; + _dtok.reset(); + _dtok.set_wstate(DataTokenImpl::ENQ); + _dtok.set_rid(0); + _external = false; + size_t xlen = 0; + bool transient = false; + bool done = false; + bool rid_found = false; + while (!done) { + iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok); + switch (res) { + case mrg::journal::RHM_IORES_SUCCESS: + if (_dtok.rid() != rid) { + // Check if this is an out-of-order rid that may impact next read + if (_dtok.rid() > rid) + oooRidList.push_back(_dtok.rid()); + free_read_buffers(); + // Reset data token for next read + _dlen = 0; + _dtok.reset(); + _dtok.set_wstate(DataTokenImpl::ENQ); + _dtok.set_rid(0); + } else { + rid_found = _dtok.rid() == rid; + lastReadRid = rid; + done = true; + } + break; + case mrg::journal::RHM_IORES_PAGE_AIOWAIT: + if (get_wr_events(&_aio_cmpl_timeout) == journal::jerrno::AIO_TIMEOUT) { + std::stringstream ss; + ss << "read_data_record() returned " << mrg::journal::iores_str(res); + ss << "; timed out waiting for page to be processed."; + throw jexception(mrg::journal::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl", + "loadMsgContent"); + } + break; + default: + std::stringstream ss; + ss << "read_data_record() returned " << mrg::journal::iores_str(res); + throw jexception(mrg::journal::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl", + "loadMsgContent"); + } + } + if (!rid_found) { + std::stringstream ss; + ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec; + ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec; + ss << " (" << _dtok.rid() << ")"; + throw jexception(mrg::journal::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent"); + } + } + + if (_external) return false; + + u_int32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t); + if (hdr_offs + offset + length > _dlen) { + data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset); + } else { + data.append((const char*)_datap + hdr_offs + offset, length); + } + return true; +} + +void +JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len, + const size_t this_data_len, data_tok* dtokp, const bool transient) +{ + handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient)); + + if (_mgmtObject.get() != 0) + { + _mgmtObject->inc_enqueues(); + _mgmtObject->inc_recordDepth(); + } +} + +void +JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp, + const bool transient) +{ + handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)); + + if (_mgmtObject.get() != 0) + { + _mgmtObject->inc_enqueues(); + _mgmtObject->inc_recordDepth(); + } +} + +void +JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, + const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient) +{ + bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; + + handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient)); + + if (_mgmtObject.get() != 0) + { + if (!txn_incr) // If this xid was not in _tmap, it will be now... + _mgmtObject->inc_txn(); + _mgmtObject->inc_enqueues(); + _mgmtObject->inc_txnEnqueues(); + _mgmtObject->inc_recordDepth(); + } +} + +void +JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp, + const std::string& xid, const bool transient) +{ + bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; + + handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient)); + + if (_mgmtObject.get() != 0) + { + if (!txn_incr) // If this xid was not in _tmap, it will be now... + _mgmtObject->inc_txn(); + _mgmtObject->inc_enqueues(); + _mgmtObject->inc_txnEnqueues(); + _mgmtObject->inc_recordDepth(); + } +} + +void +JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit) +{ + handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit)); + + if (_mgmtObject.get() != 0) + { + _mgmtObject->inc_dequeues(); + _mgmtObject->inc_txnDequeues(); + _mgmtObject->dec_recordDepth(); + } +} + +void +JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit) +{ + bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; + + handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit)); + + if (_mgmtObject.get() != 0) + { + if (!txn_incr) // If this xid was not in _tmap, it will be now... + _mgmtObject->inc_txn(); + _mgmtObject->inc_dequeues(); + _mgmtObject->inc_txnDequeues(); + _mgmtObject->dec_recordDepth(); + } +} + +void +JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid) +{ + handleIoResult(jcntl::txn_abort(dtokp, xid)); + + if (_mgmtObject.get() != 0) + { + _mgmtObject->dec_txn(); + _mgmtObject->inc_txnAborts(); + } +} + +void +JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid) +{ + handleIoResult(jcntl::txn_commit(dtokp, xid)); + + if (_mgmtObject.get() != 0) + { + _mgmtObject->dec_txn(); + _mgmtObject->inc_txnCommits(); + } +} + +void +JournalImpl::stop(bool block_till_aio_cmpl) +{ + InactivityFireEvent* ifep = dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get()); + assert(ifep); // dynamic_cast can return null if the cast fails + ifep->cancel(); + jcntl::stop(block_till_aio_cmpl); + + if (_mgmtObject.get() != 0) { + _mgmtObject->resourceDestroy(); + _mgmtObject.reset(); + } +} + +iores +JournalImpl::flush(const bool block_till_aio_cmpl) +{ + const iores res = jcntl::flush(block_till_aio_cmpl); + { + qpid::sys::Mutex::ScopedLock sl(_getf_lock); + if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); } + } + return res; +} + +void +JournalImpl::log(mrg::journal::log_level ll, const std::string& log_stmt) const +{ + log(ll, log_stmt.c_str()); +} + +void +JournalImpl::log(mrg::journal::log_level ll, const char* const log_stmt) const +{ + switch (ll) + { + case LOG_TRACE: QPID_LOG(trace, "Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_DEBUG: QPID_LOG(debug, "Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_INFO: QPID_LOG(info, "Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_NOTICE: QPID_LOG(notice, "Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_WARN: QPID_LOG(warning, "Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_ERROR: QPID_LOG(error, "Journal \"" << _jid << "\": " << log_stmt); break; + case LOG_CRITICAL: QPID_LOG(critical, "Journal \"" << _jid << "\": " << log_stmt); break; + } +} + +void +JournalImpl::getEventsFire() +{ + qpid::sys::Mutex::ScopedLock sl(_getf_lock); + getEventsTimerSetFlag = false; + if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); } + if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); } +} + +void +JournalImpl::flushFire() +{ + if (writeActivityFlag) { + writeActivityFlag = false; + flushTriggeredFlag = false; + } else { + if (!flushTriggeredFlag) { + flush(); + flushTriggeredFlag = true; + } + } + inactivityFireEventPtr->setupNextFire(); + { + timer.add(inactivityFireEventPtr); + } +} + +void +JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl) +{ + for (std::vector<data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++) + { + DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i); + if (/*!is_stopped() &&*/ dtokp->getSourceMessage()) + { + switch (dtokp->wstate()) + { + case data_tok::ENQ: + dtokp->getSourceMessage()->enqueueComplete(); + break; + case data_tok::DEQ: +/* Don't need to signal until we have a way to ack completion of dequeue in AMQP + dtokp->getSourceMessage()->dequeueComplete(); + if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue + dtokp->getSourceMessage()->setPersistenceId(0); +*/ + break; + default: ; + } + } + dtokp->release(); + } +} + +void +JournalImpl::rd_aio_cb(std::vector<u_int16_t>& /*pil*/) +{} + +void +JournalImpl::free_read_buffers() +{ + if (_xidp) { + ::free(_xidp); + _xidp = 0; + _datap = 0; + } else if (_datap) { + ::free(_datap); + _datap = 0; + } +} + +void +JournalImpl::handleIoResult(const iores r) +{ + writeActivityFlag = true; + switch (r) + { + case mrg::journal::RHM_IORES_SUCCESS: + return; + case mrg::journal::RHM_IORES_ENQCAPTHRESH: + { + std::ostringstream oss; + oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\"."; + log(LOG_WARN, oss.str()); + if (_agent != 0) + _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"), + qpid::management::ManagementAgent::SEV_WARN); + THROW_STORE_FULL_EXCEPTION(oss.str()); + } + case mrg::journal::RHM_IORES_FULL: + { + std::ostringstream oss; + oss << "Journal full on queue \"" << _jid << "\"."; + log(LOG_CRITICAL, oss.str()); + if (_agent != 0) + _agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR); + THROW_STORE_FULL_EXCEPTION(oss.str()); + } + default: + { + std::ostringstream oss; + oss << "Unexpected I/O response (" << mrg::journal::iores_str(r) << ") on queue " << _jid << "\"."; + log(LOG_ERROR, oss.str()); + THROW_STORE_FULL_EXCEPTION(oss.str()); + } + } +} + +qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId, + qpid::management::Args& /*args*/, + std::string& /*text*/) +{ + Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; + + switch (methodId) + { + case _qmf::Journal::METHOD_EXPAND : + //_qmf::ArgsJournalExpand& eArgs = (_qmf::ArgsJournalExpand&) args; + + // Implement "expand" using eArgs.i_by (expand-by argument) + + status = Manageable::STATUS_NOT_IMPLEMENTED; + break; + } + + return status; +} |