summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-06 21:38:30 +0000
committerAlan Conway <aconway@apache.org>2007-02-06 21:38:30 +0000
commit877e7ae368d4320bd60ba5750be207a5cac13f43 (patch)
tree9f0777c5e6069b537e13d1c1f88cc08560f47de3 /cpp
parenta0c19714ccb547c401e598189a36573ac750e809 (diff)
downloadqpid-python-877e7ae368d4320bd60ba5750be207a5cac13f43.tar.gz
* cpp/lib/broker/BrokerQueue.cpp (): Centralized exceptions.
* cpp/lib/broker/BrokerAdapter.cpp (consume): Moved exceptions to Queue * cpp/lib/broker/BrokerChannel.cpp (consume): Moved exceptions to Queue * cpp/lib/broker/BrokerMessageBase.cpp: - Added getApplicationHeaders. * cpp/lib/broker/BrokerMessageMessage.cpp: - Fixed exchangeName/destination mix up. - Removed redundant constructor. - Added getApplicationHeaders * cpp/lib/broker/MessageHandlerImpl.cpp: - Added missing acknowledgements - Replaced assert(0) with throw "unimplemented". - Moved exchange existence exceptions to ExchangeRegistry - Handle transfers with references. * cpp/tests/Makefile.am (check): Don't run tests unless all libs built OK. * cpp/tests/python_tests: Re-enabled python tests. Not all passing. * python/tests/message.py (MessageTests.test_get): Replace get-ok with ok. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504305 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp29
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp15
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp9
-rw-r--r--cpp/lib/broker/BrokerMessage.h1
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h6
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp68
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h9
-rw-r--r--cpp/lib/broker/BrokerQueue.cpp17
-rw-r--r--cpp/lib/broker/BrokerQueue.h6
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp122
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.h7
-rw-r--r--cpp/tests/Makefile.am3
-rwxr-xr-xcpp/tests/python_tests4
13 files changed, 155 insertions, 141 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index c9d44c7445..625dda1480 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -15,6 +15,8 @@
* limitations under the License.
*
*/
+#include <boost/format.hpp>
+
#include "BrokerAdapter.h"
#include "Connection.h"
#include "Exception.h"
@@ -25,6 +27,7 @@
namespace qpid {
namespace broker {
+using boost::format;
using namespace qpid;
using namespace qpid::framing;
@@ -151,9 +154,11 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext
}
}
}
- if (exclusive && !queue->isExclusiveOwner(&connection)) {
- throw ChannelException(405, "Cannot grant exclusive access to queue");
- }
+ if (exclusive && !queue->isExclusiveOwner(&connection))
+ throw ChannelException(
+ 405,
+ format("Cannot grant exclusive access to queue '%s'")
+ % queue->getName());
if (!nowait) {
string queueName = queue->getName();
connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount());
@@ -248,20 +253,14 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume(
throw ConnectionException(530, "Consumer tags must be unique");
}
- try{
- string newTag = consumerTag;
- channel.consume(
- newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
+ string newTag = consumerTag;
+ channel.consume(
+ newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
- else throw ChannelException(403, "Access would violate previously granted exclusivity");
- }
+ if(!nowait) connection.client->getBasic().consumeOk(context, newTag);
+ //allow messages to be dispatched if required as there is now a consumer:
+ queue->dispatch();
}
void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 954eb391ea..ba1ccb7031 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -82,10 +82,11 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks,
ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
queue->consume(c, exclusive);//may throw exception
- consumers[tag] = c;
- }catch(ExclusiveAccessException& e){
+ consumers[tag] = c;
+ } catch(...) {
+ // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt.
delete c;
- throw e;
+ throw;
}
}
@@ -190,11 +191,11 @@ void Channel::ConsumerImpl::requestDispatch(){
void Channel::handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exch){
if(transactional){
TxPublish* deliverable = new TxPublish(msg);
- exch->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ exch->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
txBuffer.enlist(new DeletingTxOp(deliverable));
}else{
DeliverableMessage deliverable(msg);
- exch->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ exch->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
}
}
@@ -227,12 +228,12 @@ void Channel::complete(Message::shared_ptr msg) {
if(transactional) {
std::auto_ptr<TxPublish> deliverable(new TxPublish(msg));
exchange->route(*deliverable, msg->getRoutingKey(),
- &(msg->getHeaderProperties()->getHeaders()));
+ &(msg->getApplicationHeaders()));
txBuffer.enlist(new DeletingTxOp(deliverable.release()));
} else {
DeliverableMessage deliverable(msg);
exchange->route(deliverable, msg->getRoutingKey(),
- &(msg->getHeaderProperties()->getHeaders()));
+ &(msg->getApplicationHeaders()));
}
}
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 69d4ba087f..43a22ab6b9 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -18,6 +18,8 @@
* under the License.
*
*/
+#include <boost/cast.hpp>
+
#include <BrokerMessage.h>
#include <iostream>
@@ -116,7 +118,12 @@ void BasicMessage::sendContent(
}
BasicHeaderProperties* BasicMessage::getHeaderProperties(){
- return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+ return boost::polymorphic_downcast<BasicHeaderProperties*>(
+ header->getProperties());
+}
+
+const FieldTable& BasicMessage::getApplicationHeaders(){
+ return getHeaderProperties()->getHeaders();
}
const ConnectionToken* const BasicMessage::getPublisher(){
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index 5ac9c83d0e..d56912ea60 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -85,6 +85,7 @@ class BasicMessage : public Message {
u_int32_t framesize);
framing::BasicHeaderProperties* getHeaderProperties();
+ const framing::FieldTable& getApplicationHeaders();
bool isPersistent();
u_int64_t contentSize() const { return size; }
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
index 9a5b136ada..d5e37fbc7a 100644
--- a/cpp/lib/broker/BrokerMessageBase.h
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -37,6 +37,7 @@ namespace framing {
class MethodContext;
class ChannelAdapter;
class BasicHeaderProperties;
+class FieldTable;
}
namespace broker {
@@ -114,7 +115,12 @@ class Message{
virtual bool isComplete() = 0;
virtual u_int64_t contentSize() const = 0;
+ // FIXME aconway 2007-02-06: Get rid of BasicHeaderProperties
+ // at this level. Expose only generic properties available from both
+ // message types (e.g. getApplicationHeaders below).
+ //
virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
+ virtual const framing::FieldTable& getApplicationHeaders() = 0;
virtual bool isPersistent() = 0;
virtual const ConnectionToken* const getPublisher() = 0;
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index 459a0e69e7..d7020b8923 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -18,11 +18,13 @@
* under the License.
*
*/
+#include "QpidError.h"
#include "BrokerMessageMessage.h"
#include "ChannelAdapter.h"
#include "MessageTransferBody.h"
#include "MessageAppendBody.h"
#include "Reference.h"
+#include "framing/FieldTable.h"
#include <iostream>
@@ -30,24 +32,15 @@ using namespace std;
using namespace qpid::broker;
using namespace qpid::framing;
-MessageMessage::MessageMessage(
- const boost::shared_ptr<MessageTransferBody> _methodBody,
- const std::string& _exchange, const std::string& _routingKey,
- bool _mandatory, bool _immediate) :
- Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
- methodBody(_methodBody)
-{
-}
-
MessageMessage::MessageMessage(TransferPtr transfer_)
- : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
+ : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
transfer_->getMandatory(), transfer_->getImmediate(),
transfer_),
transfer(transfer_)
{}
MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref)
- : Message(transfer_->getExchange(), transfer_->getRoutingKey(),
+ : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
transfer_->getMandatory(), transfer_->getImmediate(),
transfer_),
transfer(transfer_),
@@ -62,29 +55,29 @@ void MessageMessage::deliver(
{
channel.send(
new MessageTransferBody(channel.getVersion(),
- methodBody->getTicket(),
+ transfer->getTicket(),
consumerTag,
getRedelivered(),
- methodBody->getImmediate(),
- methodBody->getTtl(),
- methodBody->getPriority(),
- methodBody->getTimestamp(),
- methodBody->getDeliveryMode(),
- methodBody->getExpiration(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
getExchange(),
getRoutingKey(),
- methodBody->getMessageId(),
- methodBody->getCorrelationId(),
- methodBody->getReplyTo(),
- methodBody->getContentType(),
- methodBody->getContentEncoding(),
- methodBody->getUserId(),
- methodBody->getAppId(),
- methodBody->getTransactionId(),
- methodBody->getSecurityToken(),
- methodBody->getApplicationHeaders(),
- methodBody->getBody(),
- methodBody->getMandatory()));
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ transfer->getBody(),
+ transfer->getMandatory()));
}
void MessageMessage::sendGetOk(
@@ -98,11 +91,12 @@ void MessageMessage::sendGetOk(
bool MessageMessage::isComplete()
{
- return true; // FIXME aconway 2007-02-05:
+ return true;
}
u_int64_t MessageMessage::contentSize() const
{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
return 0; // FIXME aconway 2007-02-05:
}
@@ -110,33 +104,45 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
{
return 0; // FIXME aconway 2007-02-05:
}
+
+const FieldTable& MessageMessage::getApplicationHeaders()
+{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
+ return transfer->getApplicationHeaders();
+}
bool MessageMessage::isPersistent()
{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
return false; // FIXME aconway 2007-02-05:
}
const ConnectionToken* const MessageMessage::getPublisher()
{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
return 0; // FIXME aconway 2007-02-05:
}
u_int32_t MessageMessage::encodedSize()
{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
return 0; // FIXME aconway 2007-02-05:
}
u_int32_t MessageMessage::encodedHeaderSize()
{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
return 0; // FIXME aconway 2007-02-05:
}
u_int32_t MessageMessage::encodedContentSize()
{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
return 0; // FIXME aconway 2007-02-05:
}
u_int64_t MessageMessage::expectedContentSize()
{
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
return 0; // FIXME aconway 2007-02-05:
}
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
index c943ce6102..5310ef65b3 100644
--- a/cpp/lib/broker/BrokerMessageMessage.h
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -38,17 +38,11 @@ namespace broker {
class Reference;
class MessageMessage: public Message{
- const boost::shared_ptr<framing::MessageTransferBody> methodBody;
-
public:
typedef Reference::TransferPtr TransferPtr;
typedef Reference::AppendPtr AppendPtr;
typedef Reference::Appends Appends;
- MessageMessage(
- const boost::shared_ptr<framing::MessageTransferBody> methodBody,
- const std::string& exchange, const std::string& routingKey,
- bool mandatory, bool immediate);
MessageMessage(TransferPtr transfer);
MessageMessage(TransferPtr transfer, const Reference&);
@@ -67,7 +61,8 @@ class MessageMessage: public Message{
bool isComplete();
u_int64_t contentSize() const;
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
+ framing::BasicHeaderProperties* getHeaderProperties();
+ const framing::FieldTable& getApplicationHeaders();
bool isPersistent();
const ConnectionToken* const getPublisher();
diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp
index 0e48d3b13d..99c045c59a 100644
--- a/cpp/lib/broker/BrokerQueue.cpp
+++ b/cpp/lib/broker/BrokerQueue.cpp
@@ -18,6 +18,9 @@
* under the License.
*
*/
+
+#include <boost/format.hpp>
+
#include <BrokerQueue.h>
#include <MessageStore.h>
#include <sys/Monitor.h>
@@ -27,6 +30,7 @@
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
+using boost::format;
Queue::Queue(const string& _name, u_int32_t _autodelete,
MessageStore* const _store,
@@ -128,12 +132,17 @@ void Queue::dispatch(){
void Queue::consume(Consumer* c, bool requestExclusive){
Mutex::ScopedLock locker(lock);
- if(exclusive) throw ExclusiveAccessException();
- if(requestExclusive){
- if(!consumers.empty()) throw ExclusiveAccessException();
+ if(exclusive)
+ throw ChannelException(
+ 403, format("Queue '%s' has an exclusive consumer."
+ " No more consumers allowed.") % getName());
+ if(requestExclusive) {
+ if(!consumers.empty())
+ throw ChannelException(
+ 403, format("Queue '%s' already has conumers."
+ "Exclusive access denied.") %getName());
exclusive = c;
}
-
if(autodelete && consumers.empty()) lastUsed = 0;
consumers.push_back(c);
}
diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h
index 860de45b9c..40fa4bd415 100644
--- a/cpp/lib/broker/BrokerQueue.h
+++ b/cpp/lib/broker/BrokerQueue.h
@@ -21,7 +21,6 @@
* under the License.
*
*/
-
#include <vector>
#include <memory>
#include <queue>
@@ -35,6 +34,9 @@
#include <sys/Monitor.h>
#include <QueuePolicy.h>
+// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to
+// enforce ownership of Consumers.
+
namespace qpid {
namespace broker {
class MessageStore;
@@ -42,8 +44,6 @@ namespace qpid {
/**
* Thrown when exclusive access would be violated.
*/
- struct ExclusiveAccessException{};
-
using std::string;
/**
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index e19afd0e67..5f5e9b84e7 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -1,4 +1,3 @@
-
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -17,6 +16,7 @@
*
*/
+#include "QpidError.h"
#include "MessageHandlerImpl.h"
#include "BrokerChannel.h"
#include "FramingContent.h"
@@ -31,6 +31,11 @@ namespace broker {
using namespace framing;
+MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
+ : channel(ch), connection(c), broker(b), references(ch),
+ client(connection.client->getMessage())
+{}
+
//
// Message class method handlers
//
@@ -42,7 +47,7 @@ MessageHandlerImpl::append(const MethodContext& context,
references.get(reference).append(
boost::shared_polymorphic_downcast<MessageAppendBody>(
context.methodBody));
- sendOk(context);
+ client.ok(context);
}
@@ -51,7 +56,7 @@ MessageHandlerImpl::cancel(const MethodContext& context,
const string& destination )
{
channel.cancel(destination);
- sendOk(context);
+ client.ok(context);
}
void
@@ -59,7 +64,8 @@ MessageHandlerImpl::checkpoint(const MethodContext&,
const string& /*reference*/,
const string& /*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
@@ -67,7 +73,7 @@ MessageHandlerImpl::close(const MethodContext& context,
const string& reference)
{
references.get(reference).close();
- sendOk(context);
+ client.ok(context);
}
void
@@ -80,32 +86,23 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool exclusive,
const qpid::framing::FieldTable& filter )
{
- Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
- if(!destination.empty() && channel.exists(destination)){
+ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
+ if(!destination.empty() && channel.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
- }
-
- try{
- string newTag = destination;
- channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter);
-
- sendOk(context);
-
- //allow messages to be dispatched if required as there is now a consumer:
- queue->dispatch();
- }catch(ExclusiveAccessException& e){
- if(exclusive)
- throw ChannelException(403, "Exclusive access cannot be granted");
- else
- throw ChannelException(
- 403, "Access would violate previously granted exclusivity");
- }
+ string tag = destination;
+ channel.consume(
+ tag, queue, !noAck, exclusive,
+ noLocal ? &connection : 0, &filter);
+ client.ok(context);
+ // Dispatch messages as there is now a consumer.
+ queue->dispatch();
}
void
MessageHandlerImpl::empty( const MethodContext& )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
@@ -121,17 +118,18 @@ MessageHandlerImpl::get( const MethodContext& context,
connection.getQueue(queueName, context.channel->getId());
// FIXME: get is probably Basic specific
- if(!channel.get(queue, !noAck)){
- connection.client->getMessageHandler()->empty(context);
- }
-
+ if(channel.get(queue, !noAck))
+ client.ok(context);
+ else
+ client.empty(context);
}
void
MessageHandlerImpl::offset(const MethodContext&,
u_int64_t /*value*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
@@ -145,7 +143,7 @@ MessageHandlerImpl::open(const MethodContext& context,
const string& reference)
{
references.open(reference);
- sendOk(context);
+ client.ok(context);
}
void
@@ -157,18 +155,17 @@ MessageHandlerImpl::qos(const MethodContext& context,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
-
- sendOk(context);
+ client.ok(context);
}
void
-MessageHandlerImpl::recover(const MethodContext&,
- bool requeue )
+MessageHandlerImpl::recover(const MethodContext& context,
+ bool requeue)
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented");
+ // FIXME aconway 2007-02-06: Call to recover hangs client.
channel.recover(requeue);
-
+ client.ok(context);
}
void
@@ -176,7 +173,8 @@ MessageHandlerImpl::reject(const MethodContext&,
u_int16_t /*code*/,
const string& /*text*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
@@ -184,22 +182,23 @@ MessageHandlerImpl::resume(const MethodContext&,
const string& /*reference*/,
const string& /*identifier*/ )
{
- assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
+ // FIXME astitcher 2007-01-11: 0-9 feature
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
}
void
MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
- const string& /*destination*/,
+ const string& destination,
bool /*redelivered*/,
- bool immediate,
+ bool /*immediate*/,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
- const string& exchangeName,
- const string& routingKey,
+ const string& /*exchangeName*/,
+ const string& /*routingKey*/,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -211,30 +210,23 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
qpid::framing::Content body,
- bool mandatory)
+ bool /*mandatory*/)
{
- Exchange::shared_ptr exchange = exchangeName.empty() ?
- broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
- boost::shared_ptr<MessageTransferBody> transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody));
- if(exchange){
- if (body.isInline()) {
- Message::shared_ptr msg(new MessageMessage(transfer, exchangeName,
- routingKey, mandatory, immediate));
-
- channel.handleInlineTransfer(msg, exchange);
-
- connection.client->getMessageHandler()->ok(context);
- } else {
- references.get(body.getValue()).transfer(transfer);
- }
- }else{
- throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+ Exchange::shared_ptr exchange(
+ broker.getExchanges().get(destination));
+ MessageTransferBody::shared_ptr transfer(
+ boost::shared_polymorphic_downcast<MessageTransferBody>(
+ context.methodBody));
+ if (body.isInline()) {
+ Message::shared_ptr msg(new MessageMessage(transfer));
+ channel.handleInlineTransfer(msg, exchange);
+ }
+ else {
+ // Add to reference.
+ references.get(body.getValue()).transfer(transfer);
}
+ client.ok(context);
}
-void MessageHandlerImpl::sendOk(const MethodContext& context) {
- connection.client->getMessageHandler()->ok(context);
-}
-
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h
index 886ca5fb54..0fef45bb19 100644
--- a/cpp/lib/broker/MessageHandlerImpl.h
+++ b/cpp/lib/broker/MessageHandlerImpl.h
@@ -22,6 +22,7 @@
#include <memory>
#include "AMQP_ServerOperations.h"
+#include "AMQP_ClientProxy.h"
#include "Reference.h"
#include "BrokerChannel.h"
@@ -36,8 +37,7 @@ class MessageHandlerImpl :
public framing::AMQP_ServerOperations::MessageHandler
{
public:
- MessageHandlerImpl(Channel& ch, Connection& c, Broker& b)
- : channel(ch), connection(c), broker(b), references(ch) {}
+ MessageHandlerImpl(Channel& ch, Connection& c, Broker& b);
void append(const framing::MethodContext&,
const std::string& reference,
@@ -119,12 +119,11 @@ class MessageHandlerImpl :
framing::Content body,
bool mandatory );
private:
- void sendOk(const framing::MethodContext&);
-
Channel& channel;
Connection& connection;
Broker& broker;
ReferenceRegistry references;
+ framing::AMQP_ClientProxy::Message& client;
};
}} // namespace qpid::broker
diff --git a/cpp/tests/Makefile.am b/cpp/tests/Makefile.am
index 93a117b514..2b51a5b125 100644
--- a/cpp/tests/Makefile.am
+++ b/cpp/tests/Makefile.am
@@ -98,3 +98,6 @@ gen.mk: Makefile.am
) \
> $@-t
mv $@-t $@
+
+
+check: $(check_LTLIBRARIES) $(lib_common) $(lib_client) $(lib_broker)
diff --git a/cpp/tests/python_tests b/cpp/tests/python_tests
index d41f8adb80..3f22082f51 100755
--- a/cpp/tests/python_tests
+++ b/cpp/tests/python_tests
@@ -1,8 +1,4 @@
#!/bin/sh
-# FIXME aconway 2007-01-09: Re-enable.
-echo "*** WARNING: PYTHON TESTS DISABLED till branch is functioning on 0-9."
-exit
-
# Run the python tests.
if test -d ../../python ; then
cd ../../python && ./run-tests -v -I cpp_failing.txt