diff options
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp | 51 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/IncompleteMessageList.h | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageAdapter.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageAdapter.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 25 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/tests/IncompleteMessageList.cpp | 128 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 3 |
11 files changed, 284 insertions, 10 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index b71bd754f7..f79a634060 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -201,6 +201,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/FanOutExchange.cpp \ qpid/broker/HeadersExchange.cpp \ qpid/broker/IncomingExecutionContext.cpp \ + qpid/broker/IncompleteMessageList.cpp \ qpid/broker/Message.cpp \ qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ @@ -321,6 +322,7 @@ nobase_include_HEADERS = \ qpid/broker/HandlerImpl.h \ qpid/broker/HeadersExchange.h \ qpid/broker/IncomingExecutionContext.h \ + qpid/broker/IncompleteMessageList.h \ qpid/broker/Message.h \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.h \ diff --git a/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp b/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp new file mode 100644 index 0000000000..dd7bbfc067 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "IncompleteMessageList.h" + +#include "Message.h" + +namespace qpid { +namespace broker { + +void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg) +{ + incomplete.push_back(msg); +} + +void IncompleteMessageList::process(CompletionListener l, bool sync) +{ + while (!incomplete.empty()) { + boost::intrusive_ptr<Message>& msg = incomplete.front(); + if (!msg->isEnqueueComplete()) { + if (sync){ + msg->flush(); + msg->waitForEnqueueComplete(); + } else { + //leave the message as incomplete for now + return; + } + } + l(msg); + incomplete.pop_front(); + } +} + +}} diff --git a/qpid/cpp/src/qpid/broker/IncompleteMessageList.h b/qpid/cpp/src/qpid/broker/IncompleteMessageList.h new file mode 100644 index 0000000000..2cfd7bfee5 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/IncompleteMessageList.h @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _IncompleteMessageList_ +#define _IncompleteMessageList_ + +#include <list> +#include <boost/intrusive_ptr.hpp> +#include <boost/function.hpp> + +namespace qpid { +namespace broker { + +class Message; + +class IncompleteMessageList +{ + typedef std::list< boost::intrusive_ptr<Message> > Messages; + Messages incomplete; + +public: + typedef boost::function<void(boost::intrusive_ptr<Message>)> CompletionListener; + + void add(boost::intrusive_ptr<Message> msg); + void process(CompletionListener l, bool sync); +}; + + +}} + +#endif diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 21908256a1..b60a95228d 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -84,6 +84,11 @@ bool Message::isPersistent() return getAdapter().isPersistent(frames); } +bool Message::requiresAccept() +{ + return getAdapter().requiresAccept(frames); +} + uint32_t Message::getRequiredCredit() const { //add up payload for all header and content frames in the frameset diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 801834d519..561cdede59 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -69,6 +69,7 @@ public: bool isImmediate() const; const framing::FieldTable* getApplicationHeaders() const; bool isPersistent(); + bool requiresAccept(); framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } diff --git a/qpid/cpp/src/qpid/broker/MessageAdapter.cpp b/qpid/cpp/src/qpid/broker/MessageAdapter.cpp index d54f30ff72..ea2882b474 100644 --- a/qpid/cpp/src/qpid/broker/MessageAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/MessageAdapter.cpp @@ -53,10 +53,16 @@ namespace broker{ bool TransferAdapter::isPersistent(const framing::FrameSet& f) { - const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>(); return p && p->getDeliveryMode() == 2; } + bool TransferAdapter::requiresAccept(const framing::FrameSet& f) + { + const framing::Message010TransferBody* b = f.as<framing::Message010TransferBody>(); + return b && b->getAcceptMode(); + } + std::string PreviewAdapter::getExchange(const framing::FrameSet& f) { return f.as<framing::MessageTransferBody>()->getDestination(); @@ -74,4 +80,10 @@ namespace broker{ return p ? &(p->getApplicationHeaders()) : 0; } + bool PreviewAdapter::isPersistent(const framing::FrameSet& f) + { + const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + return p && p->getDeliveryMode() == 2; + } + }} diff --git a/qpid/cpp/src/qpid/broker/MessageAdapter.h b/qpid/cpp/src/qpid/broker/MessageAdapter.h index 6aebfdc9b3..9759f320ac 100644 --- a/qpid/cpp/src/qpid/broker/MessageAdapter.h +++ b/qpid/cpp/src/qpid/broker/MessageAdapter.h @@ -44,15 +44,17 @@ struct MessageAdapter virtual bool isImmediate(const framing::FrameSet& f) = 0; virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0; virtual bool isPersistent(const framing::FrameSet& f) = 0; + virtual bool requiresAccept(const framing::FrameSet& f) = 0; }; struct TransferAdapter : MessageAdapter { virtual std::string getRoutingKey(const framing::FrameSet& f); virtual std::string getExchange(const framing::FrameSet& f); - bool isImmediate(const framing::FrameSet&); virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); - bool isPersistent(const framing::FrameSet& f); + virtual bool isPersistent(const framing::FrameSet& f); + bool isImmediate(const framing::FrameSet&); + bool requiresAccept(const framing::FrameSet& f); }; struct PreviewAdapter : TransferAdapter @@ -60,6 +62,7 @@ struct PreviewAdapter : TransferAdapter std::string getExchange(const framing::FrameSet& f); std::string getRoutingKey(const framing::FrameSet& f); const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); + bool isPersistent(const framing::FrameSet& f); }; }} diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 35ad562a22..19fb0a4a79 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -50,7 +50,8 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)) + ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)), + enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)) { getConnection().outputTasks.addOutputTask(&semanticState); @@ -182,6 +183,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& getProxy().getExecution010().result(id, invocation.getResult()); } if (method->isSync()) { + incomplete.process(enqueuedOp, true); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -202,14 +204,31 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); - //TODO: may want to hold up execution until async enqueue is complete - completed.add(msg->getCommandId()); + + if (msg->isEnqueueComplete()) { + enqueued(msg); + } else { + incomplete.add(msg); + } + + //hold up execution until async enqueue is complete if (msg->getFrames().getMethod()->isSync()) { + incomplete.process(enqueuedOp, true); sendCompletion(); + } else { + incomplete.process(enqueuedOp, false); } } } +void SessionState::enqueued(boost::intrusive_ptr<Message> msg) +{ + completed.add(msg->getCommandId()); + if (msg->requiresAccept()) { + getProxy().getMessage010().accept(SequenceSet(msg->getCommandId())); + } +} + void SessionState::handle(AMQFrame& frame) { received(frame); diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index ecf0b41a7a..18acb6f096 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -33,10 +33,10 @@ #include "qpid/management/Session.h" #include "SessionAdapter.h" #include "DeliveryAdapter.h" +#include "IncompleteMessageList.h" #include "MessageBuilder.h" #include "SessionContext.h" #include "SemanticState.h" -#include "IncomingExecutionContext.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -53,10 +53,11 @@ class AMQP_ClientProxy; namespace broker { -class SessionHandler; -class SessionManager; class Broker; class ConnectionState; +class Message; +class SessionHandler; +class SessionManager; /** * Broker-side session state includes sessions handler chains, which may @@ -132,12 +133,15 @@ class SessionState : public framing::SessionState, SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; + IncompleteMessageList incomplete; RangedOperation ackOp; + IncompleteMessageList::CompletionListener enqueuedOp; management::Session::shared_ptr mgmtObject; void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id); void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id); + void enqueued(boost::intrusive_ptr<Message> msg); friend class SessionManager; }; diff --git a/qpid/cpp/src/tests/IncompleteMessageList.cpp b/qpid/cpp/src/tests/IncompleteMessageList.cpp new file mode 100644 index 0000000000..65cca4a628 --- /dev/null +++ b/qpid/cpp/src/tests/IncompleteMessageList.cpp @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <iostream> +#include <sstream> +#include "qpid/broker/Message.h" +#include "qpid/broker/NullMessageStore.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/IncompleteMessageList.h" + +#include "unit_test.h" + +QPID_AUTO_TEST_SUITE(IncompleteMessageListTestSuite) + +using namespace qpid::broker; +using namespace qpid::framing; + +struct Checker +{ + std::list<SequenceNumber> ids; + + Checker() { } + + Checker(uint start, uint end) { + for (uint i = start; i <= end; i++) { + ids.push_back(i); + } + } + + Checker& expect(const SequenceNumber& id) { + ids.push_back(id); + return *this; + } + + void operator()(boost::intrusive_ptr<Message> msg) { + BOOST_CHECK(!ids.empty()); + BOOST_CHECK_EQUAL(msg->getCommandId(), ids.front()); + ids.pop_front(); + } +}; + +BOOST_AUTO_TEST_CASE(testProcessSimple) +{ + IncompleteMessageList list; + SequenceNumber counter(1); + //fill up list with messages + for (int i = 0; i < 5; i++) { + boost::intrusive_ptr<Message> msg(new Message(counter++)); + list.add(msg); + } + //process and ensure they are all passed to completion listener + list.process(Checker(1, 5), false); + //process again and ensure none are resent to listener + list.process(Checker(), false); +} + +BOOST_AUTO_TEST_CASE(testProcessWithIncomplete) +{ + IncompleteMessageList list; + SequenceNumber counter(1); + boost::intrusive_ptr<Message> middle; + //fill up list with messages + for (int i = 0; i < 5; i++) { + boost::intrusive_ptr<Message> msg(new Message(counter++)); + list.add(msg); + if (i == 2) { + //mark a message in the middle as incomplete + msg->enqueueAsync(); + middle = msg; + } + } + //process and ensure only message upto incomplete message are passed to listener + list.process(Checker(1, 2), false); + //mark message complete and re-process to get remaining messages sent to listener + middle->enqueueComplete(); + list.process(Checker(3, 5), false); +} + + +struct MockStore : public NullMessageStore +{ + Queue::shared_ptr queue; + boost::intrusive_ptr<Message> msg; + + void flush(const qpid::broker::PersistableQueue& q) { + BOOST_CHECK_EQUAL(queue.get(), &q); + msg->enqueueComplete(); + } +}; + +BOOST_AUTO_TEST_CASE(testSyncProcessWithIncomplete) +{ + IncompleteMessageList list; + SequenceNumber counter(1); + MockStore store; + store.queue = Queue::shared_ptr(new Queue("mock-queue")); + //fill up list with messages + for (int i = 0; i < 5; i++) { + boost::intrusive_ptr<Message> msg(new Message(counter++)); + list.add(msg); + if (i == 2) { + //mark a message in the middle as incomplete + msg->enqueueAsync(store.queue, &store); + store.msg = msg; + } + } + //process with sync bit specified and ensure that all messages are passed to listener + list.process(Checker(1, 5), true); +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index c5f5db7346..ca25ced5e0 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -40,7 +40,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ ClientSessionTest.cpp \ SequenceSet.cpp \ serialize.cpp \ - ProxyTemplate.cpp apply.cpp BoundedIterator.cpp + ProxyTemplate.cpp apply.cpp BoundedIterator.cpp \ + IncompleteMessageList.cpp check_LTLIBRARIES += libshlibtest.la libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir) |