summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-11-04 22:15:14 +0000
committerKim van der Riet <kpvdr@apache.org>2013-11-04 22:15:14 +0000
commit1be9d0e97c9c6f59d0b69096b5cd73bf651200a9 (patch)
tree89d21109e0c4da085d45585b225f83d1018bad13 /cpp/src/qpid/linearstore/MessageStoreImpl.cpp
parenta115ed83708fdd5c24f7553a8154373b65eddd42 (diff)
downloadqpid-python-1be9d0e97c9c6f59d0b69096b5cd73bf651200a9.tar.gz
QPID-4984: WIP. Basic enqueue/dequeue/txns work, still no EFP recycling.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1538790 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/linearstore/MessageStoreImpl.cpp')
-rw-r--r--cpp/src/qpid/linearstore/MessageStoreImpl.cpp82
1 files changed, 26 insertions, 56 deletions
diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
index cbc395d61a..100c8925de 100644
--- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
+++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
@@ -291,7 +291,10 @@ void MessageStoreImpl::init()
}
} while (!isInit);
- efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(), jrnlLog));
+ efpMgr.reset(new qpid::qls_jrnl::EmptyFilePoolManager(getStoreTopLevelDir(),
+ defaultEfpPartitionNumber,
+ defaultEfpFileSize_kib,
+ jrnlLog));
efpMgr->findEfpPartitions();
}
@@ -331,6 +334,9 @@ void MessageStoreImpl::truncateInit()
dbenv->close(0);
isInit = false;
}
+
+ // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s).
+
qpid::qls_jrnl::jdir::delete_dir(getBdbBaseDir());
qpid::qls_jrnl::jdir::delete_dir(getJrnlBaseDir());
qpid::qls_jrnl::jdir::delete_dir(getTplBaseDir());
@@ -730,6 +736,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
// Check for changes to queue store settings qpid.file_count and qpid.file_size resulting
// from recovery of a store that has had its size changed externally by the resize utility.
// If so, update the queue store settings so that QMF queries will reflect the new values.
+ // TODO: Update this for new settings, as qpid.file_count and qpid.file_size no longer apply
/*
const qpid::framing::FieldTable& storeargs = queue->getSettings().storeSettings;
qpid::framing::FieldTable::ValuePtr value;
@@ -866,8 +873,6 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
size_t preambleLength = sizeof(uint32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- DataTokenImpl dtok;
- size_t readSize = 0;
unsigned msg_count = 0;
// TODO: This optimization to skip reading if there are no enqueued messages to read
@@ -876,19 +881,20 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
//bool read = jc->get_enq_cnt() > 0;
bool read = true;
- void* dbuff = NULL; size_t dbuffSize = 0;
- void* xidbuff = NULL; size_t xidbuffSize = 0;
+ void* dbuff = NULL;
+ size_t dbuffSize = 0;
+ void* xidbuff = NULL;
+ size_t xidbuffSize = 0;
bool transientFlag = false;
bool externalFlag = false;
-
- dtok.set_wstate(DataTokenImpl::ENQ);
+ DataTokenImpl dtok;
+ dtok.set_wstate(DataTokenImpl::NONE);
// Read the message from the Journal.
try {
unsigned aio_sleep_cnt = 0;
while (read) {
qpid::qls_jrnl::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
- readSize = dtok.dsize();
switch (res)
{
@@ -909,11 +915,11 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
// At some future point if delivery attempts are stored, then this call would
// become optional depending on that information.
msg->setRedelivered();
- // Reset the TTL for the recovered message
- msg->computeExpiration(broker->getExpiryPolicy());
+ // Reset the TTL for the recovered message
+ msg->computeExpiration(broker->getExpiryPolicy());
uint32_t contentOffset = headerSize + preambleLength;
- uint64_t contentSize = readSize - contentOffset;
+ uint64_t contentSize = dbuffSize - contentOffset;
if (msg->loadContent(contentSize) && !externalFlag) {
//now read the content
qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
@@ -947,8 +953,8 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
bool enq = false;
bool deq = false;
for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->_enq_flag && j->_rid == rid) enq = true;
- else if (!j->_enq_flag && j->_drid == rid) deq = true;
+ if (j->enq_flag_ && j->rid_ == rid) enq = true;
+ else if (!j->enq_flag_ && j->drid_ == rid) deq = true;
}
if (enq && !deq && citr->second.commit_flag) {
rcnt++;
@@ -962,7 +968,7 @@ void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
}
dtok.reset();
- dtok.set_wstate(DataTokenImpl::ENQ);
+ dtok.set_wstate(DataTokenImpl::NONE);
if (xidbuff)
::free(xidbuff);
@@ -1065,11 +1071,11 @@ void MessageStoreImpl::readTplStore()
bool commitFlag = true;
for (qpid::qls_jrnl::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
- if (j->_enq_flag) {
- rid = j->_rid;
+ if (j->enq_flag_) {
+ rid = j->rid_;
enqCnt++;
} else {
- commitFlag = j->_commit_flag;
+ commitFlag = j->commit_flag_;
deqCnt++;
}
}
@@ -1104,10 +1110,9 @@ void MessageStoreImpl::readTplStore()
void MessageStoreImpl::recoverTplStore()
{
QLS_LOG(info, "*** MessageStoreImpl::recoverTplStore()");
-/*
- if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
+ if (qpid::qls_jrnl::jdir::exists(tplStorePtr->jrnl_dir())) {
uint64_t thisHighestRid = 0ULL;
- tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
+ tplStorePtr->recover(boost::dynamic_pointer_cast<qpid::qls_jrnl::EmptyFilePoolManager>(efpMgr), tplWCacheNumPages, tplWCachePgSizeSblks, 0, thisHighestRid, 0);
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
@@ -1118,7 +1123,6 @@ void MessageStoreImpl::recoverTplStore()
tplStorePtr->recover_complete(); // start journal.
}
-*/
}
void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
@@ -1137,12 +1141,10 @@ void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
}
}
-void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/)
+void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
{
QLS_LOG(info, "*** MessageStoreImpl::collectPreparedXids()");
-/*
if (tplStorePtr->is_ready()) {
- tplStorePtr->read_reset();
readTplStore();
} else {
recoverTplStore();
@@ -1152,7 +1154,6 @@ void MessageStoreImpl::collectPreparedXids(std::set<std::string>& /*xids*/)
if (!i->second.deq_flag && i->second.tpc_flag)
xids.insert(i->first);
}
-*/
}
void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/)
@@ -1178,31 +1179,6 @@ void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& /*queue
uint32_t /*length*/)
{
throw qpid::qls_jrnl::jexception(qpid::qls_jrnl::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "loadContent");
-/*
- checkInit();
- uint64_t messageId (msg->getPersistenceId());
-
- if (messageId != 0) {
- try {
- JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
- if (jc && jc->is_enqueued(messageId) ) {
- if (!jc->loadMsgContent(messageId, data, length, offset)) {
- std::ostringstream oss;
- oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " is extern";
- THROW_STORE_EXCEPTION(oss.str());
- }
- } else {
- std::ostringstream oss;
- oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " not enqueued";
- THROW_STORE_EXCEPTION(oss.str());
- }
- } catch (const qpid::qls_jrnl::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what());
- }
- } else {
- THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
- }
-*/
}
void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue_)
@@ -1358,12 +1334,6 @@ void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt_,
}
}
-uint32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue_*/)
-{
-/* checkInit();*/
- return 0;
-}
-
void MessageStoreImpl::completed(TxnCtxt& txn_,
bool commit_)
{