summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp51
-rw-r--r--qpid/cpp/src/qpid/broker/IncompleteMessageList.h48
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h1
-rw-r--r--qpid/cpp/src/qpid/broker/MessageAdapter.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/MessageAdapter.h7
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp25
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h10
-rw-r--r--qpid/cpp/src/tests/IncompleteMessageList.cpp128
-rw-r--r--qpid/cpp/src/tests/Makefile.am3
11 files changed, 284 insertions, 10 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index b71bd754f7..f79a634060 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -201,6 +201,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/FanOutExchange.cpp \
qpid/broker/HeadersExchange.cpp \
qpid/broker/IncomingExecutionContext.cpp \
+ qpid/broker/IncompleteMessageList.cpp \
qpid/broker/Message.cpp \
qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
@@ -321,6 +322,7 @@ nobase_include_HEADERS = \
qpid/broker/HandlerImpl.h \
qpid/broker/HeadersExchange.h \
qpid/broker/IncomingExecutionContext.h \
+ qpid/broker/IncompleteMessageList.h \
qpid/broker/Message.h \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.h \
diff --git a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp b/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
new file mode 100644
index 0000000000..dd7bbfc067
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "IncompleteMessageList.h"
+
+#include "Message.h"
+
+namespace qpid {
+namespace broker {
+
+void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg)
+{
+ incomplete.push_back(msg);
+}
+
+void IncompleteMessageList::process(CompletionListener l, bool sync)
+{
+ while (!incomplete.empty()) {
+ boost::intrusive_ptr<Message>& msg = incomplete.front();
+ if (!msg->isEnqueueComplete()) {
+ if (sync){
+ msg->flush();
+ msg->waitForEnqueueComplete();
+ } else {
+ //leave the message as incomplete for now
+ return;
+ }
+ }
+ l(msg);
+ incomplete.pop_front();
+ }
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/broker/IncompleteMessageList.h b/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
new file mode 100644
index 0000000000..2cfd7bfee5
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _IncompleteMessageList_
+#define _IncompleteMessageList_
+
+#include <list>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Message;
+
+class IncompleteMessageList
+{
+ typedef std::list< boost::intrusive_ptr<Message> > Messages;
+ Messages incomplete;
+
+public:
+ typedef boost::function<void(boost::intrusive_ptr<Message>)> CompletionListener;
+
+ void add(boost::intrusive_ptr<Message> msg);
+ void process(CompletionListener l, bool sync);
+};
+
+
+}}
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 21908256a1..b60a95228d 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -84,6 +84,11 @@ bool Message::isPersistent()
return getAdapter().isPersistent(frames);
}
+bool Message::requiresAccept()
+{
+ return getAdapter().requiresAccept(frames);
+}
+
uint32_t Message::getRequiredCredit() const
{
//add up payload for all header and content frames in the frameset
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 801834d519..561cdede59 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -69,6 +69,7 @@ public:
bool isImmediate() const;
const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
+ bool requiresAccept();
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
diff --git a/qpid/cpp/src/qpid/broker/MessageAdapter.cpp b/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
index d54f30ff72..ea2882b474 100644
--- a/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -53,10 +53,16 @@ namespace broker{
bool TransferAdapter::isPersistent(const framing::FrameSet& f)
{
- const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>();
return p && p->getDeliveryMode() == 2;
}
+ bool TransferAdapter::requiresAccept(const framing::FrameSet& f)
+ {
+ const framing::Message010TransferBody* b = f.as<framing::Message010TransferBody>();
+ return b && b->getAcceptMode();
+ }
+
std::string PreviewAdapter::getExchange(const framing::FrameSet& f)
{
return f.as<framing::MessageTransferBody>()->getDestination();
@@ -74,4 +80,10 @@ namespace broker{
return p ? &(p->getApplicationHeaders()) : 0;
}
+ bool PreviewAdapter::isPersistent(const framing::FrameSet& f)
+ {
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ return p && p->getDeliveryMode() == 2;
+ }
+
}}
diff --git a/qpid/cpp/src/qpid/broker/MessageAdapter.h b/qpid/cpp/src/qpid/broker/MessageAdapter.h
index 6aebfdc9b3..9759f320ac 100644
--- a/qpid/cpp/src/qpid/broker/MessageAdapter.h
+++ b/qpid/cpp/src/qpid/broker/MessageAdapter.h
@@ -44,15 +44,17 @@ struct MessageAdapter
virtual bool isImmediate(const framing::FrameSet& f) = 0;
virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0;
virtual bool isPersistent(const framing::FrameSet& f) = 0;
+ virtual bool requiresAccept(const framing::FrameSet& f) = 0;
};
struct TransferAdapter : MessageAdapter
{
virtual std::string getRoutingKey(const framing::FrameSet& f);
virtual std::string getExchange(const framing::FrameSet& f);
- bool isImmediate(const framing::FrameSet&);
virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
- bool isPersistent(const framing::FrameSet& f);
+ virtual bool isPersistent(const framing::FrameSet& f);
+ bool isImmediate(const framing::FrameSet&);
+ bool requiresAccept(const framing::FrameSet& f);
};
struct PreviewAdapter : TransferAdapter
@@ -60,6 +62,7 @@ struct PreviewAdapter : TransferAdapter
std::string getExchange(const framing::FrameSet& f);
std::string getRoutingKey(const framing::FrameSet& f);
const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+ bool isPersistent(const framing::FrameSet& f);
};
}}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 35ad562a22..19fb0a4a79 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -50,7 +50,8 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2))
+ ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)),
+ enqueuedOp(boost::bind(&SessionState::enqueued, this, _1))
{
getConnection().outputTasks.addOutputTask(&semanticState);
@@ -182,6 +183,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
getProxy().getExecution010().result(id, invocation.getResult());
}
if (method->isSync()) {
+ incomplete.process(enqueuedOp, true);
sendCompletion();
}
//TODO: if window gets too large send unsolicited completion
@@ -202,14 +204,31 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
- //TODO: may want to hold up execution until async enqueue is complete
- completed.add(msg->getCommandId());
+
+ if (msg->isEnqueueComplete()) {
+ enqueued(msg);
+ } else {
+ incomplete.add(msg);
+ }
+
+ //hold up execution until async enqueue is complete
if (msg->getFrames().getMethod()->isSync()) {
+ incomplete.process(enqueuedOp, true);
sendCompletion();
+ } else {
+ incomplete.process(enqueuedOp, false);
}
}
}
+void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
+{
+ completed.add(msg->getCommandId());
+ if (msg->requiresAccept()) {
+ getProxy().getMessage010().accept(SequenceSet(msg->getCommandId()));
+ }
+}
+
void SessionState::handle(AMQFrame& frame)
{
received(frame);
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index ecf0b41a7a..18acb6f096 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -33,10 +33,10 @@
#include "qpid/management/Session.h"
#include "SessionAdapter.h"
#include "DeliveryAdapter.h"
+#include "IncompleteMessageList.h"
#include "MessageBuilder.h"
#include "SessionContext.h"
#include "SemanticState.h"
-#include "IncomingExecutionContext.h"
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
@@ -53,10 +53,11 @@ class AMQP_ClientProxy;
namespace broker {
-class SessionHandler;
-class SessionManager;
class Broker;
class ConnectionState;
+class Message;
+class SessionHandler;
+class SessionManager;
/**
* Broker-side session state includes sessions handler chains, which may
@@ -132,12 +133,15 @@ class SessionState : public framing::SessionState,
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
+ IncompleteMessageList incomplete;
RangedOperation ackOp;
+ IncompleteMessageList::CompletionListener enqueuedOp;
management::Session::shared_ptr mgmtObject;
void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id);
+ void enqueued(boost::intrusive_ptr<Message> msg);
friend class SessionManager;
};
diff --git a/qpid/cpp/src/tests/IncompleteMessageList.cpp b/qpid/cpp/src/tests/IncompleteMessageList.cpp
new file mode 100644
index 0000000000..65cca4a628
--- /dev/null
+++ b/qpid/cpp/src/tests/IncompleteMessageList.cpp
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include <sstream>
+#include "qpid/broker/Message.h"
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/IncompleteMessageList.h"
+
+#include "unit_test.h"
+
+QPID_AUTO_TEST_SUITE(IncompleteMessageListTestSuite)
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct Checker
+{
+ std::list<SequenceNumber> ids;
+
+ Checker() { }
+
+ Checker(uint start, uint end) {
+ for (uint i = start; i <= end; i++) {
+ ids.push_back(i);
+ }
+ }
+
+ Checker& expect(const SequenceNumber& id) {
+ ids.push_back(id);
+ return *this;
+ }
+
+ void operator()(boost::intrusive_ptr<Message> msg) {
+ BOOST_CHECK(!ids.empty());
+ BOOST_CHECK_EQUAL(msg->getCommandId(), ids.front());
+ ids.pop_front();
+ }
+};
+
+BOOST_AUTO_TEST_CASE(testProcessSimple)
+{
+ IncompleteMessageList list;
+ SequenceNumber counter(1);
+ //fill up list with messages
+ for (int i = 0; i < 5; i++) {
+ boost::intrusive_ptr<Message> msg(new Message(counter++));
+ list.add(msg);
+ }
+ //process and ensure they are all passed to completion listener
+ list.process(Checker(1, 5), false);
+ //process again and ensure none are resent to listener
+ list.process(Checker(), false);
+}
+
+BOOST_AUTO_TEST_CASE(testProcessWithIncomplete)
+{
+ IncompleteMessageList list;
+ SequenceNumber counter(1);
+ boost::intrusive_ptr<Message> middle;
+ //fill up list with messages
+ for (int i = 0; i < 5; i++) {
+ boost::intrusive_ptr<Message> msg(new Message(counter++));
+ list.add(msg);
+ if (i == 2) {
+ //mark a message in the middle as incomplete
+ msg->enqueueAsync();
+ middle = msg;
+ }
+ }
+ //process and ensure only message upto incomplete message are passed to listener
+ list.process(Checker(1, 2), false);
+ //mark message complete and re-process to get remaining messages sent to listener
+ middle->enqueueComplete();
+ list.process(Checker(3, 5), false);
+}
+
+
+struct MockStore : public NullMessageStore
+{
+ Queue::shared_ptr queue;
+ boost::intrusive_ptr<Message> msg;
+
+ void flush(const qpid::broker::PersistableQueue& q) {
+ BOOST_CHECK_EQUAL(queue.get(), &q);
+ msg->enqueueComplete();
+ }
+};
+
+BOOST_AUTO_TEST_CASE(testSyncProcessWithIncomplete)
+{
+ IncompleteMessageList list;
+ SequenceNumber counter(1);
+ MockStore store;
+ store.queue = Queue::shared_ptr(new Queue("mock-queue"));
+ //fill up list with messages
+ for (int i = 0; i < 5; i++) {
+ boost::intrusive_ptr<Message> msg(new Message(counter++));
+ list.add(msg);
+ if (i == 2) {
+ //mark a message in the middle as incomplete
+ msg->enqueueAsync(store.queue, &store);
+ store.msg = msg;
+ }
+ }
+ //process with sync bit specified and ensure that all messages are passed to listener
+ list.process(Checker(1, 5), true);
+}
+
+QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index c5f5db7346..ca25ced5e0 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -40,7 +40,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
ClientSessionTest.cpp \
SequenceSet.cpp \
serialize.cpp \
- ProxyTemplate.cpp apply.cpp BoundedIterator.cpp
+ ProxyTemplate.cpp apply.cpp BoundedIterator.cpp \
+ IncompleteMessageList.cpp
check_LTLIBRARIES += libshlibtest.la
libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir)