summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerAdapter.cpp5
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp11
-rw-r--r--cpp/lib/broker/BrokerChannel.h10
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp46
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h12
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp42
6 files changed, 95 insertions, 31 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp
index 6f55f32d47..c9d44c7445 100644
--- a/cpp/lib/broker/BrokerAdapter.cpp
+++ b/cpp/lib/broker/BrokerAdapter.cpp
@@ -334,6 +334,10 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& )
//no specific action required, generic response handling should be sufficient
}
+
+//
+// Message class method handlers
+//
void
BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context)
{
@@ -341,6 +345,7 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& con
connection.client->getChannel().pong(context);
}
+
void
BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context)
{
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index c0250815e8..954eb391ea 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -187,6 +187,17 @@ void Channel::ConsumerImpl::requestDispatch(){
if(blocked) queue->dispatch();
}
+void Channel::handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exch){
+ if(transactional){
+ TxPublish* deliverable = new TxPublish(msg);
+ exch->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable));
+ }else{
+ DeliverableMessage deliverable(msg);
+ exch->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ }
+}
+
// FIXME aconway 2007-02-05: Drop exchange member, calculate from
// message in ::complete().
void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 484a4d64e3..cbad2382a8 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -96,7 +96,9 @@ class Channel : public framing::ChannelAdapter,
boost::scoped_ptr<BrokerAdapter> adapter;
- virtual void complete(Message::shared_ptr msg);
+ // completion handler for MessageBuilder
+ void complete(Message::shared_ptr msg);
+
void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
void cancel(consumer_iterator consumer);
bool checkPrefetch(Message::shared_ptr& msg);
@@ -110,7 +112,9 @@ class Channel : public framing::ChannelAdapter,
~Channel();
+ // For ChannelAdapter
bool isOpen() const { return opened; }
+
void open() { opened = true; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
@@ -134,6 +138,10 @@ class Channel : public framing::ChannelAdapter,
void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+
+ void handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exchange);
+
+ // For ChannelAdapter
void handleMethodInContext(
boost::shared_ptr<framing::AMQMethodBody> method,
const framing::MethodContext& context);
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index e2c4b94811..459a0e69e7 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -18,15 +18,27 @@
* under the License.
*
*/
-#include <iostream>
#include "BrokerMessageMessage.h"
+#include "ChannelAdapter.h"
#include "MessageTransferBody.h"
#include "MessageAppendBody.h"
#include "Reference.h"
+#include <iostream>
+
using namespace std;
using namespace qpid::broker;
+using namespace qpid::framing;
+MessageMessage::MessageMessage(
+ const boost::shared_ptr<MessageTransferBody> _methodBody,
+ const std::string& _exchange, const std::string& _routingKey,
+ bool _mandatory, bool _immediate) :
+ Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
+ methodBody(_methodBody)
+{
+}
+
MessageMessage::MessageMessage(TransferPtr transfer_)
: Message(transfer_->getExchange(), transfer_->getRoutingKey(),
transfer_->getMandatory(), transfer_->getImmediate(),
@@ -43,14 +55,36 @@ MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref)
{}
void MessageMessage::deliver(
- framing::ChannelAdapter& /*channel*/,
- const std::string& /*consumerTag*/,
+ framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
u_int64_t /*deliveryTag*/,
u_int32_t /*framesize*/)
{
- // FIXME aconway 2007-02-05:
- cout << "MessageMessage::deliver" << *transfer << " + " << appends.size()
- << " appends." << endl;
+ channel.send(
+ new MessageTransferBody(channel.getVersion(),
+ methodBody->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ methodBody->getImmediate(),
+ methodBody->getTtl(),
+ methodBody->getPriority(),
+ methodBody->getTimestamp(),
+ methodBody->getDeliveryMode(),
+ methodBody->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ methodBody->getMessageId(),
+ methodBody->getCorrelationId(),
+ methodBody->getReplyTo(),
+ methodBody->getContentType(),
+ methodBody->getContentEncoding(),
+ methodBody->getUserId(),
+ methodBody->getAppId(),
+ methodBody->getTransactionId(),
+ methodBody->getSecurityToken(),
+ methodBody->getApplicationHeaders(),
+ methodBody->getBody(),
+ methodBody->getMandatory()));
}
void MessageMessage::sendGetOk(
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
index aa136863a1..c943ce6102 100644
--- a/cpp/lib/broker/BrokerMessageMessage.h
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -21,10 +21,12 @@
* under the License.
*
*/
-#include <vector>
#include "BrokerMessageBase.h"
+#include "MessageTransferBody.h"
#include "Reference.h"
+#include <vector>
+
namespace qpid {
namespace framing {
@@ -36,11 +38,17 @@ namespace broker {
class Reference;
class MessageMessage: public Message{
+ const boost::shared_ptr<framing::MessageTransferBody> methodBody;
+
public:
typedef Reference::TransferPtr TransferPtr;
typedef Reference::AppendPtr AppendPtr;
- typedef Reference::Appends Appends;
+ typedef Reference::Appends Appends;
+ MessageMessage(
+ const boost::shared_ptr<framing::MessageTransferBody> methodBody,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
MessageMessage(TransferPtr transfer);
MessageMessage(TransferPtr transfer, const Reference&);
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 30b69e4654..e19afd0e67 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -80,8 +80,6 @@ MessageHandlerImpl::consume(const MethodContext& context,
bool exclusive,
const qpid::framing::FieldTable& filter )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!destination.empty() && channel.exists(destination)){
throw ConnectionException(530, "Consumer tags must be unique");
@@ -139,7 +137,7 @@ MessageHandlerImpl::offset(const MethodContext&,
void
MessageHandlerImpl::ok( const MethodContext& )
{
- // TODO aconway 2007-02-05: For HA, we can drop acked messages here.
+ // TODO: Need to ack the transfers acknowledged so far for flow control purp oses
}
void
@@ -156,8 +154,6 @@ MessageHandlerImpl::qos(const MethodContext& context,
u_int16_t prefetchCount,
bool /*global*/ )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
@@ -196,14 +192,14 @@ MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool /* immediate */,
+ bool immediate,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& /* routingKey */,
+ const string& routingKey,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -215,22 +211,24 @@ MessageHandlerImpl::transfer(const MethodContext& context,
const string& /*securityToken*/,
const qpid::framing::FieldTable& /*applicationHeaders*/,
qpid::framing::Content body,
- bool /* mandatory */ )
+ bool mandatory)
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
- MessageTransferBody::shared_ptr transfer =
- boost::shared_polymorphic_downcast<MessageTransferBody>(
- context.methodBody);
- // Verify the exchange exists, will throw if not.
- broker.getExchanges().get(exchangeName);
- if (body.isInline()) {
- MessageMessage* msg = new MessageMessage(transfer);
- // FIXME aconway 2007-02-05: Remove exchange parameter.
- // use shared_ptr for message.
- channel.handlePublish(msg, Exchange::shared_ptr());
- sendOk(context);
- } else {
- references.get(body.getValue()).transfer(transfer);
+ Exchange::shared_ptr exchange = exchangeName.empty() ?
+ broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName);
+ boost::shared_ptr<MessageTransferBody> transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody));
+ if(exchange){
+ if (body.isInline()) {
+ Message::shared_ptr msg(new MessageMessage(transfer, exchangeName,
+ routingKey, mandatory, immediate));
+
+ channel.handleInlineTransfer(msg, exchange);
+
+ connection.client->getMessageHandler()->ok(context);
+ } else {
+ references.get(body.getValue()).transfer(transfer);
+ }
+ }else{
+ throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
}
}