diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2013-11-14 20:39:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2013-11-14 20:39:32 +0000 |
| commit | 7b69da362fa2f2d53ec5378539195b8a4a7cfa41 (patch) | |
| tree | 9acd64a5e5ac0062dd12dd425ae3b1aaa31f1f82 /cpp/src/qpid/linearstore/MessageStoreImpl.cpp | |
| parent | 7dd5803403d7e371a5dcadd05f6d0a97ce25d0c1 (diff) | |
| download | qpid-python-7b69da362fa2f2d53ec5378539195b8a4a7cfa41.tar.gz | |
QPID-4984: Fix for recovery ambiguity issue, other code tidy-ups
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1542066 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/MessageStoreImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 114 |
1 files changed, 54 insertions, 60 deletions
diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 100c8925de..3d06adaa67 100644 --- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -72,7 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en isInit(false), envPath(envpath_), broker(broker_), - jrnlLog(qpid::qls_jrnl::JournalLog::LOG_NOTICE), + jrnlLog(qpid::linearstore::journal::JournalLog::LOG_NOTICE), mgmtObject(), agent(0) {} @@ -119,13 +119,13 @@ uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_) } } -qpid::qls_jrnl::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition_, +qpid::linearstore::journal::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::linearstore::journal::efpPartitionNumber_t partition_, const std::string& /*paramName_*/) { // TODO: check against list of existing partitions, throw if not found return partition_; } -qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_, +qpid::linearstore::journal::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib_, const std::string& paramName_) { uint8_t rem = efpFileSizeKib_ % uint64_t(QLS_SBLK_SIZE_KIB); if (rem != 0) { @@ -171,8 +171,8 @@ bool MessageStoreImpl::init(const qpid::Options* options_) { // Extract and check options const StoreOptions* opts = static_cast<const StoreOptions*>(options_); - qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition"); - qpid::qls_jrnl::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); + qpid::linearstore::journal::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition"); + qpid::linearstore::journal::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size"); uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"); uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"); @@ -182,8 +182,8 @@ bool MessageStoreImpl::init(const qpid::Options* options_) // These params, taken from options, are assumed to be correct and verified bool MessageStoreImpl::init(const std::string& storeDir_, - qpid::qls_jrnl::efpPartitionNumber_t efpPartition_, - qpid::qls_jrnl::efpDataSize_kib_t efpFileSize_kib_, + qpid::linearstore::journal::efpPartitionNumber_t efpPartition_, + qpid::linearstore::journal::efpDataSize_kib_t efpFileSize_kib_, const bool truncateFlag_, uint32_t wCachePageSizeKib_, uint32_t tplWCachePageSizeKib_) @@ -230,7 +230,7 @@ void MessageStoreImpl::init() } try { - qpid::qls_jrnl::jdir::create_dir(getBdbBaseDir()); + qpid::linearstore::journal::jdir::create_dir(getBdbBaseDir()); dbenv.reset(new DbEnv(0)); dbenv->set_errpfx("linearstore"); @@ -282,7 +282,7 @@ void MessageStoreImpl::init() THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e); } catch (const StoreException&) { throw; - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG(error, "Journal Exception occurred while initializing store: " << e); THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what()); } catch (...) { @@ -291,7 +291,7 @@ void MessageStoreImpl::init() } } while (!isInit); - efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), + efpMgr.reset(new qpid::linearstore::journal::EmptyFilePoolManager(getStoreTopLevelDir(), defaultEfpPartitionNumber, defaultEfpFileSize_kib, jrnlLog)); @@ -337,9 +337,9 @@ void MessageStoreImpl::truncateInit() // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s). - qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir()); - qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir()); - qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir()); + qpid::linearstore::journal::jdir::delete_dir(getBdbBaseDir()); + qpid::linearstore::journal::jdir::delete_dir(getJrnlBaseDir()); + qpid::linearstore::journal::jdir::delete_dir(getTplBaseDir()); QLS_LOG(notice, "Store directory " << getStoreTopLevelDir() << " was truncated."); init(); } @@ -349,7 +349,7 @@ void MessageStoreImpl::chkTplStoreInit() // Prevent multiple threads from late-initializing the TPL qpid::sys::Mutex::ScopedLock sl(tplInitLock); if (!tplStorePtr->is_ready()) { - qpid::qls_jrnl::jdir::create_dir(getTplBaseDir()); + qpid::linearstore::journal::jdir::create_dir(getTplBaseDir()); tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks); if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true); } @@ -379,7 +379,7 @@ MessageStoreImpl::~MessageStoreImpl() closeDbs(); } catch (const DbException& e) { QLS_LOG(error, "Error closing BDB databases: " << e.what()); - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { QLS_LOG(error, "Error: " << e.what()); } catch (const std::exception& e) { QLS_LOG(error, "Error: " << e.what()); @@ -420,7 +420,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue)); try { jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks); - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what()); } try { @@ -432,28 +432,28 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, } } -qpid::qls_jrnl::EmptyFilePool* -MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_, - const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_) { - qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_); +qpid::linearstore::journal::EmptyFilePool* +MessageStoreImpl::getEmptyFilePool(const qpid::linearstore::journal::efpPartitionNumber_t efpPartitionNumber_, + const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib_) { + qpid::linearstore::journal::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_); if (efpp == 0) { std::ostringstream oss; oss << "Partition=" << efpPartitionNumber_ << "; EfpFileSize=" << efpFileSizeKib_ << " KiB"; - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool"); } return efpp; } -qpid::qls_jrnl::EmptyFilePool* +qpid::linearstore::journal::EmptyFilePool* MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { qpid::framing::FieldTable::ValuePtr value; - qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; + qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; value = args_.get("qpid.efp_partition"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition"); } - qpid::qls_jrnl::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib; + qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib; value = args_.get("qpid.efp_file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" ); @@ -694,7 +694,6 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, txn_list& prepared, message_index& messages) { - QLS_LOG(info, "*** MessageStoreImpl::recoverQueues()"); Cursor queues; queues.open(queueDb, txn.get()); @@ -731,7 +730,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, long rcnt = 0L; // recovered msg count long idcnt = 0L; // in-doubt msg count uint64_t thisHighestRid = 0ULL; - jQueue->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); + jQueue->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // Check for changes to queue store settings qpid.file_count and qpid.file_size resulting // from recovery of a store that has had its size changed externally by the resize utility. @@ -757,7 +756,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt); QLS_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt."); jQueue->recover_complete(); // start journal. - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what()); } //read all messages: done on a per queue basis if using Journal @@ -869,7 +868,6 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, long& rcnt, long& idcnt) { - QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue->getName() << "\""); size_t preambleLength = sizeof(uint32_t)/*header size*/; JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore()); @@ -894,11 +892,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, try { unsigned aio_sleep_cnt = 0; while (read) { - qpid::qls_jrnl::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); + qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); switch (res) { - case qpid::qls_jrnl::RHM_IORES_SUCCESS: { + case qpid::linearstore::journal::RHM_IORES_SUCCESS: { msg_count++; qpid::broker::RecoverableMessage::shared_ptr msg; char* data = (char*)dbuff; @@ -948,11 +946,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, } } else { // Enqueue and/or dequeue tx - qpid::qls_jrnl::txn_map& tmap = jc->get_txn_map(); - qpid::qls_jrnl::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found + qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map(); + qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found bool enq = false; bool deq = false; - for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { + for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { if (j->enq_flag_ && j->rid_ == rid) enq = true; else if (!j->enq_flag_ && j->drid_ == rid) deq = true; } @@ -977,21 +975,21 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/, aio_sleep_cnt = 0; break; } - case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT: + case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT: if (++aio_sleep_cnt > MAX_AIO_SLEEPS) THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverMessages()"); ::usleep(AIO_SLEEP_TIME_US); break; - case qpid::qls_jrnl::RHM_IORES_EMPTY: + case qpid::linearstore::journal::RHM_IORES_EMPTY: read = false; break; // done with all messages. (add call in jrnl to test that _emap is empty.) default: std::ostringstream oss; - oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << qpid::qls_jrnl::iores_str(res); + oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << qpid::linearstore::journal::iores_str(res); THROW_STORE_EXCEPTION(oss.str()); } // switch } // while - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what()); } } @@ -1000,7 +998,7 @@ qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage( uint64_t /*messageId*/, unsigned& /*headerSize*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage"); } int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_, @@ -1036,9 +1034,8 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_, void MessageStoreImpl::readTplStore() { - QLS_LOG(info, "*** MessageStoreImpl::readTplStore()"); tplRecoverMap.clear(); - qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map(); + qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map(); DataTokenImpl dtok; void* dbuff = NULL; size_t dbuffSize = 0; void* xidbuff = NULL; size_t xidbuffSize = 0; @@ -1050,9 +1047,9 @@ void MessageStoreImpl::readTplStore() while (!done) { dtok.reset(); dtok.set_wstate(DataTokenImpl::ENQ); - qpid::qls_jrnl::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); + qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok); switch (res) { - case qpid::qls_jrnl::RHM_IORES_SUCCESS: { + case qpid::linearstore::journal::RHM_IORES_SUCCESS: { // Every TPL record contains both data and an XID assert(dbuffSize>0); assert(xidbuffSize>0); @@ -1060,7 +1057,7 @@ void MessageStoreImpl::readTplStore() bool is2PC = *(static_cast<char*>(dbuff)) != 0; // Check transaction details; add to recover map - qpid::qls_jrnl::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found + qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found if (!txnList.empty()) { // xid found in tmap unsigned enqCnt = 0; unsigned deqCnt = 0; @@ -1070,7 +1067,7 @@ void MessageStoreImpl::readTplStore() // Note: will apply to both 1PC and 2PC transactions. bool commitFlag = true; - for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { + for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) { if (j->enq_flag_) { rid = j->rid_; enqCnt++; @@ -1088,31 +1085,30 @@ void MessageStoreImpl::readTplStore() aio_sleep_cnt = 0; break; } - case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT: + case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT: if (++aio_sleep_cnt > MAX_AIO_SLEEPS) THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()"); ::usleep(AIO_SLEEP_TIME_US); break; - case qpid::qls_jrnl::RHM_IORES_EMPTY: + case qpid::linearstore::journal::RHM_IORES_EMPTY: done = true; break; // done with all messages. (add call in jrnl to test that _emap is empty.) default: std::ostringstream oss; - oss << "readTplStore(): Unexpected result from journal read: " << qpid::qls_jrnl::iores_str(res); + oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res); THROW_STORE_EXCEPTION(oss.str()); } // switch } - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what()); } } void MessageStoreImpl::recoverTplStore() { - QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()"); - if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir())) { + if (qpid::linearstore::journal::jdir::exists(tplStorePtr->jrnl_dir())) { uint64_t thisHighestRid = 0ULL; - tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0); + tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0); if (highestRid == 0ULL) highestRid = thisHighestRid; else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit @@ -1127,7 +1123,6 @@ void MessageStoreImpl::recoverTplStore() void MessageStoreImpl::recoverLockedMappings(txn_list& txns) { - QLS_LOG(info, "*** MessageStoreImpl::recoverLockedMappings()"); if (!tplStorePtr->is_ready()) recoverTplStore(); @@ -1143,7 +1138,6 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns) void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) { - QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()"); if (tplStorePtr->is_ready()) { readTplStore(); } else { @@ -1158,18 +1152,18 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids) void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage"); } void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy"); } void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/, const std::string& /*data*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent"); } void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue*/, @@ -1178,7 +1172,7 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue uint64_t /*offset*/, uint32_t /*length*/) { - throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent"); + throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent"); } void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) @@ -1193,7 +1187,7 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_) // TODO: check if this result should be used... /*mrg::journal::iores res =*/ jc->flush(); } - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() ); } } @@ -1268,7 +1262,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_, } else { THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL.")); } - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { THROW_STORE_EXCEPTION(std::string("Queue ") + queue_->getName() + ": MessageStoreImpl::store() failed: " + e.what()); } @@ -1328,7 +1322,7 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_, } else { jc->dequeue_txn_data_record(ddtokp.get(), tid); } - } catch (const qpid::qls_jrnl::jexception& e) { + } catch (const qpid::linearstore::journal::jexception& e) { ddtokp->release(); THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": async_dequeue() failed: " + e.what()); } |
