summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerQueue.cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-08-15 18:13:02 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-08-15 18:13:02 +0000
commit7c966bd1fcb801e14e001237096470b9c7e87f1f (patch)
tree36409d01a360e3150a587fec1b9a30bcb5e8d89b /cpp/src/qpid/broker/BrokerQueue.cpp
parent3f2ac50fdb042c2c48ebbdc1e70e442f0bf1ab86 (diff)
downloadqpid-python-7c966bd1fcb801e14e001237096470b9c7e87f1f.tar.gz
async IO for broker store
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566289 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerQueue.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerQueue.cpp35
1 files changed, 30 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp
index 3d4d6b83be..d67575103f 100644
--- a/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -56,6 +56,14 @@ Queue::Queue(const string& _name, bool _autodelete,
Queue::~Queue(){}
+void Queue::notifyDurableIOComplete()
+{
+ // signal SemanticHander to ack completed dequeues
+ // then dispatch to ack...
+ serializer.execute(dispatchCallback);
+}
+
+
void Queue::deliver(Message::shared_ptr& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
@@ -63,11 +71,20 @@ void Queue::deliver(Message::shared_ptr& msg){
alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
}
} else {
- enqueue(0, msg);
- process(msg);
+
+
+ // if no store then mark as enqueued
+ if (!enqueue(0, msg)){
+ push(msg);
+ msg->enqueueComplete();
+ }else {
+ push(msg);
+ }
+ serializer.execute(dispatchCallback);
}
}
+
void Queue::recover(Message::shared_ptr& msg){
push(msg);
if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
@@ -127,6 +144,7 @@ bool Queue::dispatch(Message::shared_ptr& msg){
void Queue::dispatch(){
+
Message::shared_ptr msg;
while(true){
{
@@ -134,7 +152,7 @@ void Queue::dispatch(){
if (messages.empty()) break;
msg = messages.front();
}
- if( dispatch(msg) ){
+ if( msg->isEnqueueComplete() && dispatch(msg) ){
pop();
}else break;
@@ -215,20 +233,27 @@ bool Queue::canAutoDelete() const{
return autodelete && consumers.size() == 0;
}
-void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
+// return true if store exists,
+bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
{
if (msg->isPersistent() && store) {
store->enqueue(ctxt, *msg.get(), *this);
+ return true;
}
+ return false;
}
-void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
+// return true if store exists,
+bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
{
if (msg->isPersistent() && store) {
store->dequeue(ctxt, *msg.get(), *this);
+ return true;
}
+ return false;
}
+
namespace
{
const std::string qpidMaxSize("qpid.max_size");