diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
commit | 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch) | |
tree | 13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/qpid/broker/MessageHandlerImpl.cpp | |
parent | a9232d5a02a19f093f212cb0b76772a20b45cb1b (diff) | |
download | qpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz |
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses.
Some refactoring around message delivery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 65 |
1 files changed, 17 insertions, 48 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 41dd8cc145..da57439e21 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -21,8 +21,6 @@ #include "BrokerChannel.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" -#include "ConsumeAdapter.h" -#include "GetAdapter.h" #include "Broker.h" #include "BrokerMessageMessage.h" #include "qpid/framing/MessageAppendBody.h" @@ -45,66 +43,44 @@ void MessageHandlerImpl::cancel(const string& destination ) { channel.cancel(destination); - //client.ok(); } void -MessageHandlerImpl::open(const string& reference) +MessageHandlerImpl::open(const string& /*reference*/) { - references.open(reference); - //client.ok(); + throw ConnectionException(540, "References no longer supported"); } void -MessageHandlerImpl::append(const framing::MethodContext& context) +MessageHandlerImpl::append(const framing::MethodContext& /*context*/) { - MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody)); - references.get(body->getReference())->append(body); - //client.ok(); + throw ConnectionException(540, "References no longer supported"); } void -MessageHandlerImpl::close(const string& reference) +MessageHandlerImpl::close(const string& /*reference*/) { - Reference::shared_ptr ref = references.get(reference); - //client.ok(); - - // Send any transfer messages to their correct exchanges and okay them - const Reference::Messages& msgs = ref->getMessages(); - for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) { - channel.handleInlineTransfer(*m); - client.setResponseTo((*m)->getRequestId()); - client.ok(); - } - ref->close(); + throw ConnectionException(540, "References no longer supported"); } void MessageHandlerImpl::checkpoint(const string& /*reference*/, const string& /*identifier*/ ) { - // Initial implementation (which is conforming) is to do nothing here - // and return offset zero for the resume - //client.ok(); + throw ConnectionException(540, "References no longer supported"); } void -MessageHandlerImpl::resume(const string& reference, +MessageHandlerImpl::resume(const string& /*reference*/, const string& /*identifier*/ ) { - // Initial (null) implementation - // open reference and return 0 offset - references.open(reference); - client.offset(0); + throw ConnectionException(540, "References no longer supported"); } void MessageHandlerImpl::offset(uint64_t /*value*/ ) { - // Shouldn't ever receive this as it is reponse to resume - // which is never sent - // TODO astitcher 2007-02-16 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "impossible"); + throw ConnectionException(540, "References no longer supported"); } void @@ -120,14 +96,12 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), - tag, queue, !noAck, exclusive, - noLocal ? &connection : 0, &filter); - //client.ok(); + channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } + void MessageHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, @@ -136,11 +110,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, { Queue::shared_ptr queue = getQueue(queueName); - GetAdapter out(adapter, queue, destination, connection.getFrameMax()); - if(channel.get(out, queue, !noAck)) { - client.ok(); + if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){ + //don't send any response... rely on execution completion } else { - client.empty(); + //temporarily disabled: + //client.empty(); } } @@ -167,14 +141,12 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - //client.ok(); } void MessageHandlerImpl::recover(bool requeue) { channel.recover(requeue); - //client.ok(); } void @@ -193,11 +165,8 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context) if (transfer->getBody().isInline()) { MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer)); channel.handleInlineTransfer(message); - client.ok(); } else { - Reference::shared_ptr ref(references.get(transfer->getBody().getValue())); - MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref)); - ref->addMessage(message); + throw ConnectionException(540, "References no longer supported"); } } |