summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
committerGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
commit80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch)
tree13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/qpid/broker/MessageHandlerImpl.cpp
parenta9232d5a02a19f093f212cb0b76772a20b45cb1b (diff)
downloadqpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp65
1 files changed, 17 insertions, 48 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 41dd8cc145..da57439e21 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -21,8 +21,6 @@
#include "BrokerChannel.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
-#include "ConsumeAdapter.h"
-#include "GetAdapter.h"
#include "Broker.h"
#include "BrokerMessageMessage.h"
#include "qpid/framing/MessageAppendBody.h"
@@ -45,66 +43,44 @@ void
MessageHandlerImpl::cancel(const string& destination )
{
channel.cancel(destination);
- //client.ok();
}
void
-MessageHandlerImpl::open(const string& reference)
+MessageHandlerImpl::open(const string& /*reference*/)
{
- references.open(reference);
- //client.ok();
+ throw ConnectionException(540, "References no longer supported");
}
void
-MessageHandlerImpl::append(const framing::MethodContext& context)
+MessageHandlerImpl::append(const framing::MethodContext& /*context*/)
{
- MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
- references.get(body->getReference())->append(body);
- //client.ok();
+ throw ConnectionException(540, "References no longer supported");
}
void
-MessageHandlerImpl::close(const string& reference)
+MessageHandlerImpl::close(const string& /*reference*/)
{
- Reference::shared_ptr ref = references.get(reference);
- //client.ok();
-
- // Send any transfer messages to their correct exchanges and okay them
- const Reference::Messages& msgs = ref->getMessages();
- for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) {
- channel.handleInlineTransfer(*m);
- client.setResponseTo((*m)->getRequestId());
- client.ok();
- }
- ref->close();
+ throw ConnectionException(540, "References no longer supported");
}
void
MessageHandlerImpl::checkpoint(const string& /*reference*/,
const string& /*identifier*/ )
{
- // Initial implementation (which is conforming) is to do nothing here
- // and return offset zero for the resume
- //client.ok();
+ throw ConnectionException(540, "References no longer supported");
}
void
-MessageHandlerImpl::resume(const string& reference,
+MessageHandlerImpl::resume(const string& /*reference*/,
const string& /*identifier*/ )
{
- // Initial (null) implementation
- // open reference and return 0 offset
- references.open(reference);
- client.offset(0);
+ throw ConnectionException(540, "References no longer supported");
}
void
MessageHandlerImpl::offset(uint64_t /*value*/ )
{
- // Shouldn't ever receive this as it is reponse to resume
- // which is never sent
- // TODO astitcher 2007-02-16 What is the correct exception to throw here?
- THROW_QPID_ERROR(INTERNAL_ERROR, "impossible");
+ throw ConnectionException(540, "References no longer supported");
}
void
@@ -120,14 +96,12 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/,
if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
- channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
- tag, queue, !noAck, exclusive,
- noLocal ? &connection : 0, &filter);
- //client.ok();
+ channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
+
void
MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& queueName,
@@ -136,11 +110,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
{
Queue::shared_ptr queue = getQueue(queueName);
- GetAdapter out(adapter, queue, destination, connection.getFrameMax());
- if(channel.get(out, queue, !noAck)) {
- client.ok();
+ if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){
+ //don't send any response... rely on execution completion
} else {
- client.empty();
+ //temporarily disabled:
+ //client.empty();
}
}
@@ -167,14 +141,12 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- //client.ok();
}
void
MessageHandlerImpl::recover(bool requeue)
{
channel.recover(requeue);
- //client.ok();
}
void
@@ -193,11 +165,8 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context)
if (transfer->getBody().isInline()) {
MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer));
channel.handleInlineTransfer(message);
- client.ok();
} else {
- Reference::shared_ptr ref(references.get(transfer->getBody().getValue()));
- MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref));
- ref->addMessage(message);
+ throw ConnectionException(540, "References no longer supported");
}
}