summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-06 21:38:30 +0000
committerAlan Conway <aconway@apache.org>2007-02-06 21:38:30 +0000
commit877e7ae368d4320bd60ba5750be207a5cac13f43 (patch)
tree9f0777c5e6069b537e13d1c1f88cc08560f47de3 /cpp/lib/broker/MessageHandlerImpl.cpp
parenta0c19714ccb547c401e598189a36573ac750e809 (diff)
downloadqpid-python-877e7ae368d4320bd60ba5750be207a5cac13f43.tar.gz
* cpp/lib/broker/BrokerQueue.cpp (): Centralized exceptions.
* cpp/lib/broker/BrokerAdapter.cpp (consume): Moved exceptions to Queue * cpp/lib/broker/BrokerChannel.cpp (consume): Moved exceptions to Queue * cpp/lib/broker/BrokerMessageBase.cpp: - Added getApplicationHeaders. * cpp/lib/broker/BrokerMessageMessage.cpp: - Fixed exchangeName/destination mix up. - Removed redundant constructor. - Added getApplicationHeaders * cpp/lib/broker/MessageHandlerImpl.cpp: - Added missing acknowledgements - Replaced assert(0) with throw "unimplemented". - Moved exchange existence exceptions to ExchangeRegistry - Handle transfers with references. * cpp/tests/Makefile.am (check): Don't run tests unless all libs built OK. * cpp/tests/python_tests: Re-enabled python tests. Not all passing. * python/tests/message.py (MessageTests.test_get): Replace get-ok with ok. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504305 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp122
1 files changed, 57 insertions, 65 deletions
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index e19afd0e67..5f5e9b84e7 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -1,4 +1,3 @@
-
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -17,6 +16,7 @@
*
*/
+#include "QpidError.h"
#include "MessageHandlerImpl.h"
#include "BrokerChannel.h"
#include "FramingContent.h"
@@ -31,6 +31,11 @@ namespace broker {
using namespace framing;
+MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
+ : channel(ch), connection(c), broker(b), references(ch),
+ client(connection.client->getMessage())
+{}
+
//
// Message class method handlers
//
@@ -42,7 +47,7 @@ MessageHandlerImpl::append(const MethodContext& context,
references.get(reference).append(
boost::shared_polymorphic_downcast<MessageAppendBody>(
context.methodBody));
- sendOk(context);
+ client.ok(context);
}
@@ -51,7 +56,7 @@ MessageHandlerImpl::cancel(const MethodContext& context,
const string& destination )
{
channel.cancel(destination);
- sendOk(context);
+ client.ok(context);
}
void
@@ -59,7 +64,8 @@ MessageHandlerImpl::checkpoint(const MethodContext&,
const string& /*reference*/,
const string& /*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
@@ -67,7 +73,7 @@ MessageHandlerImpl::close(const MethodContext& context,
const string& reference)
{
references.get(reference).close();
- sendOk(context);
+ client.ok(context);
}
void
@@ -80,32 +86,23 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool exclusive,
const qpid::framing::FieldTable& filter )
{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!destination.empty() && channel.exists(destination)){
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- string newTag = destination;
- channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
- sendOk(context);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive)
- throw ChannelException(403, "Exclusive access cannot be granted");
- else
- throw ChannelException(
- 403, "Access would violate previously granted exclusivity");
- }
+ string tag = destination;
+ channel.consume(
+ tag, queue, !noAck, exclusive,
+ noLocal ? &connection : 0, &filter);
+ client.ok(context);
+ // Dispatch messages as there is now a consumer.
+ queue->dispatch();
}
void
MessageHandlerImpl::empty( const MethodContext& )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
@@ -121,17 +118,18 @@ MessageHandlerImpl::get( const MethodContext& context,
connection.getQueue(queueName, context.channel->getId());
// FIXME: get is probably Basic specific
- if(!channel.get(queue, !noAck)){
- connection.client->getMessageHandler()->empty(context);
- }
-
+ if(channel.get(queue, !noAck))
+ client.ok(context);
+ else
+ client.empty(context);
}
void
MessageHandlerImpl::offset(const MethodContext&,
u_int64_t /*value*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
@@ -145,7 +143,7 @@ MessageHandlerImpl::open(const MethodContext& context,
const string& reference)
{
references.open(reference);
- sendOk(context);
+ client.ok(context);
}
void
@@ -157,18 +155,17 @@ MessageHandlerImpl::qos(const MethodContext& context,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
-
- sendOk(context);
+ client.ok(context);
}
void
-MessageHandlerImpl::recover(const MethodContext&,
- bool requeue )
+MessageHandlerImpl::recover(const MethodContext& context,
+ bool requeue)
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented");
+ // FIXME aconway 2007-02-06: Call to recover hangs client.
channel.recover(requeue);
-
+ client.ok(context);
}
void
@@ -176,7 +173,8 @@ MessageHandlerImpl::reject(const MethodContext&,
u_int16_t /*code*/,
const string& /*text*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
@@ -184,22 +182,23 @@ MessageHandlerImpl::resume(const MethodContext&,
const string& /*reference*/,
const string& /*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
- const string& /*destination*/,
+ const string& destination,
bool /*redelivered*/,
- bool immediate,
+ bool /*immediate*/,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
- const string& exchangeName,
- const string& routingKey,
+ const string& /*exchangeName*/,
+ const string& /*routingKey*/,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -211,30 +210,23 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
qpid::framing::Content body,
- bool mandatory)
+ bool /*mandatory*/)
{
- Exchange::shared_ptr exchange = exchangeName.empty() ?
- broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
- boost::shared_ptr<MessageTransferBody> transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody));
- if(exchange){
- if (body.isInline()) {
- Message::shared_ptr msg(new MessageMessage(transfer, exchangeName,
- routingKey, mandatory, immediate));
-
- channel.handleInlineTransfer(msg, exchange);
-
- connection.client->getMessageHandler()->ok(context);
- } else {
- references.get(body.getValue()).transfer(transfer);
- }
- }else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ Exchange::shared_ptr exchange(
+ broker.getExchanges().get(destination));
+ MessageTransferBody::shared_ptr transfer(
+ boost::shared_polymorphic_downcast<MessageTransferBody>(
+ context.methodBody));
+ if (body.isInline()) {
+ Message::shared_ptr msg(new MessageMessage(transfer));
+ channel.handleInlineTransfer(msg, exchange);
+ }
+ else {
+ // Add to reference.
+ references.get(body.getValue()).transfer(transfer);
}
+ client.ok(context);
}
-void MessageHandlerImpl::sendOk(const MethodContext& context) {
- connection.client->getMessageHandler()->ok(context);
-}
-
}} // namespace qpid::broker