diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/IncomingExecutionContext.cpp | 138 | ||||
-rw-r--r-- | cpp/src/qpid/broker/IncomingExecutionContext.h | 58 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/PersistableMessage.h | 78 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.h | 5 | ||||
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 2 |
8 files changed, 269 insertions, 58 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 217d3c3b4c..d3aad9939e 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -178,6 +178,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/ExchangeRegistry.cpp \ qpid/broker/FanOutExchange.cpp \ qpid/broker/HeadersExchange.cpp \ + qpid/broker/IncomingExecutionContext.cpp \ qpid/broker/Message.cpp \ qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ @@ -278,6 +279,7 @@ nobase_include_HEADERS = \ qpid/broker/Daemon.h \ qpid/broker/DeliveryRecord.h \ qpid/broker/HeadersExchange.h \ + qpid/broker/IncomingExecutionContext.h \ qpid/broker/MessageStore.h \ qpid/broker/PersistableExchange.h \ qpid/broker/PersistableMessage.h \ diff --git a/cpp/src/qpid/broker/IncomingExecutionContext.cpp b/cpp/src/qpid/broker/IncomingExecutionContext.cpp new file mode 100644 index 0000000000..7cf1179fcb --- /dev/null +++ b/cpp/src/qpid/broker/IncomingExecutionContext.cpp @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "IncomingExecutionContext.h" +#include "qpid/Exception.h" + +namespace qpid { +namespace broker { + +using qpid::framing::AccumulatedAck; +using qpid::framing::SequenceNumber; +using qpid::framing::SequenceNumberSet; + +void IncomingExecutionContext::noop() +{ + complete(next()); +} + +void IncomingExecutionContext::flush() +{ + for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ) { + if ((*i)->isEnqueueComplete()) { + complete((*i)->getCommandId()); + i = incomplete.erase(i); + } else { + i++; + } + } + window.lwm = completed.mark; +} + +void IncomingExecutionContext::sync() +{ + while (completed.mark < window.hwm) { + wait(); + } +} + +void IncomingExecutionContext::sync(const SequenceNumber& point) +{ + while (!isComplete(point)) { + wait(); + } +} + +/** + * Every call to next() should be followed be either a call to + * complete() - in the case of commands, which are always synchronous + * - or track() - in the case of messages which may be asynchronously + * stored. + */ +SequenceNumber IncomingExecutionContext::next() +{ + return ++window.hwm; +} + +void IncomingExecutionContext::complete(const SequenceNumber& command) +{ + completed.update(command, command); +} + +void IncomingExecutionContext::track(Message::shared_ptr msg) +{ + if (msg->isEnqueueComplete()) { + complete(msg->getCommandId()); + } else { + incomplete.push_back(msg); + } +} + +bool IncomingExecutionContext::isComplete(const SequenceNumber& command) +{ + if (command > window.hwm) { + throw Exception(QPID_MSG("Bad sync request: point exceeds last command received [" + << command.getValue() << " > " << window.hwm.getValue() << "]")); + } + + return completed.covers(command); +} + + +const SequenceNumber& IncomingExecutionContext::getMark() +{ + return completed.mark; +} + +SequenceNumberSet IncomingExecutionContext::getRange() +{ + SequenceNumberSet range; + completed.collectRanges(range); + return range; +} + +void IncomingExecutionContext::wait() +{ + check(); + incomplete.front()->waitForEnqueueComplete(); + flush(); +} + +/** + * This is a check of internal state consistency. + */ +void IncomingExecutionContext::check() +{ + if (incomplete.empty()) { + if (window.hwm != completed.mark) { + //can only happen if there is a call to next() without a + //corresponding call to completed() or track() - or if + //there is a logical error in flush() or + //AccumulatedAck::update() + throw Exception(QPID_MSG("Completion tracking error: window.hwm=" + << window.hwm.getValue() << ", completed.mark=" + << completed.mark.getValue())); + } + } +} + +}} + diff --git a/cpp/src/qpid/broker/IncomingExecutionContext.h b/cpp/src/qpid/broker/IncomingExecutionContext.h new file mode 100644 index 0000000000..1e4a394be6 --- /dev/null +++ b/cpp/src/qpid/broker/IncomingExecutionContext.h @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _IncomingExecutionContext_ +#define _IncomingExecutionContext_ + +#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/SequenceNumber.h" +#include "Message.h" + +namespace qpid { +namespace broker { + +class IncomingExecutionContext +{ + typedef std::list<Message::shared_ptr> Messages; + framing::Window window; + framing::AccumulatedAck completed; + Messages incomplete; + + bool isComplete(const framing::SequenceNumber& command); + void check(); + void wait(); +public: + void noop(); + void flush(); + void sync(); + void sync(const framing::SequenceNumber& point); + framing::SequenceNumber next(); + void complete(const framing::SequenceNumber& command); + void track(Message::shared_ptr); + + const framing::SequenceNumber& getMark(); + framing::SequenceNumberSet getRange(); + +}; + + +}} + +#endif diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 26b31d73e5..53640c65ad 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -57,6 +57,8 @@ public: const ConnectionToken* getPublisher() const { return publisher; } void setPublisher(ConnectionToken* p) { publisher = p; } + const framing::SequenceNumber& getCommandId() { return frames.getId(); } + uint64_t contentSize() const; std::string getRoutingKey() const; diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 06fc59107e..30d8d37409 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -26,6 +26,7 @@ #include <boost/shared_ptr.hpp> #include "Persistable.h" #include "qpid/framing/amqp_types.h" +#include "qpid/sys/Monitor.h" namespace qpid { namespace broker { @@ -36,31 +37,23 @@ namespace broker { */ class PersistableMessage : public Persistable { + sys::Monitor asyncEnqueueLock; - - /** - * Needs to be set false on Message construction, then - * set once the broker has taken responsibility for the - * message. For transient, once enqueued, for durable, once - * stored. - */ - bool enqueueCompleted; - /** - * Counts the number of times the message has been processed - * async - thus when it == 0 the broker knows it has ownership - * -> an async store can increment this counter if it writes a - * copy to each queue, and case use this counter to know when all - * the write are complete - */ - int asyncCounter; + * Tracks the number of outstanding asynchronous enqueue + * operations. When the message is enqueued asynchronously the + * count is incremented; when that enqueue completes it is + * decremented. Thus when it is 0, there are no outstanding + * enqueues. + */ + int asyncEnqueueCounter; /** - * Needs to be set false on Message construction, then - * set once the dequeueis complete, it gets set - * For transient, once dequeued, for durable, once - * dequeue record has been stored. - */ + * Needs to be set false on Message construction, then + * set once the dequeueis complete, it gets set + * For transient, once dequeued, for durable, once + * dequeue record has been stored. + */ bool dequeueCompleted; public: @@ -73,23 +66,36 @@ public: virtual ~PersistableMessage() {}; - PersistableMessage(): - enqueueCompleted(false), - asyncCounter(0), - dequeueCompleted(false){}; + PersistableMessage(): asyncEnqueueCounter(0), dequeueCompleted(false) {} - inline bool isEnqueueComplete() {return enqueueCompleted;}; + inline void waitForEnqueueComplete() { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + while (asyncEnqueueCounter > 0) { + asyncEnqueueLock.wait(); + } + } + + inline bool isEnqueueComplete() { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + return asyncEnqueueCounter == 0; + } + inline void enqueueComplete() { - if (asyncCounter<=1) { - asyncCounter =0; - enqueueCompleted = true; - }else{ - asyncCounter--; - } - }; - inline void enqueueAsync() {asyncCounter++;}; - inline bool isDequeueComplete() {return dequeueCompleted;}; - inline void dequeueComplete() {dequeueCompleted = true;}; + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + if (asyncEnqueueCounter > 0) { + if (--asyncEnqueueCounter == 0) { + asyncEnqueueLock.notify(); + } + } + } + + inline void enqueueAsync() { + sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock); + asyncEnqueueCounter++; + } + + inline bool isDequeueComplete() { return dequeueCompleted; } + inline void dequeueComplete() { dequeueCompleted = true; } }; diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index ead2fad379..dab41dd92f 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -97,27 +97,29 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran } } -void SemanticHandler::flush() +void SemanticHandler::sendCompletion() { - //flush doubles as a sync to begin with - send an execution.complete if (isOpen()) { + SequenceNumber mark = incoming.getMark(); + SequenceNumberSet range = incoming.getRange(); Mutex::ScopedLock l(outLock); - ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())); + ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range)); } } +void SemanticHandler::flush() +{ + incoming.flush(); + sendCompletion(); +} void SemanticHandler::sync() { - //for now, just treat as flush; will need to get more clever when we deal with async publication - flush(); + incoming.sync(); + sendCompletion(); } void SemanticHandler::noop() { - //Do nothing... - // - //is this an L3 control? or is it an L4 command? - //if the former, of what use is it? - //if the latter it may contain a synch request... but its odd to have it in this class + incoming.noop(); } void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) @@ -127,17 +129,18 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { - ++(incoming.lwm); + SequenceNumber id = incoming.next(); InvocationVisitor v(&adapter); method->accept(v); - //TODO: need to account for async store operations and interleaving - ++(incoming.hwm); + incoming.complete(id); if (!v.wasHandled()) { throw ConnectionException(540, "Not implemented"); } else if (v.hasResult()) { - ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult())); + ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult())); } + //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); } + //TODO: if window gets too large send unsolicited completion } void SemanticHandler::handleL3(framing::AMQMethodBody* method) @@ -151,16 +154,16 @@ void SemanticHandler::handleContent(AMQFrame& frame) { Message::shared_ptr msg(msgBuilder.getMessage()); if (!msg) {//start of frameset will be indicated by frame flags - msgBuilder.start(++(incoming.lwm)); + msgBuilder.start(incoming.next()); msg = msgBuilder.getMessage(); } msgBuilder.handle(frame); if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags msg->setPublisher(&connection); - session.handle(msg); + session.handle(msg); msgBuilder.end(); - //TODO: need to account for async store operations and interleaving - ++(incoming.hwm); + incoming.track(msg); + //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); } } } @@ -172,11 +175,8 @@ bool SemanticHandler::isOpen() const { DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); - //SequenceNumber copy(outgoing.hwm); - //++copy; MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax()); return outgoing.hwm; - //return outgoing.hwm.getValue(); } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 478dfb6760..f17ef67bfc 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -25,6 +25,7 @@ #include "BrokerAdapter.h" #include "DeliveryAdapter.h" #include "MessageBuilder.h" +#include "IncomingExecutionContext.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQP_ServerOperations.h" @@ -53,7 +54,7 @@ class SemanticHandler : public DeliveryAdapter, Session& session; Connection& connection; BrokerAdapter adapter; - framing::Window incoming; + IncomingExecutionContext incoming; framing::Window outgoing; sys::Mutex outLock; MessageBuilder msgBuilder; @@ -65,6 +66,8 @@ class SemanticHandler : public DeliveryAdapter, void handleCommand(framing::AMQMethodBody* method); void handleContent(framing::AMQFrame& frame); + void sendCompletion(); + //ChannelAdapter virtual methods: void handleMethod(framing::AMQMethodBody* method); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index bf742f9511..29f0d17cef 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -91,6 +91,7 @@ class QueueTest : public CppUnit::TestCase //Test basic delivery: Message::shared_ptr msg1 = message("e", "A"); + msg1->enqueueAsync();//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); @@ -107,6 +108,7 @@ class QueueTest : public CppUnit::TestCase void testAsyncMessageCount(){ Queue::shared_ptr queue(new Queue("my_test_queue", true)); Message::shared_ptr msg1 = message("e", "A"); + msg1->enqueueAsync();//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); |