summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-27 18:04:42 +0000
committerGordon Sim <gsim@apache.org>2008-03-27 18:04:42 +0000
commitf9380575ddccbe48edd5305e96db70892c1dc1aa (patch)
treebf4361ed80e1ad3c3d2aca4a345c8d0b49d5d642 /cpp/src/qpid
parent719c2529a14527c236e871603136ccbe44f632d3 (diff)
downloadqpid-python-f9380575ddccbe48edd5305e96db70892c1dc1aa.tar.gz
Send accept in response to message publications if required.
Hold up completion (and accept) until message from transfer is fully enqueued. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@641929 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/IncompleteMessageList.cpp51
-rw-r--r--cpp/src/qpid/broker/IncompleteMessageList.h48
-rw-r--r--cpp/src/qpid/broker/Message.cpp5
-rw-r--r--cpp/src/qpid/broker/Message.h1
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.cpp14
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h7
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp25
-rw-r--r--cpp/src/qpid/broker/SessionState.h10
8 files changed, 152 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp
new file mode 100644
index 0000000000..dd7bbfc067
--- /dev/null
+++ b/cpp/src/qpid/broker/IncompleteMessageList.cpp
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "IncompleteMessageList.h"
+
+#include "Message.h"
+
+namespace qpid {
+namespace broker {
+
+void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg)
+{
+ incomplete.push_back(msg);
+}
+
+void IncompleteMessageList::process(CompletionListener l, bool sync)
+{
+ while (!incomplete.empty()) {
+ boost::intrusive_ptr<Message>& msg = incomplete.front();
+ if (!msg->isEnqueueComplete()) {
+ if (sync){
+ msg->flush();
+ msg->waitForEnqueueComplete();
+ } else {
+ //leave the message as incomplete for now
+ return;
+ }
+ }
+ l(msg);
+ incomplete.pop_front();
+ }
+}
+
+}}
diff --git a/cpp/src/qpid/broker/IncompleteMessageList.h b/cpp/src/qpid/broker/IncompleteMessageList.h
new file mode 100644
index 0000000000..2cfd7bfee5
--- /dev/null
+++ b/cpp/src/qpid/broker/IncompleteMessageList.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _IncompleteMessageList_
+#define _IncompleteMessageList_
+
+#include <list>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Message;
+
+class IncompleteMessageList
+{
+ typedef std::list< boost::intrusive_ptr<Message> > Messages;
+ Messages incomplete;
+
+public:
+ typedef boost::function<void(boost::intrusive_ptr<Message>)> CompletionListener;
+
+ void add(boost::intrusive_ptr<Message> msg);
+ void process(CompletionListener l, bool sync);
+};
+
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 21908256a1..b60a95228d 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -84,6 +84,11 @@ bool Message::isPersistent()
return getAdapter().isPersistent(frames);
}
+bool Message::requiresAccept()
+{
+ return getAdapter().requiresAccept(frames);
+}
+
uint32_t Message::getRequiredCredit() const
{
//add up payload for all header and content frames in the frameset
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 801834d519..561cdede59 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -69,6 +69,7 @@ public:
bool isImmediate() const;
const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
+ bool requiresAccept();
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp
index d54f30ff72..ea2882b474 100644
--- a/cpp/src/qpid/broker/MessageAdapter.cpp
+++ b/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -53,10 +53,16 @@ namespace broker{
bool TransferAdapter::isPersistent(const framing::FrameSet& f)
{
- const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>();
return p && p->getDeliveryMode() == 2;
}
+ bool TransferAdapter::requiresAccept(const framing::FrameSet& f)
+ {
+ const framing::Message010TransferBody* b = f.as<framing::Message010TransferBody>();
+ return b && b->getAcceptMode();
+ }
+
std::string PreviewAdapter::getExchange(const framing::FrameSet& f)
{
return f.as<framing::MessageTransferBody>()->getDestination();
@@ -74,4 +80,10 @@ namespace broker{
return p ? &(p->getApplicationHeaders()) : 0;
}
+ bool PreviewAdapter::isPersistent(const framing::FrameSet& f)
+ {
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ return p && p->getDeliveryMode() == 2;
+ }
+
}}
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
index 6aebfdc9b3..9759f320ac 100644
--- a/cpp/src/qpid/broker/MessageAdapter.h
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -44,15 +44,17 @@ struct MessageAdapter
virtual bool isImmediate(const framing::FrameSet& f) = 0;
virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0;
virtual bool isPersistent(const framing::FrameSet& f) = 0;
+ virtual bool requiresAccept(const framing::FrameSet& f) = 0;
};
struct TransferAdapter : MessageAdapter
{
virtual std::string getRoutingKey(const framing::FrameSet& f);
virtual std::string getExchange(const framing::FrameSet& f);
- bool isImmediate(const framing::FrameSet&);
virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
- bool isPersistent(const framing::FrameSet& f);
+ virtual bool isPersistent(const framing::FrameSet& f);
+ bool isImmediate(const framing::FrameSet&);
+ bool requiresAccept(const framing::FrameSet& f);
};
struct PreviewAdapter : TransferAdapter
@@ -60,6 +62,7 @@ struct PreviewAdapter : TransferAdapter
std::string getExchange(const framing::FrameSet& f);
std::string getRoutingKey(const framing::FrameSet& f);
const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+ bool isPersistent(const framing::FrameSet& f);
};
}}
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 35ad562a22..19fb0a4a79 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -50,7 +50,8 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2))
+ ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)),
+ enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))
{
getConnection().outputTasks.addOutputTask(&semanticState);
@@ -182,6 +183,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
getProxy().getExecution010().result(id, invocation.getResult());
}
if (method->isSync()) {
+ incomplete.process(enqueuedOp, true);
sendCompletion();
}
//TODO: if window gets too large send unsolicited completion
@@ -202,14 +204,31 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
- //TODO: may want to hold up execution until async enqueue is complete
- completed.add(msg->getCommandId());
+
+ if (msg->isEnqueueComplete()) {
+ enqueued(msg);
+ } else {
+ incomplete.add(msg);
+ }
+
+ //hold up execution until async enqueue is complete
if (msg->getFrames().getMethod()->isSync()) {
+ incomplete.process(enqueuedOp, true);
sendCompletion();
+ } else {
+ incomplete.process(enqueuedOp, false);
}
}
}
+void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+{
+ completed.add(msg->getCommandId());
+ if (msg->requiresAccept()) {
+ getProxy().getMessage010().accept(SequenceSet(msg->getCommandId()));
+ }
+}
+
void SessionState::handle(AMQFrame& frame)
{
received(frame);
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index ecf0b41a7a..18acb6f096 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -33,10 +33,10 @@
#include "qpid/management/Session.h"
#include "SessionAdapter.h"
#include "DeliveryAdapter.h"
+#include "IncompleteMessageList.h"
#include "MessageBuilder.h"
#include "SessionContext.h"
#include "SemanticState.h"
-#include "IncomingExecutionContext.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -53,10 +53,11 @@ class AMQP_ClientProxy;
namespace broker {
-class SessionHandler;
-class SessionManager;
class Broker;
class ConnectionState;
+class Message;
+class SessionHandler;
+class SessionManager;
/**
* Broker-side session state includes sessions handler chains, which may
@@ -132,12 +133,15 @@ class SessionState : public framing::SessionState,
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
+ IncompleteMessageList incomplete;
RangedOperation ackOp;
+ IncompleteMessageList::CompletionListener enqueuedOp;
management::Session::shared_ptr mgmtObject;
void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id);
+ void enqueued(boost::intrusive_ptr<Message> msg);
friend class SessionManager;
};