summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-11-14 20:39:32 +0000
committerKim van der Riet <kpvdr@apache.org>2013-11-14 20:39:32 +0000
commit7b69da362fa2f2d53ec5378539195b8a4a7cfa41 (patch)
tree9acd64a5e5ac0062dd12dd425ae3b1aaa31f1f82 /cpp/src/qpid/linearstore/MessageStoreImpl.cpp
parent7dd5803403d7e371a5dcadd05f6d0a97ce25d0c1 (diff)
downloadqpid-python-7b69da362fa2f2d53ec5378539195b8a4a7cfa41.tar.gz
QPID-4984: Fix for recovery ambiguity issue, other code tidy-ups
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1542066 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/MessageStoreImpl.cpp')
-rw-r--r--cpp/src/qpid/linearstore/MessageStoreImpl.cpp114
1 files changed, 54 insertions, 60 deletions
diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index 100c8925de..3d06adaa67 100644
--- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -72,7 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* en
isInit(false),
envPath(envpath_),
broker(broker_),
- jrnlLog(qpid::qls_jrnl::JournalLog::LOG_NOTICE),
+ jrnlLog(qpid::linearstore::journal::JournalLog::LOG_NOTICE),
mgmtObject(),
agent(0)
{}
@@ -119,13 +119,13 @@ uint16_t MessageStoreImpl::getJrnlWrNumPages(const uint32_t wrPageSizeKib_)
}
}
-qpid::qls_jrnl::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::qls_jrnl::efpPartitionNumber_t partition_,
+qpid::linearstore::journal::efpPartitionNumber_t MessageStoreImpl::chkEfpPartition(const qpid::linearstore::journal::efpPartitionNumber_t partition_,
const std::string& /*paramName_*/) {
// TODO: check against list of existing partitions, throw if not found
return partition_;
}
-qpid::qls_jrnl::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_,
+qpid::linearstore::journal::efpDataSize_kib_t MessageStoreImpl::chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib_,
const std::string& paramName_) {
uint8_t rem = efpFileSizeKib_ % uint64_t(QLS_SBLK_SIZE_KIB);
if (rem != 0) {
@@ -171,8 +171,8 @@ bool MessageStoreImpl::init(const qpid::Options* options_)
{
// Extract and check options
const StoreOptions* opts = static_cast<const StoreOptions*>(options_);
- qpid::qls_jrnl::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition");
- qpid::qls_jrnl::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
+ qpid::linearstore::journal::efpPartitionNumber_t efpPartition = chkEfpPartition(opts->efpPartition, "efp-partition");
+ qpid::linearstore::journal::efpDataSize_kib_t efpFilePoolSize_kib = chkEfpFileSizeKiB(opts->efpFileSizeKib, "efp-file-size");
uint32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size");
uint32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size");
@@ -182,8 +182,8 @@ bool MessageStoreImpl::init(const qpid::Options* options_)
// These params, taken from options, are assumed to be correct and verified
bool MessageStoreImpl::init(const std::string& storeDir_,
- qpid::qls_jrnl::efpPartitionNumber_t efpPartition_,
- qpid::qls_jrnl::efpDataSize_kib_t efpFileSize_kib_,
+ qpid::linearstore::journal::efpPartitionNumber_t efpPartition_,
+ qpid::linearstore::journal::efpDataSize_kib_t efpFileSize_kib_,
const bool truncateFlag_,
uint32_t wCachePageSizeKib_,
uint32_t tplWCachePageSizeKib_)
@@ -230,7 +230,7 @@ void MessageStoreImpl::init()
}
try {
- qpid::qls_jrnl::jdir::create_dir(getBdbBaseDir());
+ qpid::linearstore::journal::jdir::create_dir(getBdbBaseDir());
dbenv.reset(new DbEnv(0));
dbenv->set_errpfx("linearstore");
@@ -282,7 +282,7 @@ void MessageStoreImpl::init()
THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e);
} catch (const StoreException&) {
throw;
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
QLS_LOG(error, "Journal Exception occurred while initializing store: " << e);
THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what());
} catch (...) {
@@ -291,7 +291,7 @@ void MessageStoreImpl::init()
}
} while (!isInit);
- efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(),
+ efpMgr.reset(new qpid::linearstore::journal::EmptyFilePoolManager(getStoreTopLevelDir(),
defaultEfpPartitionNumber,
defaultEfpFileSize_kib,
jrnlLog));
@@ -337,9 +337,9 @@ void MessageStoreImpl::truncateInit()
// TODO: Linearstore: harvest all discareded journal files into the empy file pool(s).
- qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir());
- qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir());
- qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir());
+ qpid::linearstore::journal::jdir::delete_dir(getBdbBaseDir());
+ qpid::linearstore::journal::jdir::delete_dir(getJrnlBaseDir());
+ qpid::linearstore::journal::jdir::delete_dir(getTplBaseDir());
QLS_LOG(notice, "Store directory " << getStoreTopLevelDir() << " was truncated.");
init();
}
@@ -349,7 +349,7 @@ void MessageStoreImpl::chkTplStoreInit()
// Prevent multiple threads from late-initializing the TPL
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
- qpid::qls_jrnl::jdir::create_dir(getTplBaseDir());
+ qpid::linearstore::journal::jdir::create_dir(getTplBaseDir());
tplStorePtr->initialize(/*tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks,*/ getEmptyFilePool(defaultEfpPartitionNumber, defaultEfpFileSize_kib), tplWCacheNumPages, tplWCachePgSizeSblks);
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
@@ -379,7 +379,7 @@ MessageStoreImpl::~MessageStoreImpl()
closeDbs();
} catch (const DbException& e) {
QLS_LOG(error, "Error closing BDB databases: " << e.what());
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
QLS_LOG(error, "Error: " << e.what());
} catch (const std::exception& e) {
QLS_LOG(error, "Error: " << e.what());
@@ -420,7 +420,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
queue_.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try {
jQueue->initialize(getEmptyFilePool(args_), wCacheNumPages, wCachePgSizeSblks);
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": create() failed: " + e.what());
}
try {
@@ -432,28 +432,28 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_,
}
}
-qpid::qls_jrnl::EmptyFilePool*
-MessageStoreImpl::getEmptyFilePool(const qpid::qls_jrnl::efpPartitionNumber_t efpPartitionNumber_,
- const qpid::qls_jrnl::efpDataSize_kib_t efpFileSizeKib_) {
- qpid::qls_jrnl::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_);
+qpid::linearstore::journal::EmptyFilePool*
+MessageStoreImpl::getEmptyFilePool(const qpid::linearstore::journal::efpPartitionNumber_t efpPartitionNumber_,
+ const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib_) {
+ qpid::linearstore::journal::EmptyFilePool* efpp = efpMgr->getEmptyFilePool(efpPartitionNumber_, efpFileSizeKib_);
if (efpp == 0) {
std::ostringstream oss;
oss << "Partition=" << efpPartitionNumber_ << "; EfpFileSize=" << efpFileSizeKib_ << " KiB";
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR_EFP_NOEFP, oss.str(), "MessageStoreImpl", "getEmptyFilePool");
}
return efpp;
}
-qpid::qls_jrnl::EmptyFilePool*
+qpid::linearstore::journal::EmptyFilePool*
MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) {
qpid::framing::FieldTable::ValuePtr value;
- qpid::qls_jrnl::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber;
+ qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber;
value = args_.get("qpid.efp_partition");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition");
}
- qpid::qls_jrnl::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib;
+ qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib;
value = args_.get("qpid.efp_file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
@@ -694,7 +694,6 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
txn_list& prepared,
message_index& messages)
{
- QLS_LOG(info, "*** MessageStoreImpl::recoverQueues()");
Cursor queues;
queues.open(queueDb, txn.get());
@@ -731,7 +730,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
long rcnt = 0L; // recovered msg count
long idcnt = 0L; // in-doubt msg count
uint64_t thisHighestRid = 0ULL;
- jQueue->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id);
+ jQueue->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id);
// Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
// from recovery of a store that has had its size changed externally by the resize utility.
@@ -757,7 +756,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt);
QLS_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt.");
jQueue->recover_complete(); // start journal.
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
}
//read all messages: done on a per queue basis if using Journal
@@ -869,7 +868,6 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
long& rcnt,
long& idcnt)
{
- QLS_LOG(info, "*** MessageStoreImpl::recoverMessages() queue=\"" << queue->getName() << "\"");
size_t preambleLength = sizeof(uint32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
@@ -894,11 +892,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
try {
unsigned aio_sleep_cnt = 0;
while (read) {
- qpid::qls_jrnl::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
+ qpid::linearstore::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
switch (res)
{
- case qpid::qls_jrnl::RHM_IORES_SUCCESS: {
+ case qpid::linearstore::journal::RHM_IORES_SUCCESS: {
msg_count++;
qpid::broker::RecoverableMessage::shared_ptr msg;
char* data = (char*)dbuff;
@@ -948,11 +946,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
}
} else {
// Enqueue and/or dequeue tx
- qpid::qls_jrnl::txn_map& tmap = jc->get_txn_map();
- qpid::qls_jrnl::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+ qpid::linearstore::journal::txn_map& tmap = jc->get_txn_map();
+ qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
bool enq = false;
bool deq = false;
- for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
if (j->enq_flag_ && j->rid_ == rid) enq = true;
else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
}
@@ -977,21 +975,21 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
aio_sleep_cnt = 0;
break;
}
- case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT:
+ case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverMessages()");
::usleep(AIO_SLEEP_TIME_US);
break;
- case qpid::qls_jrnl::RHM_IORES_EMPTY:
+ case qpid::linearstore::journal::RHM_IORES_EMPTY:
read = false;
break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
std::ostringstream oss;
- oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << qpid::qls_jrnl::iores_str(res);
+ oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << qpid::linearstore::journal::iores_str(res);
THROW_STORE_EXCEPTION(oss.str());
} // switch
} // while
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
}
}
@@ -1000,7 +998,7 @@ qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(
uint64_t /*messageId*/,
unsigned& /*headerSize*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage");
}
int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_,
@@ -1036,9 +1034,8 @@ int MessageStoreImpl::enqueueMessage(TxnCtxt& txn_,
void MessageStoreImpl::readTplStore()
{
- QLS_LOG(info, "*** MessageStoreImpl::readTplStore()");
tplRecoverMap.clear();
- qpid::qls_jrnl::txn_map& tmap = tplStorePtr->get_txn_map();
+ qpid::linearstore::journal::txn_map& tmap = tplStorePtr->get_txn_map();
DataTokenImpl dtok;
void* dbuff = NULL; size_t dbuffSize = 0;
void* xidbuff = NULL; size_t xidbuffSize = 0;
@@ -1050,9 +1047,9 @@ void MessageStoreImpl::readTplStore()
while (!done) {
dtok.reset();
dtok.set_wstate(DataTokenImpl::ENQ);
- qpid::qls_jrnl::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
+ qpid::linearstore::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
switch (res) {
- case qpid::qls_jrnl::RHM_IORES_SUCCESS: {
+ case qpid::linearstore::journal::RHM_IORES_SUCCESS: {
// Every TPL record contains both data and an XID
assert(dbuffSize>0);
assert(xidbuffSize>0);
@@ -1060,7 +1057,7 @@ void MessageStoreImpl::readTplStore()
bool is2PC = *(static_cast<char*>(dbuff)) != 0;
// Check transaction details; add to recover map
- qpid::qls_jrnl::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
+ qpid::linearstore::journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
if (!txnList.empty()) { // xid found in tmap
unsigned enqCnt = 0;
unsigned deqCnt = 0;
@@ -1070,7 +1067,7 @@ void MessageStoreImpl::readTplStore()
// Note: will apply to both 1PC and 2PC transactions.
bool commitFlag = true;
- for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
+ for (qpid::linearstore::journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
if (j->enq_flag_) {
rid = j->rid_;
enqCnt++;
@@ -1088,31 +1085,30 @@ void MessageStoreImpl::readTplStore()
aio_sleep_cnt = 0;
break;
}
- case qpid::qls_jrnl::RHM_IORES_PAGE_AIOWAIT:
+ case qpid::linearstore::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()");
::usleep(AIO_SLEEP_TIME_US);
break;
- case qpid::qls_jrnl::RHM_IORES_EMPTY:
+ case qpid::linearstore::journal::RHM_IORES_EMPTY:
done = true;
break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
std::ostringstream oss;
- oss << "readTplStore(): Unexpected result from journal read: " << qpid::qls_jrnl::iores_str(res);
+ oss << "readTplStore(): Unexpected result from journal read: " << qpid::linearstore::journal::iores_str(res);
THROW_STORE_EXCEPTION(oss.str());
} // switch
}
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
}
}
void MessageStoreImpl::recoverTplStore()
{
- QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()");
- if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir())) {
+ if (qpid::linearstore::journal::jdir::exists(tplStorePtr->jrnl_dir())) {
uint64_t thisHighestRid = 0ULL;
- tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0);
+ tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::linearstore::journal::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0);
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
@@ -1127,7 +1123,6 @@ void MessageStoreImpl::recoverTplStore()
void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
{
- QLS_LOG(info, "*** MessageStoreImpl::recoverLockedMappings()");
if (!tplStorePtr->is_ready())
recoverTplStore();
@@ -1143,7 +1138,6 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
{
- QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()");
if (tplStorePtr->is_ready()) {
readTplStore();
} else {
@@ -1158,18 +1152,18 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage");
}
void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy");
}
void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/,
const std::string& /*data*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent");
}
void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue*/,
@@ -1178,7 +1172,7 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue
uint64_t /*offset*/,
uint32_t /*length*/)
{
- throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent");
+ throw qpid::linearstore::journal::jexception(qpid::linearstore::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent");
}
void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
@@ -1193,7 +1187,7 @@ void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
// TODO: check if this result should be used...
/*mrg::journal::iores res =*/ jc->flush();
}
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
}
}
@@ -1268,7 +1262,7 @@ void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue_,
} else {
THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
}
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue_->getName() + ": MessageStoreImpl::store() failed: " +
e.what());
}
@@ -1328,7 +1322,7 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
} else {
jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
- } catch (const qpid::qls_jrnl::jexception& e) {
+ } catch (const qpid::linearstore::journal::jexception& e) {
ddtokp->release();
THROW_STORE_EXCEPTION(std::string("Queue ") + queue_.getName() + ": async_dequeue() failed: " + e.what());
}