summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-02-07 10:13:41 +0000
committerAlan Conway <aconway@apache.org>2007-02-07 10:13:41 +0000
commit1977153241e86e93b237d2ed7fe02883d44646c5 (patch)
tree68ea88d60712a2459b524add42e412f4ae8ce9b6 /cpp/lib
parent877e7ae368d4320bd60ba5750be207a5cac13f43 (diff)
downloadqpid-python-1977153241e86e93b237d2ed7fe02883d44646c5.tar.gz
* broker/BrokerMessage.cpp: Added ConnectionToken publisher.
* cpp/lib/broker/BrokerMessageMessage.cpp: - Added ConnectionToken publisher. - Implemented getDeliveryMode, getApplicationHeaders * cpp/lib/broker/Reference.cpp: Holds MessageMessage instead of just MessageTransferBody. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504485 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp4
-rw-r--r--cpp/lib/broker/BrokerChannel.h2
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp17
-rw-r--r--cpp/lib/broker/BrokerMessage.h2
-rw-r--r--cpp/lib/broker/BrokerMessageBase.h14
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp84
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h19
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp15
-rw-r--r--cpp/lib/broker/Reference.cpp8
-rw-r--r--cpp/lib/broker/Reference.h25
10 files changed, 94 insertions, 96 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index ba1ccb7031..84ac747846 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -188,7 +188,9 @@ void Channel::ConsumerImpl::requestDispatch(){
if(blocked) queue->dispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exch){
+void Channel::handleInlineTransfer(
+ Message::shared_ptr msg, Exchange::shared_ptr& exch)
+{
if(transactional){
TxPublish* deliverable = new TxPublish(msg);
exch->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index cbad2382a8..6e906e7615 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -139,7 +139,7 @@ class Channel : public framing::ChannelAdapter,
void handleContent(boost::shared_ptr<framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
- void handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr& exchange);
+ void handleInlineTransfer(Message::shared_ptr msg, Exchange::shared_ptr& exchange);
// For ChannelAdapter
void handleMethodInContext(
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 43a22ab6b9..d232efff16 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -41,11 +41,10 @@ BasicMessage::BasicMessage(
const string& _exchange, const string& _routingKey,
bool _mandatory, bool _immediate, framing::AMQMethodBody::shared_ptr respondTo
) :
- Message(_exchange, _routingKey, _mandatory, _immediate, respondTo),
- publisher(_publisher),
+ Message(_publisher, _exchange, _routingKey, _mandatory,
+ _immediate, respondTo),
size(0)
-{
-}
+{}
// FIXME aconway 2007-02-01: remove.
// BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
@@ -56,7 +55,7 @@ BasicMessage::BasicMessage(
// }
// For tests only.
-BasicMessage::BasicMessage() : publisher(0), size(0)
+BasicMessage::BasicMessage() : size(0)
{}
BasicMessage::~BasicMessage(){
@@ -126,10 +125,6 @@ const FieldTable& BasicMessage::getApplicationHeaders(){
return getHeaderProperties()->getHeaders();
}
-const ConnectionToken* const BasicMessage::getPublisher(){
- return publisher;
-}
-
bool BasicMessage::isPersistent()
{
if(!header) return false;
@@ -230,12 +225,14 @@ void BasicMessage::releaseContent(MessageStore* store)
store->stage(this);
}
if (!content.get() || content->size() > 0) {
+ // FIXME aconway 2007-02-07: handle MessageMessage.
//set content to lazy loading mode (but only if there is stored content):
//Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is
// then set as a member of that message so its lifetime is guaranteed to be no longer than
// that of the message itself
- content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize()));
+ content = std::auto_ptr<Content>(
+ new LazyLoadedContent(store, this, expectedContentSize()));
}
}
diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h
index d56912ea60..308fcc1791 100644
--- a/cpp/lib/broker/BrokerMessage.h
+++ b/cpp/lib/broker/BrokerMessage.h
@@ -52,7 +52,6 @@ using framing::string;
* request.
*/
class BasicMessage : public Message {
- const ConnectionToken* const publisher;
framing::AMQHeaderBody::shared_ptr header;
std::auto_ptr<Content> content;
sys::Mutex contentLock;
@@ -72,7 +71,6 @@ class BasicMessage : public Message {
void setHeader(framing::AMQHeaderBody::shared_ptr header);
void addContent(framing::AMQContentBody::shared_ptr data);
bool isComplete();
- const ConnectionToken* const getPublisher();
void deliver(framing::ChannelAdapter&,
const string& consumerTag,
diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h
index d5e37fbc7a..32767191ca 100644
--- a/cpp/lib/broker/BrokerMessageBase.h
+++ b/cpp/lib/broker/BrokerMessageBase.h
@@ -40,10 +40,10 @@ class BasicHeaderProperties;
class FieldTable;
}
-namespace broker {
-class MessageStore;
+namespace broker {
class ConnectionToken;
+class MessageStore;
/**
* Base class for all types of internal broker messages
@@ -51,6 +51,7 @@ class ConnectionToken;
* TODO; AMS: for the moment this is mostly a placeholder
*/
class Message{
+ const ConnectionToken* publisher;
std::string exchange;
std::string routingKey;
const bool mandatory;
@@ -62,9 +63,12 @@ class Message{
public:
typedef boost::shared_ptr<Message> shared_ptr;
- Message(const std::string& _exchange, const std::string& _routingKey,
+ Message(const ConnectionToken* publisher_,
+ const std::string& _exchange,
+ const std::string& _routingKey,
bool _mandatory, bool _immediate,
framing::AMQMethodBody::shared_ptr respondTo_) :
+ publisher(publisher_),
exchange(_exchange),
routingKey(_routingKey),
mandatory(_mandatory),
@@ -122,7 +126,9 @@ class Message{
virtual framing::BasicHeaderProperties* getHeaderProperties() = 0;
virtual const framing::FieldTable& getApplicationHeaders() = 0;
virtual bool isPersistent() = 0;
- virtual const ConnectionToken* const getPublisher() = 0;
+ virtual const ConnectionToken* getPublisher() const {
+ return publisher;
+ }
virtual void encode(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
virtual void encodeHeader(framing::Buffer& /*buffer*/) {}; // XXXX: Only used in tests?
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index d7020b8923..29186cc18e 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -25,26 +25,24 @@
#include "MessageAppendBody.h"
#include "Reference.h"
#include "framing/FieldTable.h"
+#include "framing/BasicHeaderProperties.h"
#include <iostream>
using namespace std;
-using namespace qpid::broker;
using namespace qpid::framing;
-
-MessageMessage::MessageMessage(TransferPtr transfer_)
- : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
- transfer_->getMandatory(), transfer_->getImmediate(),
- transfer_),
- transfer(transfer_)
-{}
-MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref)
- : Message(transfer_->getDestination(), transfer_->getRoutingKey(),
- transfer_->getMandatory(), transfer_->getImmediate(),
- transfer_),
- transfer(transfer_),
- appends(ref.getAppends())
+namespace qpid {
+namespace broker {
+
+MessageMessage::MessageMessage(
+ ConnectionToken* publisher, TransferPtr transfer_
+) : Message(publisher, transfer_->getDestination(),
+ transfer_->getRoutingKey(),
+ transfer_->getMandatory(),
+ transfer_->getImmediate(),
+ transfer_),
+ transfer(transfer_)
{}
void MessageMessage::deliver(
@@ -55,29 +53,29 @@ void MessageMessage::deliver(
{
channel.send(
new MessageTransferBody(channel.getVersion(),
- transfer->getTicket(),
- consumerTag,
- getRedelivered(),
- transfer->getImmediate(),
- transfer->getTtl(),
- transfer->getPriority(),
- transfer->getTimestamp(),
- transfer->getDeliveryMode(),
- transfer->getExpiration(),
- getExchange(),
- getRoutingKey(),
- transfer->getMessageId(),
- transfer->getCorrelationId(),
- transfer->getReplyTo(),
- transfer->getContentType(),
- transfer->getContentEncoding(),
- transfer->getUserId(),
- transfer->getAppId(),
- transfer->getTransactionId(),
- transfer->getSecurityToken(),
- transfer->getApplicationHeaders(),
- transfer->getBody(),
- transfer->getMandatory()));
+ transfer->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ transfer->getImmediate(),
+ transfer->getTtl(),
+ transfer->getPriority(),
+ transfer->getTimestamp(),
+ transfer->getDeliveryMode(),
+ transfer->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ transfer->getMessageId(),
+ transfer->getCorrelationId(),
+ transfer->getReplyTo(),
+ transfer->getContentType(),
+ transfer->getContentEncoding(),
+ transfer->getUserId(),
+ transfer->getAppId(),
+ transfer->getTransactionId(),
+ transfer->getSecurityToken(),
+ transfer->getApplicationHeaders(),
+ transfer->getBody(),
+ transfer->getMandatory()));
}
void MessageMessage::sendGetOk(
@@ -107,19 +105,11 @@ qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
const FieldTable& MessageMessage::getApplicationHeaders()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
return transfer->getApplicationHeaders();
}
bool MessageMessage::isPersistent()
{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return false; // FIXME aconway 2007-02-05:
-}
-
-const ConnectionToken* const MessageMessage::getPublisher()
-{
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
- return 0; // FIXME aconway 2007-02-05:
+ return transfer->getDeliveryMode() == PERSISTENT;
}
u_int32_t MessageMessage::encodedSize()
@@ -146,3 +136,5 @@ u_int64_t MessageMessage::expectedContentSize()
return 0; // FIXME aconway 2007-02-05:
}
+
+}} // namespace qpid::broker
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
index 5310ef65b3..fb0a4749d4 100644
--- a/cpp/lib/broker/BrokerMessageMessage.h
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -35,19 +35,25 @@ class MessageApppendBody;
}
namespace broker {
+class ConnectionToken;
class Reference;
class MessageMessage: public Message{
public:
- typedef Reference::TransferPtr TransferPtr;
+ typedef boost::shared_ptr<MessageMessage> shared_ptr;
+ typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
typedef Reference::AppendPtr AppendPtr;
typedef Reference::Appends Appends;
- MessageMessage(TransferPtr transfer);
- MessageMessage(TransferPtr transfer, const Reference&);
+ MessageMessage(ConnectionToken* publisher, TransferPtr transfer);
// Default destructor okay
-
+
+ TransferPtr getTransfer() { return transfer; }
+
+ const Appends& getAppends() { return appends; }
+ void setAppends(const Appends& appends_) { appends = appends_; }
+
void deliver(framing::ChannelAdapter& channel,
const std::string& consumerTag,
u_int64_t deliveryTag,
@@ -64,19 +70,16 @@ class MessageMessage: public Message{
framing::BasicHeaderProperties* getHeaderProperties();
const framing::FieldTable& getApplicationHeaders();
bool isPersistent();
- const ConnectionToken* const getPublisher();
u_int32_t encodedSize();
u_int32_t encodedHeaderSize();
u_int32_t encodedContentSize();
u_int64_t expectedContentSize();
- TransferPtr getTransfer() { return transfer; }
- const Appends& getAppends() { return appends; }
private:
const TransferPtr transfer;
- const Appends appends;
+ Appends appends;
};
}}
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 5f5e9b84e7..49c4153185 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -217,14 +217,13 @@ MessageHandlerImpl::transfer(const MethodContext& context,
MessageTransferBody::shared_ptr transfer(
boost::shared_polymorphic_downcast<MessageTransferBody>(
context.methodBody));
- if (body.isInline()) {
- Message::shared_ptr msg(new MessageMessage(transfer));
- channel.handleInlineTransfer(msg, exchange);
- }
- else {
- // Add to reference.
- references.get(body.getValue()).transfer(transfer);
- }
+ MessageMessage::shared_ptr message(
+ new MessageMessage(&connection, transfer));
+
+ if (body.isInline())
+ channel.handleInlineTransfer(message, exchange);
+ else
+ references.get(body.getValue()).addMessage(message);
client.ok(context);
}
diff --git a/cpp/lib/broker/Reference.cpp b/cpp/lib/broker/Reference.cpp
index a5e734d77a..dc9cb94f84 100644
--- a/cpp/lib/broker/Reference.cpp
+++ b/cpp/lib/broker/Reference.cpp
@@ -43,14 +43,14 @@ Reference& ReferenceRegistry::get(const Reference::Id& id) {
}
void Reference::close() {
- for_each(transfers.begin(), transfers.end(),
+ for_each(messages.begin(), messages.end(),
boost::bind(&Reference::complete, this, _1));
registry->references.erase(getId());
}
-void Reference::complete(TransferPtr transfer) {
- MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this));
- registry->handler.complete(msg);
+void Reference::complete(MessagePtr message) {
+ message->setAppends(appends);
+ registry->handler.complete(message);
}
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/Reference.h b/cpp/lib/broker/Reference.h
index ecaca3de41..77c315bbc5 100644
--- a/cpp/lib/broker/Reference.h
+++ b/cpp/lib/broker/Reference.h
@@ -28,20 +28,21 @@
namespace qpid {
namespace framing {
-class MessageTransferBody;
class MessageAppendBody;
}
namespace broker {
+class MessageMessage;
class CompletionHandler;
class ReferenceRegistry;
/**
* A reference is an accumulation point for data in a multi-frame
- * message. A reference can be used by multiple transfer commands, so
- * the reference tracks which commands are using it. When the reference
- * is closed, all the associated transfers are completed.
+ * message. A reference can be used by multiple transfer commands to
+ * create multiple messages, so the reference tracks which commands
+ * are using it. When the reference is closed, all the associated
+ * transfers are completed.
*
* THREAD UNSAFE: per-channel resource, access to channels is
* serialized.
@@ -50,8 +51,8 @@ class Reference
{
public:
typedef std::string Id;
- typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
- typedef std::vector<TransferPtr> Transfers;
+ typedef boost::shared_ptr<MessageMessage> MessagePtr;
+ typedef std::vector<MessagePtr> Messages;
typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
typedef std::vector<AppendPtr> Appends;
@@ -60,24 +61,24 @@ class Reference
const std::string& getId() const { return id; }
- /** Add a transfer to be completed with this reference */
- void transfer(TransferPtr transfer) { transfers.push_back(transfer); }
+ /** Add a message to be completed with this reference */
+ void addMessage(MessagePtr message) { messages.push_back(message); }
/** Append more data to the reference */
void append(AppendPtr ptr) { appends.push_back(ptr); }
- /** Close the reference, complete each associated transfer */
+ /** Close the reference, complete each associated message */
void close();
const Appends& getAppends() const { return appends; }
- const Transfers& getTransfers() const { return transfers; }
+ const Messages& getMessages() const { return messages; }
private:
- void complete(TransferPtr transfer);
+ void complete(MessagePtr message);
Id id;
ReferenceRegistry* registry;
- Transfers transfers;
+ Messages messages;
Appends appends;
};