diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
commit | a2f1ddbe4175c6136b1188faeccaf1f8e561e3b2 (patch) | |
tree | fec659c6eb749cf6eca5b790a439962dd930281d | |
parent | 6054ada715929a82afa55601f1c4b5f226cf45b8 (diff) | |
download | qpid-python-a2f1ddbe4175c6136b1188faeccaf1f8e561e3b2.tar.gz |
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses.
Some refactoring around message delivery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@560285 13f79535-47bb-0310-9956-ffa450edef68
39 files changed, 586 insertions, 399 deletions
diff --git a/qpid/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java b/qpid/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java index e78eec112f..66c19532c6 100644 --- a/qpid/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java +++ b/qpid/cpp/gentools/src/org/apache/qpid/gentools/AmqpMethod.java @@ -51,6 +51,7 @@ public class AmqpMethod implements Printable, NodeAware, VersionConsistencyCheck } public boolean isResponse(AmqpVersion version) { + if (!CppGenerator.USE_RELIABLE_FRAMING) return false; return (version == null) ? isResponseFlagMap.isSet() : isResponseFlagMap.isSet(version); } diff --git a/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java b/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java index f31f9615fc..4fbda4e0fb 100644 --- a/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java +++ b/qpid/cpp/gentools/src/org/apache/qpid/gentools/CppGenerator.java @@ -29,6 +29,8 @@ import java.util.TreeMap; public class CppGenerator extends Generator { + protected static final boolean USE_RELIABLE_FRAMING = false; + protected static final String versionNamespaceStartToken = "${version_namespace_start}"; protected static final String versionNamespaceEndToken = "${version_namespace_end}"; @@ -150,12 +152,11 @@ public class CppGenerator extends Generator "buffer.putLongString(#)", // encodeExpression "buffer.getLongString(#)")); // decodeExpression - //NB: this is WRONG! but is here as a transitional aid typeMap.put("rfc1982-long-set", new DomainInfo( - "u_int16_t", // type - "2", // size - "buffer.putShort(#)", // encodeExpression - "# = buffer.getShort()")); // decodeExpression + "SequenceNumberSet", // type + "#.encodedSize()", // size + "#.encode(buffer)", // encodeExpression + "#.decode(buffer)")); // decodeExpression } public boolean isQuietFlag() @@ -378,6 +379,7 @@ public class CppGenerator extends Generator } private String baseClass(AmqpMethod method, AmqpVersion version) { + if (!USE_RELIABLE_FRAMING) return "AMQMethodBody"; String base = method.isResponse(version) ? "AMQResponseBody":"AMQRequestBody"; return base; } @@ -787,16 +789,6 @@ public class CppGenerator extends Generator sb.append(generateMethodParameterList(thisFieldMap, indentSize + (5*tabSize), false, true, true)); } - //if (abstractMethodFlag) sb.append("const MethodContext& context"); - //boolean leadingComma = abstractMethodFlag; - //int paramIndent = indentSize + (5*tabSize); - // sb.append(generateMethodParameterList(thisFieldMap, paramIndent, leadingComma, true, true)); - /* - if (!abstractMethodFlag && method.isResponse(null)) { - if (!thisFieldMap.isEmpty()) sb.append(", \n"+Utils.createSpaces(paramIndent)); - sb.append(" RequestId responseTo"); - } - */ sb.append(" )"); if (abstractMethodFlag) sb.append(" = 0"); diff --git a/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl b/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl index 093a5ffe90..aeabd8a256 100644 --- a/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl +++ b/qpid/cpp/gentools/templ.cpp/MethodBodyClass.h.tmpl @@ -37,6 +37,7 @@ #include "qpid/framing/Buffer.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FramingContent.h" +#include "qpid/framing/SequenceNumberSet.h" namespace qpid { diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index f4e807cf66..cf1598bcca 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -155,6 +155,7 @@ libqpidcommon_la_SOURCES = \ qpid/framing/Requester.cpp \ qpid/framing/Responder.cpp \ qpid/framing/SequenceNumber.cpp \ + qpid/framing/SequenceNumberSet.cpp \ qpid/framing/Correlator.cpp \ qpid/framing/Value.cpp \ qpid/framing/Proxy.cpp \ @@ -200,7 +201,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionAdapter.cpp \ qpid/broker/ConnectionFactory.cpp \ - qpid/broker/ConsumeAdapter.cpp \ qpid/broker/Daemon.cpp \ qpid/broker/DeliverableMessage.cpp \ qpid/broker/DeliveryRecord.cpp \ @@ -213,7 +213,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/DtxWorkRecord.cpp \ qpid/broker/ExchangeRegistry.cpp \ qpid/broker/FanOutExchange.cpp \ - qpid/broker/GetAdapter.cpp \ qpid/broker/HeadersExchange.cpp \ qpid/broker/InMemoryContent.cpp \ qpid/broker/LazyLoadedContent.cpp \ @@ -258,11 +257,11 @@ nobase_include_HEADERS = \ qpid/broker/BrokerMessageBase.h \ qpid/broker/BrokerQueue.h \ qpid/broker/CompletionHandler.h \ - qpid/broker/ConsumeAdapter.h \ qpid/broker/Consumer.h \ qpid/broker/Deliverable.h \ qpid/broker/DeliverableMessage.h \ qpid/broker/DeliveryAdapter.h \ + qpid/broker/DeliveryToken.h \ qpid/broker/DirectExchange.h \ qpid/broker/DtxAck.h \ qpid/broker/DtxBuffer.h \ @@ -272,7 +271,6 @@ nobase_include_HEADERS = \ qpid/broker/DtxWorkRecord.h \ qpid/broker/ExchangeRegistry.h \ qpid/broker/FanOutExchange.h \ - qpid/broker/GetAdapter.h \ qpid/broker/HandlerImpl.h \ qpid/broker/InMemoryContent.h \ qpid/broker/MessageBuilder.h \ @@ -362,6 +360,7 @@ nobase_include_HEADERS = \ qpid/framing/Responder.h \ qpid/framing/SerializeHandler.h \ qpid/framing/SequenceNumber.h \ + qpid/framing/SequenceNumberSet.h \ qpid/framing/Value.h \ qpid/framing/Uuid.h \ qpid/framing/amqp_framing.h \ diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp index 376108193a..8edf448bc4 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -20,8 +20,7 @@ #include "BrokerAdapter.h" #include "BrokerChannel.h" #include "Connection.h" -#include "ConsumeAdapter.h" -#include "GetAdapter.h" +#include "DeliveryToken.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/Exception.h" @@ -325,8 +324,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //need to generate name here, so we have it for the adapter (it is //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); - channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())), - newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); + DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag)); + channel.consume(token, newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); if(!nowait) client.consumeOk(newTag); @@ -357,8 +356,8 @@ void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = getQueue(queueName); - GetAdapter out(adapter, queue, "", connection.getFrameMax()); - if(!channel.get(out, queue, !noAck)){ + DeliveryToken::shared_ptr token(BasicMessage::createGetToken(queue)); + if(!channel.get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack client.getEmpty(clusterId); diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp index a598717c5d..c50fbd5559 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp @@ -49,9 +49,10 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) : +Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) : id(_id), connection(con), + out(_out), currentDeliveryTag(1), prefetchSize(0), prefetchCount(0), @@ -76,7 +77,7 @@ bool Channel::exists(const string& consumerTag){ // TODO aconway 2007-02-12: Why is connection token passed in instead // of using the channel's parent connection? -void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, +void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) @@ -84,7 +85,7 @@ void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, if(tagInOut.empty()) tagInOut = tagGenerator.generate(); std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks)); + new ConsumerImpl(this, token, tagInOut, queue, connection, acks)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -97,7 +98,8 @@ void Channel::cancel(const string& tag){ consumers.erase(i); } -void Channel::close(){ +void Channel::close() +{ opened = false; consumers.clear(); if (dtxBuffer.get()) { @@ -106,11 +108,15 @@ void Channel::close(){ recover(true); } -void Channel::startTx(){ +void Channel::startTx() +{ txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void Channel::commit(){ +void Channel::commit() +{ + if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); + TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); txBuffer->enlist(txAck); if (txBuffer->commitLocal(store)) { @@ -118,16 +124,21 @@ void Channel::commit(){ } } -void Channel::rollback(){ +void Channel::rollback() +{ + if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); + txBuffer->rollback(); accumulatedAck.clear(); } -void Channel::selectDtx(){ +void Channel::selectDtx() +{ dtxSelected = true; } -void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){ +void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join) +{ if (!dtxSelected) { throw ConnectionException(503, "Channel has not been selected for use with dtx"); } @@ -140,7 +151,8 @@ void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){ } } -void Channel::endDtx(const std::string& xid, bool fail){ +void Channel::endDtx(const std::string& xid, bool fail) +{ if (!dtxBuffer) { throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid); } @@ -160,7 +172,8 @@ void Channel::endDtx(const std::string& xid, bool fail){ dtxBuffer.reset(); } -void Channel::suspendDtx(const std::string& xid){ +void Channel::suspendDtx(const std::string& xid) +{ if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") % dtxBuffer->getXid() % xid); @@ -171,7 +184,8 @@ void Channel::suspendDtx(const std::string& xid){ dtxBuffer->setSuspended(true); } -void Channel::resumeDtx(const std::string& xid){ +void Channel::resumeDtx(const std::string& xid) +{ if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") % dtxBuffer->getXid() % xid); @@ -199,20 +213,22 @@ void Channel::record(const DeliveryRecord& delivery) delivery.addTo(&outstanding); } -bool Channel::checkPrefetch(Message::shared_ptr& msg){ +bool Channel::checkPrefetch(Message::shared_ptr& msg) +{ Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter, +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token, const string& _tag, Queue::shared_ptr _queue, ConnectionToken* const _connection, bool ack - ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection), + ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection), ackExpected(ack), blocked(false) {} -bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ +bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg) +{ if(!connection || connection != msg->getPublisher()){//check for no_local if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){ blocked = true; @@ -220,11 +236,10 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ blocked = false; Mutex::ScopedLock locker(parent->deliveryLock); - uint64_t deliveryTag = adapter->getNextDeliveryTag(); + uint64_t deliveryTag = parent->out.deliver(msg, token); if(ackExpected){ parent->record(DeliveryRecord(msg, queue, tag, deliveryTag)); } - adapter->deliver(msg, deliveryTag); return true; } @@ -234,14 +249,15 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) { Mutex::ScopedLock locker(parent->deliveryLock); - adapter->deliver(msg, deliveryTag); + parent->out.redeliver(msg, token, deliveryTag); } Channel::ConsumerImpl::~ConsumerImpl() { cancel(); } -void Channel::ConsumerImpl::cancel(){ +void Channel::ConsumerImpl::cancel() +{ if(queue) { queue->cancel(this); if (queue->canAutoDelete()) { @@ -251,27 +267,32 @@ void Channel::ConsumerImpl::cancel(){ } } -void Channel::ConsumerImpl::requestDispatch(){ +void Channel::ConsumerImpl::requestDispatch() +{ if(blocked) queue->requestDispatch(); } -void Channel::handleInlineTransfer(Message::shared_ptr msg){ +void Channel::handleInlineTransfer(Message::shared_ptr msg) +{ complete(msg); } -void Channel::handlePublish(Message* _message){ +void Channel::handlePublish(Message* _message) +{ Message::shared_ptr message(_message); messageBuilder.initialise(message); } -void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ +void Channel::handleHeader(AMQHeaderBody::shared_ptr header) +{ messageBuilder.setHeader(header); //at this point, decide based on the size of the message whether we want //to stage it by saving content directly to disk as it arrives } -void Channel::handleContent(AMQContentBody::shared_ptr content){ +void Channel::handleContent(AMQContentBody::shared_ptr content) +{ messageBuilder.addContent(content); } @@ -306,14 +327,16 @@ void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { } // Used by Basic -void Channel::ack(uint64_t deliveryTag, bool multiple){ +void Channel::ack(uint64_t deliveryTag, bool multiple) +{ if (multiple) ack(0, deliveryTag); else ack(deliveryTag, deliveryTag); } -void Channel::ack(uint64_t firstTag, uint64_t lastTag){ +void Channel::ack(uint64_t firstTag, uint64_t lastTag) +{ if (txBuffer.get()) { accumulatedAck.update(firstTag, lastTag); //TODO: I think the outstanding prefetch size & count should be updated at this point... @@ -355,7 +378,8 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){ } } -void Channel::recover(bool requeue){ +void Channel::recover(bool requeue) +{ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery if(requeue){ @@ -368,12 +392,12 @@ void Channel::recover(bool requeue){ } } -bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){ +bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) +{ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = adapter.getNextDeliveryTag(); - adapter.deliver(msg, myDeliveryTag); + uint64_t myDeliveryTag = out.deliver(msg, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.h b/qpid/cpp/src/qpid/broker/BrokerChannel.h index a70dce0ce8..e9672c96d7 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.h +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.h @@ -33,6 +33,7 @@ #include "Consumer.h" #include "DeliveryAdapter.h" #include "DeliveryRecord.h" +#include "DeliveryToken.h" #include "Deliverable.h" #include "DtxBuffer.h" #include "DtxManager.h" @@ -64,7 +65,7 @@ class Channel : public CompletionHandler class ConsumerImpl : public Consumer { Channel* parent; - std::auto_ptr<DeliveryAdapter> adapter; + DeliveryToken::shared_ptr token; const string tag; Queue::shared_ptr queue; ConnectionToken* const connection; @@ -72,7 +73,7 @@ class Channel : public CompletionHandler bool blocked; public: - ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter, + ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); ~ConsumerImpl(); @@ -86,6 +87,7 @@ class Channel : public CompletionHandler framing::ChannelId id; Connection& connection; + DeliveryAdapter& out; uint64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; ConsumerImplMap consumers; @@ -110,7 +112,7 @@ class Channel : public CompletionHandler void checkDtxTimeout(); public: - Channel(Connection& parent, framing::ChannelId id, MessageStore* const store = 0); + Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0); ~Channel(); bool isOpen() const { return opened; } @@ -127,11 +129,11 @@ class Channel : public CompletionHandler /** *@param tagInOut - if empty it is updated with the generated token. */ - void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks, + void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); - bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected); + bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void close(); void startTx(); void commit(); diff --git a/qpid/cpp/src/qpid/broker/BrokerMessage.cpp b/qpid/cpp/src/qpid/broker/BrokerMessage.cpp index d192b09a63..bf0e37e8e3 100644 --- a/qpid/cpp/src/qpid/broker/BrokerMessage.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerMessage.cpp @@ -26,6 +26,7 @@ #include "InMemoryContent.h" #include "LazyLoadedContent.h" #include "MessageStore.h" +#include "BrokerQueue.h" #include "qpid/log/Statement.h" #include "qpid/framing/BasicDeliverBody.h" #include "qpid/framing/BasicGetOkBody.h" @@ -37,6 +38,30 @@ #include "qpid/framing/ChannelAdapter.h" #include "RecoveryManagerImpl.h" +namespace qpid{ +namespace broker{ + +struct BasicGetToken : DeliveryToken +{ + typedef boost::shared_ptr<BasicGetToken> shared_ptr; + + Queue::shared_ptr queue; + + BasicGetToken(Queue::shared_ptr q) : queue(q) {} +}; + +struct BasicConsumeToken : DeliveryToken +{ + typedef boost::shared_ptr<BasicConsumeToken> shared_ptr; + + const string consumer; + + BasicConsumeToken(const string c) : consumer(c) {} +}; + +} +} + using namespace boost; using namespace qpid::broker; using namespace qpid::framing; @@ -74,6 +99,16 @@ bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } +DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue) +{ + return DeliveryToken::shared_ptr(new BasicGetToken(queue)); +} + +DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer) +{ + return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer)); +} + void BasicMessage::deliver(ChannelAdapter& channel, const string& consumerTag, uint64_t deliveryTag, uint32_t framesize) @@ -86,23 +121,39 @@ void BasicMessage::deliver(ChannelAdapter& channel, } void BasicMessage::sendGetOk(ChannelAdapter& channel, - const std::string& /*destination*/, uint32_t messageCount, - uint64_t responseTo, + uint64_t /*responseTo*/, uint64_t deliveryTag, uint32_t framesize) { channel.send(make_shared_ptr( new BasicGetOkBody( channel.getVersion(), - responseTo, + //responseTo, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount))); sendContent(channel, framesize); } -void BasicMessage::sendContent( - ChannelAdapter& channel, uint32_t framesize) +void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize) +{ + BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token); + if (consume) { + deliver(channel, consume->consumer, deliveryTag, framesize); + } else { + BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token); + if (get) { + uint64_t request(1/*actual value doesn't affect anything at present*/); + sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize); + } else { + //TODO: + //either need to be able to convert to a message transfer or + //throw error of some kind to allow this to be handled higher up + } + } +} + +void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize) { channel.send(header); Mutex::ScopedLock locker(contentLock); diff --git a/qpid/cpp/src/qpid/broker/BrokerMessage.h b/qpid/cpp/src/qpid/broker/BrokerMessage.h index 2e031d0bb2..e6483b4733 100644 --- a/qpid/cpp/src/qpid/broker/BrokerMessage.h +++ b/qpid/cpp/src/qpid/broker/BrokerMessage.h @@ -43,6 +43,7 @@ class AMQHeaderBody; namespace broker { class MessageStore; +class Queue; using framing::string; /** @@ -70,13 +71,16 @@ class BasicMessage : public Message { void addContent(framing::AMQContentBody::shared_ptr data); bool isComplete(); + static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue); + static DeliveryToken::shared_ptr createConsumeToken(const string& consumer); + void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); + void deliver(framing::ChannelAdapter&, const string& consumerTag, uint64_t deliveryTag, uint32_t framesize); void sendGetOk(framing::ChannelAdapter& channel, - const std::string& destination, uint32_t messageCount, uint64_t responseTo, uint64_t deliveryTag, diff --git a/qpid/cpp/src/qpid/broker/BrokerMessageBase.h b/qpid/cpp/src/qpid/broker/BrokerMessageBase.h index 73af3935a8..d9269fa94f 100644 --- a/qpid/cpp/src/qpid/broker/BrokerMessageBase.h +++ b/qpid/cpp/src/qpid/broker/BrokerMessageBase.h @@ -25,6 +25,7 @@ #include <string> #include <boost/shared_ptr.hpp> #include "Content.h" +#include "DeliveryToken.h" #include "PersistableMessage.h" #include "qpid/framing/amqp_types.h" @@ -91,23 +92,9 @@ class Message : public PersistableMessage{ void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } void redeliver() { redelivered = true; } - /** - * Used to deliver the message from the queue - */ - virtual void deliver(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint64_t deliveryTag, - uint32_t framesize) = 0; - /** - * Used to return a message in response to a get from a queue - */ - virtual void sendGetOk(framing::ChannelAdapter& channel, - const std::string& destination, - uint32_t messageCount, - uint64_t responseTo, - uint64_t deliveryTag, - uint32_t framesize) = 0; - + virtual void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag/*only needed for basic class*/, + DeliveryToken::shared_ptr token, uint32_t framesize) = 0; + virtual bool isComplete() = 0; virtual uint64_t contentSize() const = 0; diff --git a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp index efa295e44f..8e8eaf23f0 100644 --- a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -34,10 +34,18 @@ #include <algorithm> using namespace std; +using namespace boost; using namespace qpid::framing; namespace qpid { namespace broker { + +struct MessageDeliveryToken : public DeliveryToken +{ + const std::string destination; + + MessageDeliveryToken(const std::string& d) : destination(d) {} +}; MessageMessage::MessageMessage( ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_ @@ -179,22 +187,13 @@ void MessageMessage::transferMessage( channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId()))); } -void MessageMessage::deliver( - framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint64_t /*deliveryTag*/, - uint32_t framesize) + +void MessageMessage::deliver(ChannelAdapter& channel, uint64_t, DeliveryToken::shared_ptr token, uint32_t framesize) { - transferMessage(channel, consumerTag, framesize); + transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize); } -void MessageMessage::sendGetOk( - framing::ChannelAdapter& channel, - const std::string& destination, - uint32_t /*messageCount*/, - uint64_t /*responseTo*/, - uint64_t /*deliveryTag*/, - uint32_t framesize) +void MessageMessage::deliver(ChannelAdapter& channel, const std::string& destination, uint32_t framesize) { transferMessage(channel, destination, framesize); } @@ -321,6 +320,10 @@ MessageMessage::ReferencePtr MessageMessage::getReference() const { return reference; } +DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination) +{ + return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination)); +} }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h index c2d4b7f20b..612f457ae4 100644 --- a/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h +++ b/qpid/cpp/src/qpid/broker/BrokerMessageMessage.h @@ -53,17 +53,8 @@ class MessageMessage: public Message{ TransferPtr getTransfer() const { return transfer; } ReferencePtr getReference() const ; - void deliver(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint64_t deliveryTag, - uint32_t framesize); - - void sendGetOk(framing::ChannelAdapter& channel, - const std::string& destination, - uint32_t messageCount, - uint64_t responseTo, - uint64_t deliveryTag, - uint32_t framesize); + void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); + void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize); bool isComplete(); @@ -81,6 +72,8 @@ class MessageMessage: public Message{ void decodeHeader(framing::Buffer& buffer); void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); + static DeliveryToken::shared_ptr getToken(const std::string& destination); + private: void transferMessage( framing::ChannelAdapter& channel, diff --git a/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp b/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp deleted file mode 100644 index 59b6795a77..0000000000 --- a/qpid/cpp/src/qpid/broker/ConsumeAdapter.cpp +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ConsumeAdapter.h" - -using namespace qpid::broker; -using qpid::framing::ChannelAdapter; -using qpid::framing::RequestId; - -ConsumeAdapter::ConsumeAdapter(ChannelAdapter& a, const std::string t, uint32_t f) : adapter(a), tag(t), framesize(f) {} - -RequestId ConsumeAdapter::getNextDeliveryTag() -{ - return adapter.getNextSendRequestId(); -} - -void ConsumeAdapter::deliver(Message::shared_ptr& msg, RequestId deliveryTag) -{ - msg->deliver(adapter, tag, deliveryTag, framesize); -} diff --git a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h b/qpid/cpp/src/qpid/broker/DeliveryAdapter.h index 45b103bd68..971f4095cf 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h +++ b/qpid/cpp/src/qpid/broker/DeliveryAdapter.h @@ -22,11 +22,13 @@ #define _DeliveryAdapter_ #include "BrokerMessageBase.h" +#include "DeliveryToken.h" #include "qpid/framing/amqp_types.h" namespace qpid { namespace broker { + typedef framing::RequestId DeliveryId; /** * The intention behind this interface is to separate the generic * handling of some form of message delivery to clients that is @@ -40,8 +42,8 @@ namespace broker { class DeliveryAdapter { public: - virtual framing::RequestId getNextDeliveryTag() = 0; - virtual void deliver(Message::shared_ptr& msg, framing::RequestId tag) = 0; + virtual DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) = 0; + virtual void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) = 0; virtual ~DeliveryAdapter(){} }; diff --git a/qpid/cpp/src/qpid/broker/ConsumeAdapter.h b/qpid/cpp/src/qpid/broker/DeliveryToken.h index 43cda7753e..8bdf5e6359 100644 --- a/qpid/cpp/src/qpid/broker/ConsumeAdapter.h +++ b/qpid/cpp/src/qpid/broker/DeliveryToken.h @@ -18,23 +18,25 @@ * under the License. * */ -#ifndef _ConsumeAdapter_ -#define _ConsumeAdapter_ +#ifndef _DeliveryToken_ +#define _DeliveryToken_ -#include "DeliveryAdapter.h" -#include "qpid/framing/ChannelAdapter.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { - class ConsumeAdapter : public DeliveryAdapter + + /** + * A DeliveryToken allows the delivery of a message to be + * associated with whatever mechanism caused it to be + * delivered. (i.e. its a form of Memento). + */ + class DeliveryToken { - framing::ChannelAdapter& adapter; - const std::string tag; - const uint32_t framesize; public: - ConsumeAdapter(framing::ChannelAdapter& adapter, const std::string tag, uint32_t framesize); - framing::RequestId getNextDeliveryTag(); - void deliver(Message::shared_ptr& msg, framing::RequestId tag); + typedef boost::shared_ptr<DeliveryToken> shared_ptr; + + virtual ~DeliveryToken(){} }; }} diff --git a/qpid/cpp/src/qpid/broker/GetAdapter.cpp b/qpid/cpp/src/qpid/broker/GetAdapter.cpp deleted file mode 100644 index bbffade712..0000000000 --- a/qpid/cpp/src/qpid/broker/GetAdapter.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "GetAdapter.h" -#include "qpid/framing/MethodContext.h" - -using namespace qpid::broker; -using qpid::framing::ChannelAdapter; -using qpid::framing::RequestId; -using qpid::framing::MethodContext; - -GetAdapter::GetAdapter(ChannelAdapter& a, Queue::shared_ptr q, const std::string d, uint32_t f) - : adapter(a), queue(q), destination(d), framesize(f) {} - -RequestId GetAdapter::getNextDeliveryTag() -{ - return adapter.getNextSendRequestId(); -} - -void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag) -{ - msg->sendGetOk(adapter, destination, queue->getMessageCount(), 1, deliveryTag, framesize); -} diff --git a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 41dd8cc145..da57439e21 100644 --- a/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -21,8 +21,6 @@ #include "BrokerChannel.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" -#include "ConsumeAdapter.h" -#include "GetAdapter.h" #include "Broker.h" #include "BrokerMessageMessage.h" #include "qpid/framing/MessageAppendBody.h" @@ -45,66 +43,44 @@ void MessageHandlerImpl::cancel(const string& destination ) { channel.cancel(destination); - //client.ok(); } void -MessageHandlerImpl::open(const string& reference) +MessageHandlerImpl::open(const string& /*reference*/) { - references.open(reference); - //client.ok(); + throw ConnectionException(540, "References no longer supported"); } void -MessageHandlerImpl::append(const framing::MethodContext& context) +MessageHandlerImpl::append(const framing::MethodContext& /*context*/) { - MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody)); - references.get(body->getReference())->append(body); - //client.ok(); + throw ConnectionException(540, "References no longer supported"); } void -MessageHandlerImpl::close(const string& reference) +MessageHandlerImpl::close(const string& /*reference*/) { - Reference::shared_ptr ref = references.get(reference); - //client.ok(); - - // Send any transfer messages to their correct exchanges and okay them - const Reference::Messages& msgs = ref->getMessages(); - for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) { - channel.handleInlineTransfer(*m); - client.setResponseTo((*m)->getRequestId()); - client.ok(); - } - ref->close(); + throw ConnectionException(540, "References no longer supported"); } void MessageHandlerImpl::checkpoint(const string& /*reference*/, const string& /*identifier*/ ) { - // Initial implementation (which is conforming) is to do nothing here - // and return offset zero for the resume - //client.ok(); + throw ConnectionException(540, "References no longer supported"); } void -MessageHandlerImpl::resume(const string& reference, +MessageHandlerImpl::resume(const string& /*reference*/, const string& /*identifier*/ ) { - // Initial (null) implementation - // open reference and return 0 offset - references.open(reference); - client.offset(0); + throw ConnectionException(540, "References no longer supported"); } void MessageHandlerImpl::offset(uint64_t /*value*/ ) { - // Shouldn't ever receive this as it is reponse to resume - // which is never sent - // TODO astitcher 2007-02-16 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "impossible"); + throw ConnectionException(540, "References no longer supported"); } void @@ -120,14 +96,12 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), - tag, queue, !noAck, exclusive, - noLocal ? &connection : 0, &filter); - //client.ok(); + channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } + void MessageHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, @@ -136,11 +110,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, { Queue::shared_ptr queue = getQueue(queueName); - GetAdapter out(adapter, queue, destination, connection.getFrameMax()); - if(channel.get(out, queue, !noAck)) { - client.ok(); + if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){ + //don't send any response... rely on execution completion } else { - client.empty(); + //temporarily disabled: + //client.empty(); } } @@ -167,14 +141,12 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - //client.ok(); } void MessageHandlerImpl::recover(bool requeue) { channel.recover(requeue); - //client.ok(); } void @@ -193,11 +165,8 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context) if (transfer->getBody().isInline()) { MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer)); channel.handleInlineTransfer(message); - client.ok(); } else { - Reference::shared_ptr ref(references.get(transfer->getBody().getValue())); - MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref)); - ref->addMessage(message); + throw ConnectionException(540, "References no longer supported"); } } diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp index 2b1de1bbc0..e9ec698400 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.cpp @@ -25,10 +25,11 @@ using namespace qpid::broker; using namespace qpid::framing; +using namespace qpid::sys; SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : connection(c), - channel(c, id, &c.broker.getStore()) + channel(c, *this, id, &c.broker.getStore()) { init(id, connection.getOutput(), connection.getVersion()); adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this)); @@ -75,10 +76,24 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ } } -void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/) +void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range) { - //just record it for now (will eventually need to use it to ack messages): - outgoing.lwm = SequenceNumber(mark); + //record: + SequenceNumber mark(cumulative); + if (outgoing.lwm < mark) { + outgoing.lwm = mark; + //ack messages: + channel.ack(mark.getValue(), true); + //std::cout << "[" << this << "] acknowledged: " << mark << std::endl; + } + if (range.size() % 2) { //must be even number + throw ConnectionException(530, "Received odd number of elements in ranged mark"); + } else { + //TODO: need to keep a record of the full range previously acked + for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) { + channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + } + } } void SemanticHandler::flush() @@ -86,8 +101,8 @@ void SemanticHandler::flush() //flush doubles as a sync to begin with - send an execution.complete incoming.lwm = incoming.hwm; if (isOpen()) { - /*use dummy value for range which is not yet encoded correctly*/ - send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0))); + Mutex::ScopedLock l(outLock); + ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); } } @@ -140,3 +155,28 @@ void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartb channel.handleHeartbeat(body); } +DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) +{ + Mutex::ScopedLock l(outLock); + SequenceNumber copy(outgoing.hwm); + ++copy; + msg->deliver(*this, copy.getValue(), token, connection.getFrameMax()); + //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl; + return outgoing.hwm.getValue(); +} + +void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) +{ + msg->deliver(*this, tag, token, connection.getFrameMax()); +} + +RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action action) +{ + Mutex::ScopedLock l(outLock); + uint8_t type(body->type()); + if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) { + ++outgoing.hwm; + //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl; + } + return ChannelAdapter::send(body, action); +} diff --git a/qpid/cpp/src/qpid/broker/SemanticHandler.h b/qpid/cpp/src/qpid/broker/SemanticHandler.h index a57559d043..b863b3486e 100644 --- a/qpid/cpp/src/qpid/broker/SemanticHandler.h +++ b/qpid/cpp/src/qpid/broker/SemanticHandler.h @@ -24,6 +24,7 @@ #include <memory> #include "BrokerChannel.h" #include "Connection.h" +#include "DeliveryAdapter.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/FrameHandler.h" @@ -36,6 +37,7 @@ class BrokerAdapter; class framing::ChannelAdapter; class SemanticHandler : private framing::ChannelAdapter, + private DeliveryAdapter, public framing::FrameHandler, public framing::AMQP_ServerOperations::ExecutionHandler { @@ -44,6 +46,7 @@ class SemanticHandler : private framing::ChannelAdapter, std::auto_ptr<BrokerAdapter> adapter; framing::Window incoming; framing::Window outgoing; + sys::Mutex outLock; void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, const qpid::framing::MethodContext& context); @@ -55,12 +58,22 @@ class SemanticHandler : private framing::ChannelAdapter, void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); + + framing::RequestId send(shared_ptr<framing::AMQBody> body, framing::Correlator::Action action=framing::Correlator::Action()); + + + //delivery adapter methods: + DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token); + void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag); + public: SemanticHandler(framing::ChannelId id, Connection& c); + + //frame handler: void handle(framing::AMQFrame& frame); //execution class method handlers: - void complete(uint32_t cumulativeExecutionMark, uint16_t); + void complete(uint32_t cumulativeExecutionMark, framing::SequenceNumberSet range); void flush(); }; diff --git a/qpid/cpp/src/qpid/client/ClientChannel.cpp b/qpid/cpp/src/qpid/client/ClientChannel.cpp index 0033cbdbe4..19b4726a72 100644 --- a/qpid/cpp/src/qpid/client/ClientChannel.cpp +++ b/qpid/cpp/src/qpid/client/ClientChannel.cpp @@ -77,7 +77,7 @@ void Channel::protocolInit( connection->connector->init(); // Send ProtocolInit block. ConnectionStartBody::shared_ptr connectionStart = responses.receive<ConnectionStartBody>(); - + FieldTable props; string mechanism("PLAIN"); string response = ((char)0) + uid + ((char)0) + pwd; @@ -85,7 +85,7 @@ void Channel::protocolInit( ConnectionTuneBody::shared_ptr proposal = sendAndReceive<ConnectionTuneBody>( make_shared_ptr(new ConnectionStartOkBody( - version, connectionStart->getRequestId(), + version, //connectionStart->getRequestId(), props, mechanism, response, locale))); @@ -98,7 +98,7 @@ void Channel::protocolInit( **/ sendCommand(make_shared_ptr(new ConnectionTuneOkBody( - version, proposal->getRequestId(), + version, //proposal->getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(), proposal->getHeartbeat()))); @@ -222,10 +222,10 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt) } } -void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) { +void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& /*ctxt*/) { switch (method->amqpMethodId()) { case ChannelCloseBody::METHOD_ID: - sendCommand(make_shared_ptr(new ChannelCloseOkBody(version, ctxt.getRequestId()))); + sendCommand(make_shared_ptr(new ChannelCloseOkBody(version/*, ctxt.getRequestId()*/))); peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); return; case ChannelFlowBody::METHOD_ID: diff --git a/qpid/cpp/src/qpid/cluster/SessionManager.cpp b/qpid/cpp/src/qpid/cluster/SessionManager.cpp index 9f6438cf92..88ddfe843f 100644 --- a/qpid/cpp/src/qpid/cluster/SessionManager.cpp +++ b/qpid/cpp/src/qpid/cluster/SessionManager.cpp @@ -36,7 +36,7 @@ using namespace sys; using namespace broker; /** Handler to send frames direct to local broker (bypass correlation etc.) */ -struct BrokerHandler : public FrameHandler, private ChannelAdapter { + struct BrokerHandler : public FrameHandler, private ChannelAdapter, private DeliveryAdapter { Connection connection; Channel channel; BrokerAdapter adapter; @@ -51,7 +51,7 @@ struct BrokerHandler : public FrameHandler, private ChannelAdapter { // BrokerHandler(Broker& broker) : connection(0, broker), - channel(connection, 1, 0), + channel(connection, *this, 1, 0), adapter(channel, connection, broker, *this) {} void handle(AMQFrame& frame) { @@ -68,6 +68,10 @@ struct BrokerHandler : public FrameHandler, private ChannelAdapter { virtual void handleMethodInContext(shared_ptr<AMQMethodBody>, const MethodContext&){} // No-op send. virtual RequestId send(shared_ptr<AMQBody>, Correlator::Action) { return 0; } + + //delivery adapter methods, also no-ops: + virtual DeliveryId deliver(Message::shared_ptr&, DeliveryToken::shared_ptr) { return 0; } + virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {} }; /** Wrap plain AMQFrames in SessionFrames */ diff --git a/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp b/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp index 04941eaa58..eb34d48c5f 100644 --- a/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp +++ b/qpid/cpp/src/qpid/framing/AMQMethodBody.cpp @@ -60,4 +60,9 @@ void AMQMethodBody::decode(Buffer& buffer, uint32_t /*size*/) { decodeContent(buffer); } +void AMQMethodBody::encode(Buffer& buffer) const { + encodeId(buffer); + encodeContent(buffer); +} + }} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/AMQMethodBody.h b/qpid/cpp/src/qpid/framing/AMQMethodBody.h index 55cf5cb864..2b46c6ea00 100644 --- a/qpid/cpp/src/qpid/framing/AMQMethodBody.h +++ b/qpid/cpp/src/qpid/framing/AMQMethodBody.h @@ -47,6 +47,7 @@ class AMQMethodBody : public AMQBody AMQMethodBody(ProtocolVersion ver) : version(ver) {} virtual ~AMQMethodBody() {} void decode(Buffer&, uint32_t); + virtual void encode(Buffer& buffer) const; virtual MethodId amqpMethodId() const = 0; virtual ClassId amqpClassId() const = 0; @@ -64,8 +65,8 @@ class AMQMethodBody : public AMQBody virtual bool isRequest() const { return false; } virtual bool isResponse() const { return false; } - protected: static uint32_t baseSize() { return 4; } + protected: struct ClassMethodId { uint16_t classId; @@ -76,6 +77,9 @@ class AMQMethodBody : public AMQBody void encodeId(Buffer& buffer) const; virtual void encodeContent(Buffer& buffer) const = 0; virtual void decodeContent(Buffer& buffer) = 0; + + virtual void printPrefix(std::ostream&) const {} + }; diff --git a/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp b/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp new file mode 100644 index 0000000000..357b5dabd7 --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceNumberSet.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SequenceNumberSet.h" + +using namespace qpid::framing; + +void SequenceNumberSet::encode(Buffer& buffer) const +{ + buffer.putShort(size()); + for (const_iterator i = begin(); i != end(); i++) { + buffer.putLong(i->getValue()); + } +} + +void SequenceNumberSet::decode(Buffer& buffer) +{ + uint16_t count = buffer.getShort(); + for (uint16_t i = 0; i < count; i++) { + push_back(SequenceNumber(buffer.getLong())); + } +} + +uint32_t SequenceNumberSet::encodedSize() const +{ + return 2 /*count*/ + (size() * 4); +} + +namespace qpid{ +namespace framing{ + +std::ostream& operator<<(std::ostream& out, const SequenceNumberSet& set) { + out << "{"; + for (SequenceNumberSet::const_iterator i = set.begin(); i != set.end(); i++) { + if (i != set.begin()) out << ", "; + out << (i->getValue()); + } + out << "}"; + return out; +} + +} +} diff --git a/qpid/cpp/src/qpid/broker/GetAdapter.h b/qpid/cpp/src/qpid/framing/SequenceNumberSet.h index e90619a5f3..bcf78d4f22 100644 --- a/qpid/cpp/src/qpid/broker/GetAdapter.h +++ b/qpid/cpp/src/qpid/framing/SequenceNumberSet.h @@ -18,30 +18,32 @@ * under the License. * */ -#ifndef _GetAdapter_ -#define _GetAdapter_ +#ifndef _framing_SequenceNumberSet_h +#define _framing_SequenceNumberSet_h -#include "BrokerQueue.h" -#include "DeliveryAdapter.h" -#include "qpid/framing/ChannelAdapter.h" +#include <ostream> +#include <vector> +#include "amqp_types.h" +#include "Buffer.h" +#include "SequenceNumber.h" namespace qpid { -namespace broker { - - class GetAdapter : public DeliveryAdapter - { - framing::ChannelAdapter& adapter; - Queue::shared_ptr queue; - const std::string destination; - const uint32_t framesize; - public: - GetAdapter(framing::ChannelAdapter& adapter, Queue::shared_ptr queue, const std::string destination, uint32_t framesize); - ~GetAdapter(){} - framing::RequestId getNextDeliveryTag(); - void deliver(Message::shared_ptr& msg, framing::RequestId tag); - }; - -}} +namespace framing { + +class SequenceNumberSet : public std::vector<SequenceNumber> +{ +public: + typedef std::vector<SequenceNumber>::const_iterator const_iterator; + typedef std::vector<SequenceNumber>::iterator iterator; + + void encode(Buffer& buffer) const; + void decode(Buffer& buffer); + uint32_t encodedSize() const; + + friend std::ostream& operator<<(std::ostream&, const SequenceNumberSet&); +}; + +}} // namespace qpid::framing #endif diff --git a/qpid/cpp/src/qpid/framing/amqp_types.h b/qpid/cpp/src/qpid/framing/amqp_types.h index efb720f047..ff75b28468 100644 --- a/qpid/cpp/src/qpid/framing/amqp_types.h +++ b/qpid/cpp/src/qpid/framing/amqp_types.h @@ -53,6 +53,7 @@ typedef uint16_t ReplyCode; // Types represented by classes. class Content; class FieldTable; +class SequenceNumberSet; // Useful constants diff --git a/qpid/cpp/src/tests/BrokerChannelTest.cpp b/qpid/cpp/src/tests/BrokerChannelTest.cpp index 05bdb7b3f0..eb67601875 100644 --- a/qpid/cpp/src/tests/BrokerChannelTest.cpp +++ b/qpid/cpp/src/tests/BrokerChannelTest.cpp @@ -48,30 +48,21 @@ struct MockHandler : ConnectionOutputHandler{ void close() {}; }; -struct DeliveryRecorder +struct DeliveryRecorder : DeliveryAdapter { - typedef std::pair<Message::shared_ptr, RequestId> Delivery; + DeliveryId id; + typedef std::pair<Message::shared_ptr, DeliveryToken::shared_ptr> Delivery; std::vector<Delivery> delivered; - struct Adapter : DeliveryAdapter + DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { - RequestId id; - DeliveryRecorder& recorder; - - Adapter(DeliveryRecorder& r) : recorder(r) {} - - RequestId getNextDeliveryTag() { return id + 1; } - void deliver(Message::shared_ptr& msg, RequestId tag) - { - recorder.delivered.push_back(Delivery(msg, tag)); - id++; - } - - }; + delivered.push_back(Delivery(msg, token)); + return ++id; + } - std::auto_ptr<DeliveryAdapter> createAdapter() + void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/) { - return std::auto_ptr<DeliveryAdapter>(new Adapter(*this)); + delivered.push_back(Delivery(msg, token)); } }; @@ -166,6 +157,7 @@ class BrokerChannelTest : public CppUnit::TestCase } }; + DeliveryRecorder recorder; public: @@ -179,13 +171,13 @@ class BrokerChannelTest : public CppUnit::TestCase void testConsumerMgmt(){ Queue::shared_ptr queue(new Queue("my_queue")); - Channel channel(connection, 0, 0); + Channel channel(connection, recorder, 0, 0); channel.open(); CPPUNIT_ASSERT(!channel.exists("my_consumer")); ConnectionToken* owner = 0; string tag("my_consumer"); - std::auto_ptr<DeliveryAdapter> unused; + DeliveryToken::shared_ptr unused; channel.consume(unused, tag, queue, false, false, owner); string tagA; string tagB; @@ -205,24 +197,25 @@ class BrokerChannelTest : public CppUnit::TestCase } void testDeliveryNoAck(){ - Channel channel(connection, 7); + Channel channel(connection, recorder, 7); Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); Queue::shared_ptr queue(new Queue("my_queue")); - DeliveryRecorder recorder; string tag("test"); - channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); + DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + channel.consume(token, tag, queue, false, false, 0); queue->deliver(msg); sleep(2); CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); + CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } void testStaging(){ MockMessageStore store; connection.setFrameMax(1000); connection.setStagingThreshold(10); - Channel channel(connection, 1, &store); + Channel channel(connection, recorder, 1, &store); const string data[] = {"abcde", "fghij", "klmno"}; Message* msg = new BasicMessage(0, "my_exchange", "my_routing_key", false, false); @@ -314,7 +307,7 @@ class BrokerChannelTest : public CppUnit::TestCase } void testFlow(){ - Channel channel(connection, 7); + Channel channel(connection, recorder, 7); channel.open(); //there will always be a connection-start frame CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size()); @@ -327,9 +320,9 @@ class BrokerChannelTest : public CppUnit::TestCase Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14)); addContent(msg, data); Queue::shared_ptr queue(new Queue("my_queue")); - DeliveryRecorder recorder; string tag("test"); - channel.consume(recorder.createAdapter(), tag, queue, false, false, 0); + DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken("my-token")); + channel.consume(token, tag, queue, false, false, 0); channel.flow(false); queue->deliver(msg); //ensure no messages have been delivered @@ -340,6 +333,7 @@ class BrokerChannelTest : public CppUnit::TestCase //ensure no messages have been delivered CPPUNIT_ASSERT_EQUAL((size_t) 1, recorder.delivered.size()); CPPUNIT_ASSERT_EQUAL(msg, recorder.delivered.front().first); + CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second); } Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize) diff --git a/qpid/cpp/src/tests/FramingTest.cpp b/qpid/cpp/src/tests/FramingTest.cpp index 134acb94f9..98f89b59be 100644 --- a/qpid/cpp/src/tests/FramingTest.cpp +++ b/qpid/cpp/src/tests/FramingTest.cpp @@ -108,7 +108,7 @@ class FramingTest : public CppUnit::TestCase { std::string a = "hostA"; std::string b = "hostB"; - ConnectionRedirectBody in(version, 0, a, b); + ConnectionRedirectBody in(version, a, b); in.encodeContent(buffer); buffer.flip(); ConnectionRedirectBody out(version); @@ -146,7 +146,7 @@ class FramingTest : public CppUnit::TestCase std::string a = "hostA"; std::string b = "hostB"; AMQFrame in(version, 999, - new ConnectionRedirectBody(version, 0, a, b)); + new ConnectionRedirectBody(version, a, b)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -157,7 +157,7 @@ class FramingTest : public CppUnit::TestCase void testBasicConsumeOkBodyFrame() { std::string s = "hostA"; - AMQFrame in(version, 999, new BasicConsumeOkBody(version, 0, s)); + AMQFrame in(version, 999, new BasicConsumeOkBody(version, s)); in.encode(buffer); buffer.flip(); AMQFrame out; @@ -400,22 +400,22 @@ class FramingTest : public CppUnit::TestCase c.declareQueue(queue); c.bind(exchange, queue, "MyTopic", framing::FieldTable()); broker::InProcessBroker::Conversation::const_iterator i = ibroker.conversation.begin(); - ASSERT_FRAME("BROKER: Frame[channel=0; request(id=1,mark=0): ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=0; request(id=2,mark=1): ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; response(id=2,request=2,batch=0): ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=0; request(id=1,mark=0): ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=0; response(id=1,request=1,batch=0): ConnectionOpenOk: knownHosts=]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=1,mark=0): ChannelOpen: outOfBand=]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=1,request=1,batch=0): ChannelOpenOk: ]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=2,mark=1): ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=3,mark=1): ExecutionFlush: ]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; request(id=1,mark=0): ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=4,mark=1): QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; response(id=2,request=4,batch=0): QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=5,mark=2): QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++); - ASSERT_FRAME("CLIENT: Frame[channel=1; request(id=6,mark=2): ExecutionFlush: ]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; request(id=2,mark=0): ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet=0]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionStart: versionMajor=0; versionMinor=10; serverProperties={}; mechanisms=PLAIN; locales=en_US]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionStartOk: clientProperties={}; mechanism=PLAIN; response=\000guest\000guest; locale=en_US]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionTune: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionTuneOk: channelMax=32767; frameMax=65536; heartbeat=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=0; ConnectionOpen: virtualHost=/; capabilities=; insist=1]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=0; ConnectionOpenOk: knownHosts=]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; ChannelOpen: outOfBand=]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet={}]", *i++); } }; diff --git a/qpid/python/cpp_failing_0-10.txt b/qpid/python/cpp_failing_0-10.txt index 798d1769eb..e68f942d67 100644 --- a/qpid/python/cpp_failing_0-10.txt +++ b/qpid/python/cpp_failing_0-10.txt @@ -1,4 +1,15 @@ -tests_0-10.message.MessageTests.test_checkpoint tests_0-10.message.MessageTests.test_reject tests_0-10.basic.BasicTests.test_get +tests_0-10.message.MessageTests.test_get +tests_0-10.message.MessageTests.test_checkpoint +tests_0-10.message.MessageTests.test_empty_reference +tests_0-10.message.MessageTests.test_reference_already_opened_error +tests_0-10.message.MessageTests.test_reference_completion +tests_0-10.message.MessageTests.test_reference_large +tests_0-10.message.MessageTests.test_reference_multi_transfer +tests_0-10.message.MessageTests.test_reference_simple +tests_0-10.message.MessageTests.test_reference_unopened_on_append_error +tests_0-10.message.MessageTests.test_reference_unopened_on_close_error +tests_0-10.message.MessageTests.test_reference_unopened_on_transfer_error + diff --git a/qpid/python/qpid/codec.py b/qpid/python/qpid/codec.py index a5228e8003..a0d9696c8b 100644 --- a/qpid/python/qpid/codec.py +++ b/qpid/python/qpid/codec.py @@ -329,12 +329,6 @@ class Codec: return ReferenceId(self.decode_longstr()) # new domains for 0-10: - - def encode_uuid(self, s): - self.encode_longstr(s) - - def decode_uuid(self): - return self.decode_longstr() def encode_rfc1982_long(self, s): self.encode_long(s) @@ -342,10 +336,21 @@ class Codec: def decode_rfc1982_long(self): return self.decode_long() - #Not done yet def encode_rfc1982_long_set(self, s): - self.encode_short(0) + self.encode_short(len(s)) + for i in s: + self.encode_long(i) def decode_rfc1982_long_set(self): - self.decode_short() - return 0; + count = self.decode_short() + set = [] + for i in range(0, count): + set.append(self.decode_long()) + return set; + + #not correct for 0-10 yet + def encode_uuid(self, s): + self.encode_longstr(s) + + def decode_uuid(self): + return self.decode_longstr() diff --git a/qpid/python/qpid/message.py b/qpid/python/qpid/message.py index f80293180e..970ab9d974 100644 --- a/qpid/python/qpid/message.py +++ b/qpid/python/qpid/message.py @@ -26,7 +26,10 @@ class Message: self.frame = frame self.method = frame.method_type self.content = content - + if self.method.klass.name != "execution": + self.command_id = self.channel.incoming_completion.sequence.next() + #print "allocated: ", self.command_id, "to ", self.method.klass.name, "_", self.method.name + def __len__(self): return len(self.frame.args) @@ -66,3 +69,6 @@ class Message: def __repr__(self): return Message.REPR % (self.method, self.frame.args, self.content) + + def complete(self, cumulative=True): + self.channel.incoming_completion.complete(mark=self.command_id, cumulative=cumulative) diff --git a/qpid/python/qpid/peer.py b/qpid/python/qpid/peer.py index 3927f20667..bedc96895b 100644 --- a/qpid/python/qpid/peer.py +++ b/qpid/python/qpid/peer.py @@ -30,6 +30,7 @@ from message import Message from queue import Queue, Closed as QueueClosed from content import Content from cStringIO import StringIO +from time import time class Sequence: @@ -186,11 +187,11 @@ class Channel: self.requester = Requester(self.write) self.responder = Responder(self.write) - self.completion = ExecutionCompletion() + self.completion = OutgoingCompletion() + self.incoming_completion = IncomingCompletion(self) # Use reliable framing if version == 0-9. - # (also for 0-10 while transitioning...) - self.reliable = (spec.major == 0 and (spec.minor == 9 or spec.minor == 10)) + self.reliable = (spec.major == 0 and spec.minor == 9) self.use_execution_layer = (spec.major == 0 and spec.minor == 10) self.synchronous = True @@ -202,6 +203,7 @@ class Channel: self.incoming.close() self.responses.close() self.completion.close() + self.incoming_completion.reset() def write(self, frame, content = None): if self.closed: @@ -252,6 +254,9 @@ class Channel: self.responder.respond(method, batch, request) def invoke(self, type, args, kwargs): + if type.klass.name == "channel" and (type.name == "close" or type.name == "open"): + self.completion.reset() + self.incoming_completion.reset() self.completion.next_command(type) content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) @@ -306,6 +311,13 @@ class Channel: return Message(self, resp, content) else: raise ValueError(resp) + elif self.synchronous and not frame.method.response \ + and self.use_execution_layer and frame.method.klass.name != "execution": + self.execution_flush() + self.completion.wait() + if self.closed: + raise Closed(self.reason) + except QueueClosed, e: if self.closed: raise Closed(self.reason) @@ -349,21 +361,32 @@ class Future: def is_complete(self): return self.completed.isSet() -class ExecutionCompletion: +class OutgoingCompletion: + """ + Manages completion of outgoing commands i.e. command sent by this peer + """ + def __init__(self): self.condition = threading.Condition() - self.sequence = Sequence(1) - self.command_id = 0 - self.mark = 0 + + self.sequence = Sequence(1) #issues ids for outgoing commands + self.command_id = 0 #last issued id + self.mark = 0 #commands up to this mark are known to be complete + self.closed = False def next_command(self, method): #the following test is a hack until the track/sub-channel is available if method.klass.name != "execution": self.command_id = self.sequence.next() + def reset(self): + self.sequence = Sequence(1) #reset counter + def close(self): + self.reset() self.condition.acquire() try: + self.closed = True self.condition.notifyAll() finally: self.condition.release() @@ -378,11 +401,50 @@ class ExecutionCompletion: def wait(self, point_of_interest=-1, timeout=None): if point_of_interest == -1: point_of_interest = self.command_id + start_time = time() + remaining = timeout self.condition.acquire() try: - if point_of_interest > self.mark: - self.condition.wait(timeout) + while not self.closed and point_of_interest > self.mark: + #print "waiting for ", point_of_interest, " mark is currently at ", self.mark + self.condition.wait(remaining) + if timeout: + if start_time + timeout > time(): break + else: remaining = timeout - (time() - start_time) finally: self.condition.release() - #todo: retry until timed out or closed return point_of_interest <= self.mark + +class IncomingCompletion: + """ + Manages completion of incoming commands i.e. command received by this peer + """ + + def __init__(self, channel): + self.sequence = Sequence(1) #issues ids for incoming commands + self.mark = 0 #id of last command of whose completion notification was sent to the other peer + self.channel = channel + + def next_id(self, method): + #the following test is a hack until the track/sub-channel is available + if method.klass.name != "execution": + return self.sequence.next() + else: + return 0 + + def reset(self): + self.sequence = Sequence(1) #reset counter + + def complete(self, mark, cumulative=True): + if cumulative: + if mark > self.mark: + self.mark = mark + self.channel.execution_complete(cumulative_execution_mark=self.mark) + else: + #TODO: record and manage the ranges properly + range = [mark, mark] + self.channel.execution_complete(cumulative_execution_mark=self.mark, ranged_execution_set=range) + + + + diff --git a/qpid/python/qpid/spec.py b/qpid/python/qpid/spec.py index c537401164..09e7dc9d0b 100644 --- a/qpid/python/qpid/spec.py +++ b/qpid/python/qpid/spec.py @@ -240,7 +240,7 @@ class Method(Metadata): "content": None, "uuid": "", "rfc1982_long": 0, - "rfc1982_long_set": 0 + "rfc1982_long_set": [] } def define_method(self, name): diff --git a/qpid/python/tests_0-10/broker.py b/qpid/python/tests_0-10/broker.py index 684b36597e..6bc2f7ceb8 100644 --- a/qpid/python/tests_0-10/broker.py +++ b/qpid/python/tests_0-10/broker.py @@ -48,7 +48,7 @@ class BrokerTests(TestBase): body = "test ack" ch.message_transfer(routing_key = "otherqueue", body = body) msg = self.client.queue(ctag).get(timeout = 5) - msg.ok() + msg.complete() self.assert_(msg.body == body) def test_simple_delivery_immediate(self): diff --git a/qpid/python/tests_0-10/dtx.py b/qpid/python/tests_0-10/dtx.py index c0d1bd2b74..2835d703ae 100644 --- a/qpid/python/tests_0-10/dtx.py +++ b/qpid/python/tests_0-10/dtx.py @@ -40,6 +40,11 @@ class DtxTests(TestBase): XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 XA_OK = 8 + tx_counter = 0 + + def reset_channel(self): + self.channel.channel_close() + self.channel.channel_open() def test_simple_commit(self): """ @@ -56,6 +61,9 @@ class DtxTests(TestBase): #commit self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status) + #should close and reopen channel to ensure no unacked messages are held + self.reset_channel() + #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") @@ -79,6 +87,8 @@ class DtxTests(TestBase): #commit self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status) + self.reset_channel() + #check result self.assertMessageCount(0, "queue-a") self.assertMessageCount(1, "queue-b") @@ -100,6 +110,8 @@ class DtxTests(TestBase): #rollback self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.reset_channel() + #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") @@ -123,6 +135,8 @@ class DtxTests(TestBase): #rollback self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status) + self.reset_channel() + #check result self.assertMessageCount(1, "queue-a") self.assertMessageCount(0, "queue-b") @@ -191,6 +205,8 @@ class DtxTests(TestBase): channel = self.channel #do some transactional work & complete the transaction self.test_simple_commit() + # channel has been reset, so reselect for use with dtx + channel.dtx_demarcation_select() #start association for the same xid as the previously completed txn tx = self.xid("my-xid") @@ -355,7 +371,7 @@ class DtxTests(TestBase): self.assertEqual("two", msg.message_id) channel.message_cancel(destination="results") #ack the message then close the channel - msg.ok() + msg.complete() channel.channel_close() channel = self.channel @@ -446,7 +462,7 @@ class DtxTests(TestBase): channel2.dtx_demarcation_select() channel2.dtx_demarcation_start(xid=tx) channel2.message_get(queue="dummy", destination="dummy") - self.client.queue("dummy").get(timeout=1).ok() + self.client.queue("dummy").get(timeout=1).complete() channel2.message_transfer(routing_key="dummy", body="whatever") channel2.channel_close() @@ -548,7 +564,9 @@ class DtxTests(TestBase): channel.dtx_coordination_rollback(xid=x) self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra)) - def xid(self, txid, branchqual = ''): + def xid(self, txid): + DtxTests.tx_counter += 1 + branchqual = "v%s" % DtxTests.tx_counter return pack('LBB', 0, len(txid), len(branchqual)) + txid + branchqual def txswap(self, tx, id): @@ -573,7 +591,7 @@ class DtxTests(TestBase): #consume from src: channel.message_get(destination="temp-swap", queue=src) msg = self.client.queue("temp-swap").get(timeout=1) - msg.ok(); + msg.complete(); #re-publish to dest channel.message_transfer(routing_key=dest, message_id=msg.message_id, body=msg.body) diff --git a/qpid/python/tests_0-10/example.py b/qpid/python/tests_0-10/example.py index 7ab4cc7d0a..dc71b0590b 100644 --- a/qpid/python/tests_0-10/example.py +++ b/qpid/python/tests_0-10/example.py @@ -90,5 +90,5 @@ class ExampleTest (TestBase): self.assertEqual(body, msg.body) # Now acknowledge the message. - msg.ok() + msg.complete() diff --git a/qpid/python/tests_0-10/message.py b/qpid/python/tests_0-10/message.py index b25016e680..74e2b6416f 100644 --- a/qpid/python/tests_0-10/message.py +++ b/qpid/python/tests_0-10/message.py @@ -171,8 +171,8 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok(batchoffset=1)#One and Two - msg4.ok() + msg2.complete(cumulative=True)#One and Two + msg4.complete(cumulative=False) channel.message_recover(requeue=False) @@ -215,8 +215,8 @@ class MessageTests(TestBase): self.assertEqual("Four", msg4.body) self.assertEqual("Five", msg5.body) - msg1.ok(batchoffset=1) #One and Two - msg4.ok() #Four + msg2.complete(cumulative=True) #One and Two + msg4.complete(cumulative=False) #Four channel.message_cancel(destination="consumer_tag") @@ -276,14 +276,13 @@ class MessageTests(TestBase): except Empty: None #ack messages and check that the next set arrive ok: - #todo: once batching is implmented, send a single response for all messages - msg.ok(batchoffset=-4)#1-5 + msg.complete() for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-4)#6-10 + msg.complete() try: extra = queue.get(timeout=1) @@ -320,13 +319,13 @@ class MessageTests(TestBase): except Empty: None #ack messages and check that the next set arrive ok: - msg.ok(batchoffset=-4)#1-5 + msg.complete() for i in range(6, 11): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-4)#6-10 + msg.complete() try: extra = queue.get(timeout=1) @@ -376,9 +375,9 @@ class MessageTests(TestBase): self.assertEqual("Message %d" % i, msg.body) if (i==13): - msg.ok(batchoffset=-2)#11, 12 & 13 + msg.complete()#11, 12 & 13 if(i in [15, 17, 19]): - msg.ok() + msg.complete(cumulative=False) reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") @@ -395,8 +394,7 @@ class MessageTests(TestBase): self.assertEqual(reply.method.name, "ok") msg = self.client.queue(tag).get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() - #channel.message_ack(delivery_tag=reply.delivery_tag) + msg.complete() reply = channel.message_get(no_ack=True, queue="test-get") self.assertEqual(reply.method.klass.name, "message") diff --git a/qpid/python/tests_0-10/tx.py b/qpid/python/tests_0-10/tx.py index 0f6b4f5cd1..b499c2d1f9 100644 --- a/qpid/python/tests_0-10/tx.py +++ b/qpid/python/tests_0-10/tx.py @@ -30,23 +30,39 @@ class TxTests(TestBase): """ Test that commited publishes are delivered and commited acks are not re-delivered """ + channel2 = self.client.channel(2) + channel2.channel_open() + self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c") + channel2.tx_commit() + channel2.channel_close() + + #use a different channel with new subscriptions to ensure + #there is no redelivery of acked messages: channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel.tx_commit() + channel.tx_select() + + channel.message_consume(queue="tx-commit-a", destination="qa", no_ack=False) + queue_a = self.client.queue("qa") + + channel.message_consume(queue="tx-commit-b", destination="qb", no_ack=False) + queue_b = self.client.queue("qb") + + channel.message_consume(queue="tx-commit-c", destination="qc", no_ack=False) + queue_c = self.client.queue("qc") #check results for i in range(1, 5): msg = queue_c.get(timeout=1) self.assertEqual("TxMessage %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("TxMessage 6", msg.body) - msg.ok() + msg.complete() msg = queue_a.get(timeout=1) self.assertEqual("TxMessage 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -76,15 +92,15 @@ class TxTests(TestBase): for i in range(1, 5): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -114,15 +130,15 @@ class TxTests(TestBase): for i in range(1, 5): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok() + msg.complete() msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() for q in [queue_a, queue_b, queue_c]: try: @@ -150,10 +166,10 @@ class TxTests(TestBase): channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) for i in range(1, 5): - channel.message_transfer(routing_key=name_a, body="Message %d" % i) + channel.message_transfer(routing_key=name_a, message_id="msg%d" % i, body="Message %d" % i) - channel.message_transfer(routing_key=key, destination="amq.direct", body="Message 6") - channel.message_transfer(routing_key=topic, destination="amq.topic", body="Message 7") + channel.message_transfer(routing_key=key, destination="amq.direct", message_id="msg6", body="Message 6") + channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="msg7", body="Message 7") channel.tx_select() @@ -164,25 +180,25 @@ class TxTests(TestBase): msg = queue_a.get(timeout=1) self.assertEqual("Message %d" % i, msg.body) - msg.ok(batchoffset=-3) + msg.complete() channel.message_consume(queue=name_b, destination="sub_b", no_ack=False) queue_b = self.client.queue("sub_b") msg = queue_b.get(timeout=1) self.assertEqual("Message 6", msg.body) - msg.ok() + msg.complete() sub_c = channel.message_consume(queue=name_c, destination="sub_c", no_ack=False) queue_c = self.client.queue("sub_c") msg = queue_c.get(timeout=1) self.assertEqual("Message 7", msg.body) - msg.ok() + msg.complete() #publish messages for i in range(1, 5): - channel.message_transfer(routing_key=topic, destination="amq.topic", body="TxMessage %d" % i) + channel.message_transfer(routing_key=topic, destination="amq.topic", message_id="tx-msg%d" % i, body="TxMessage %d" % i) - channel.message_transfer(routing_key=key, destination="amq.direct", body="TxMessage 6") - channel.message_transfer(routing_key=name_a, body="TxMessage 7") + channel.message_transfer(routing_key=key, destination="amq.direct", message_id="tx-msg6", body="TxMessage 6") + channel.message_transfer(routing_key=name_a, message_id="tx-msg7", body="TxMessage 7") return queue_a, queue_b, queue_c diff --git a/qpid/specs/amqp-transitional.0-10.xml b/qpid/specs/amqp-transitional.0-10.xml index eb2d745c35..53912d0c2e 100644 --- a/qpid/specs/amqp-transitional.0-10.xml +++ b/qpid/specs/amqp-transitional.0-10.xml @@ -6184,13 +6184,6 @@ <chassis name="server" implement="MUST" /> <chassis name="client" implement="MUST" /> - <!-- TRANSITIONAL: this is included as a temporary measure to allow easier evolution from 0-9 to 0-10 --> - <method name = "ok" index = "500" label = "old means of signalling completion"> - <chassis name = "server" implement = "MUST" /> - <chassis name = "client" implement = "MUST" /> - </method> - - <!-- - Method: message.transfer - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> <method name="transfer" index="10" label="transfer a message"> @@ -6220,8 +6213,9 @@ <chassis name="server" implement="MUST" /> <chassis name="client" implement="MUST" /> - <response name="ok" /> + <!-- commented out to ease transition from 0-9 to 0-10 <response name="reject" /> + --> <field name="ticket" domain="access-ticket" label="access ticket"> <rule name="validity" on-failure="access-refused"> @@ -6594,8 +6588,9 @@ <chassis name="server" implement="MUST" /> - <response name="ok" /><!-- added in just to ease transition --> + <!-- commented out to aid transition to 0-10 <response name="empty" /> + --> <field name="ticket" domain="access-ticket"> <rule name="ticket-required" on-failure="access-refused"> |