summaryrefslogtreecommitdiff
path: root/cpp/lib
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-02-21 00:23:25 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-02-21 00:23:25 +0000
commitbe9a95607d8e831e8f7c5802828afe677c798b93 (patch)
tree1cbb1db9ad9532bb333079ba33753ac919bff383 /cpp/lib
parent5059a83a83b4c7e16e374c539f8ced77811f7e51 (diff)
downloadqpid-python-be9a95607d8e831e8f7c5802828afe677c798b93.tar.gz
r1152@fuschia: andrew | 2007-02-17 21:14:42 +0000
More support for references (and transfers of reference content) r1220@fuschia: andrew | 2007-02-21 00:22:53 +0000 Working version of delivering Message Transfers by reference git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@509834 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.cpp45
-rw-r--r--cpp/lib/broker/BrokerMessageMessage.h18
-rw-r--r--cpp/lib/broker/MessageHandlerImpl.cpp115
-rw-r--r--cpp/lib/broker/Reference.cpp19
-rw-r--r--cpp/lib/broker/Reference.h23
5 files changed, 135 insertions, 85 deletions
diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp
index f51722f2da..e1be57fad7 100644
--- a/cpp/lib/broker/BrokerMessageMessage.cpp
+++ b/cpp/lib/broker/BrokerMessageMessage.cpp
@@ -22,12 +22,14 @@
#include "BrokerMessageMessage.h"
#include "ChannelAdapter.h"
#include "MessageTransferBody.h"
+#include "MessageOpenBody.h"
+#include "MessageCloseBody.h"
#include "MessageAppendBody.h"
#include "Reference.h"
#include "framing/FieldTable.h"
#include "framing/BasicHeaderProperties.h"
-#include <iostream>
+#include <algorithm>
using namespace std;
using namespace qpid::framing;
@@ -36,21 +38,50 @@ namespace qpid {
namespace broker {
MessageMessage::MessageMessage(
- ConnectionToken* publisher, TransferPtr transfer_
+ ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_
) : Message(publisher, transfer_->getDestination(),
transfer_->getRoutingKey(),
transfer_->getMandatory(),
transfer_->getImmediate(),
transfer_),
+ requestId(requestId_),
transfer(transfer_)
{}
+MessageMessage::MessageMessage(
+ ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_,
+ ReferencePtr reference_
+) : Message(publisher, transfer_->getDestination(),
+ transfer_->getRoutingKey(),
+ transfer_->getMandatory(),
+ transfer_->getImmediate(),
+ transfer_),
+ requestId(requestId_),
+ transfer(transfer_),
+ reference(reference_)
+{}
+
void MessageMessage::deliver(
framing::ChannelAdapter& channel,
const std::string& consumerTag,
u_int64_t /*deliveryTag*/,
u_int32_t /*framesize*/)
{
+ const framing::Content& body = transfer->getBody();
+
+ // Send any reference data
+ if (!body.isInline()){
+ // Open
+ channel.send(new MessageOpenBody(channel.getVersion(), reference->getId()));
+ // Appends
+ for(Reference::Appends::const_iterator a = reference->getAppends().begin();
+ a != reference->getAppends().end();
+ ++a) {
+ channel.send(new MessageAppendBody(*a->get()));
+ }
+ }
+
+ // The the transfer
channel.send(
new MessageTransferBody(channel.getVersion(),
transfer->getTicket(),
@@ -74,8 +105,13 @@ void MessageMessage::deliver(
transfer->getTransactionId(),
transfer->getSecurityToken(),
transfer->getApplicationHeaders(),
- transfer->getBody(),
+ body,
transfer->getMandatory()));
+ // Close any reference data
+ if (!body.isInline()){
+ // Close
+ channel.send(new MessageCloseBody(channel.getVersion(), reference->getId()));
+ }
}
void MessageMessage::sendGetOk(
@@ -120,11 +156,10 @@ bool MessageMessage::isComplete()
u_int64_t MessageMessage::contentSize() const
{
- // FIXME astitcher 2007-2-7 only works for inline content
if (transfer->getBody().isInline())
return transfer->getBody().getValue().size();
else
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unfinished");
+ return reference->getSize();
}
qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h
index 89289a7fd0..3673fde9ed 100644
--- a/cpp/lib/broker/BrokerMessageMessage.h
+++ b/cpp/lib/broker/BrokerMessageMessage.h
@@ -23,7 +23,7 @@
*/
#include "BrokerMessageBase.h"
#include "MessageTransferBody.h"
-#include "Reference.h"
+#include "amqp_types.h"
#include <vector>
@@ -31,7 +31,6 @@ namespace qpid {
namespace framing {
class MessageTransferBody;
-class MessageApppendBody;
}
namespace broker {
@@ -42,17 +41,16 @@ class MessageMessage: public Message{
public:
typedef boost::shared_ptr<MessageMessage> shared_ptr;
typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr;
- typedef Reference::AppendPtr AppendPtr;
- typedef Reference::Appends Appends;
+ typedef boost::shared_ptr<Reference> ReferencePtr;
- MessageMessage(ConnectionToken* publisher, TransferPtr transfer);
+ MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer);
+ MessageMessage(ConnectionToken* publisher, framing::RequestId, TransferPtr transfer, ReferencePtr reference);
// Default destructor okay
+ framing::RequestId getRequestId() {return requestId; }
TransferPtr getTransfer() { return transfer; }
-
- const Appends& getAppends() { return appends; }
- void setAppends(const Appends& appends_) { appends = appends_; }
+ ReferencePtr getReference() { return reference; }
void deliver(framing::ChannelAdapter& channel,
const std::string& consumerTag,
@@ -78,9 +76,9 @@ class MessageMessage: public Message{
u_int64_t expectedContentSize();
private:
-
+ framing::RequestId requestId;
const TransferPtr transfer;
- Appends appends;
+ const ReferencePtr reference;
};
}}
diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp
index 0853aebcb1..784f180d5c 100644
--- a/cpp/lib/broker/MessageHandlerImpl.cpp
+++ b/cpp/lib/broker/MessageHandlerImpl.cpp
@@ -33,46 +33,84 @@ namespace broker {
using namespace framing;
MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
- : HandlerImplType(parent), references(channel) {}
+ : HandlerImplType(parent) {}
//
// Message class method handlers
//
+
+void
+MessageHandlerImpl::cancel(const MethodContext& context,
+ const string& destination )
+{
+ channel.cancel(destination);
+ client.ok(context.getRequestId());
+}
+
+void
+MessageHandlerImpl::open(const MethodContext& context,
+ const string& reference)
+{
+ references.open(reference);
+ client.ok(context.getRequestId());
+}
+
void
MessageHandlerImpl::append(const MethodContext& context,
const string& reference,
const string& /*bytes*/ )
{
- references.get(reference).append(
+ references.get(reference)->append(
boost::shared_polymorphic_downcast<MessageAppendBody>(
context.methodBody));
client.ok(context.getRequestId());
}
-
void
-MessageHandlerImpl::cancel(const MethodContext& context,
- const string& destination )
+MessageHandlerImpl::close(const MethodContext& context,
+ const string& reference)
{
- channel.cancel(destination);
+ Reference::shared_ptr ref = references.get(reference);
client.ok(context.getRequestId());
+
+ // Send any transfer messages to their correct exchanges and okay them
+ const Reference::Messages& msgs = ref->getMessages();
+ for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) {
+ channel.handleInlineTransfer(*m);
+ client.ok((*m)->getRequestId());
+ }
+ ref->close();
}
void
-MessageHandlerImpl::checkpoint(const MethodContext&,
+MessageHandlerImpl::checkpoint(const MethodContext& context,
const string& /*reference*/,
const string& /*identifier*/ )
{
- // FIXME astitcher 2007-01-11: 0-9 feature
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+ // Initial implementation (which is conforming) is to do nothing here
+ // and return offset zero for the resume
+ client.ok(context.getRequestId());
}
void
-MessageHandlerImpl::close(const MethodContext& context,
- const string& reference)
+MessageHandlerImpl::resume(const MethodContext& context,
+ const string& reference,
+ const string& /*identifier*/ )
{
- references.get(reference).close();
- client.ok(context.getRequestId());
+ // Initial (null) implementation
+ // open reference and return 0 offset
+ references.open(reference);
+ client.offset(0, context.getRequestId());
+}
+
+void
+MessageHandlerImpl::offset(const MethodContext&,
+ u_int64_t /*value*/ )
+{
+ // Shouldn't ever receive this as it is reponse to resume
+ // which is never sent
+ // TODO astitcher 2007-02-16 What is the correct exception to throw here?
+ THROW_QPID_ERROR(INTERNAL_ERROR, "impossible");
}
void
@@ -98,14 +136,6 @@ MessageHandlerImpl::consume(const MethodContext& context,
}
void
-MessageHandlerImpl::empty( const MethodContext& )
-{
- // Shouldn't ever receive this as it is a response to get
- // TODO astitcher 2007-02-09 What is the correct exception to throw here?
- THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
-}
-
-void
MessageHandlerImpl::get( const MethodContext& context,
u_int16_t /*ticket*/,
const string& queueName,
@@ -122,11 +152,12 @@ MessageHandlerImpl::get( const MethodContext& context,
}
void
-MessageHandlerImpl::offset(const MethodContext&,
- u_int64_t /*value*/ )
+MessageHandlerImpl::empty( const MethodContext& )
{
- // FIXME astitcher 2007-01-11: 0-9 feature
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
+ // Shouldn't ever receive this as it is a response to get
+ // which is never sent
+ // TODO astitcher 2007-02-09 What is the correct exception to throw here?
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Impossible");
}
void
@@ -136,14 +167,6 @@ MessageHandlerImpl::ok(const MethodContext& /*context*/)
}
void
-MessageHandlerImpl::open(const MethodContext& context,
- const string& reference)
-{
- references.open(reference);
- client.ok(context.getRequestId());
-}
-
-void
MessageHandlerImpl::qos(const MethodContext& context,
u_int32_t prefetchSize,
u_int16_t prefetchCount,
@@ -173,15 +196,6 @@ MessageHandlerImpl::reject(const MethodContext&,
}
void
-MessageHandlerImpl::resume(const MethodContext&,
- const string& /*reference*/,
- const string& /*identifier*/ )
-{
- // FIXME astitcher 2007-01-11: 0-9 feature
- THROW_QPID_ERROR(INTERNAL_ERROR, "Unimplemented ");
-}
-
-void
MessageHandlerImpl::transfer(const MethodContext& context,
u_int16_t /*ticket*/,
const string& /* destination */,
@@ -210,14 +224,19 @@ MessageHandlerImpl::transfer(const MethodContext& context,
MessageTransferBody::shared_ptr transfer(
boost::shared_polymorphic_downcast<MessageTransferBody>(
context.methodBody));
- MessageMessage::shared_ptr message(
- new MessageMessage(&connection, transfer));
+ RequestId requestId = context.getRequestId();
- if (body.isInline())
+ if (body.isInline()) {
+ MessageMessage::shared_ptr message(
+ new MessageMessage(&connection, requestId, transfer));
channel.handleInlineTransfer(message);
- else
- references.get(body.getValue()).addMessage(message);
- client.ok(context.getRequestId());
+ client.ok(requestId);
+ } else {
+ Reference::shared_ptr ref(references.get(body.getValue()));
+ MessageMessage::shared_ptr message(
+ new MessageMessage(&connection, requestId, transfer, ref));
+ ref->addMessage(message);
+ }
}
diff --git a/cpp/lib/broker/Reference.cpp b/cpp/lib/broker/Reference.cpp
index 0aedef2bef..c4c33e6363 100644
--- a/cpp/lib/broker/Reference.cpp
+++ b/cpp/lib/broker/Reference.cpp
@@ -20,36 +20,35 @@
#include "Reference.h"
#include "BrokerMessageMessage.h"
#include "QpidError.h"
+#include "MessageAppendBody.h"
#include "CompletionHandler.h"
namespace qpid {
namespace broker {
-Reference& ReferenceRegistry::open(const Reference::Id& id) {
+Reference::shared_ptr ReferenceRegistry::open(const Reference::Id& id) {
ReferenceMap::iterator i = references.find(id);
// TODO aconway 2007-02-05: should we throw Channel or Connection
// exceptions here?
if (i != references.end())
throw ConnectionException(503, "Attempt to re-open reference " +id);
- return references[id] = Reference(id, this);
+ return references[id] = Reference::shared_ptr(new Reference(id, this));
}
-Reference& ReferenceRegistry::get(const Reference::Id& id) {
+Reference::shared_ptr ReferenceRegistry::get(const Reference::Id& id) {
ReferenceMap::iterator i = references.find(id);
if (i == references.end())
throw ConnectionException(503, "Attempt to use non-existent reference "+id);
return i->second;
}
-void Reference::close() {
- for_each(messages.begin(), messages.end(),
- boost::bind(&Reference::complete, this, _1));
- registry->references.erase(getId());
+void Reference::append(AppendPtr ptr) {
+ appends.push_back(ptr);
+ size += ptr->getBytes().length();
}
-void Reference::complete(MessagePtr message) {
- message->setAppends(appends);
- registry->handler.complete(message);
+void Reference::close() {
+ registry->references.erase(getId());
}
}} // namespace qpid::broker
diff --git a/cpp/lib/broker/Reference.h b/cpp/lib/broker/Reference.h
index 77c315bbc5..7b3a63fca2 100644
--- a/cpp/lib/broker/Reference.h
+++ b/cpp/lib/broker/Reference.h
@@ -34,7 +34,6 @@ class MessageAppendBody;
namespace broker {
class MessageMessage;
-class CompletionHandler;
class ReferenceRegistry;
/**
@@ -51,21 +50,23 @@ class Reference
{
public:
typedef std::string Id;
+ typedef boost::shared_ptr<Reference> shared_ptr;
typedef boost::shared_ptr<MessageMessage> MessagePtr;
typedef std::vector<MessagePtr> Messages;
typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr;
typedef std::vector<AppendPtr> Appends;
Reference(const Id& id_=Id(), ReferenceRegistry* reg=0)
- : id(id_), registry(reg) {}
+ : id(id_), size(0), registry(reg) {}
const std::string& getId() const { return id; }
+ u_int64_t getSize() const { return size; }
/** Add a message to be completed with this reference */
void addMessage(MessagePtr message) { messages.push_back(message); }
/** Append more data to the reference */
- void append(AppendPtr ptr) { appends.push_back(ptr); }
+ void append(AppendPtr ptr);
/** Close the reference, complete each associated message */
void close();
@@ -74,9 +75,8 @@ class Reference
const Messages& getMessages() const { return messages; }
private:
- void complete(MessagePtr message);
-
Id id;
+ u_int64_t size;
ReferenceRegistry* registry;
Messages messages;
Appends appends;
@@ -91,17 +91,16 @@ class Reference
*/
class ReferenceRegistry {
public:
- ReferenceRegistry(CompletionHandler& handler_) : handler(handler_) {};
- Reference& open(const Reference::Id& id);
- Reference& get(const Reference::Id& id);
+ ReferenceRegistry() {};
+ Reference::shared_ptr open(const Reference::Id& id);
+ Reference::shared_ptr get(const Reference::Id& id);
private:
- typedef std::map<Reference::Id, Reference> ReferenceMap;
- CompletionHandler& handler;
+ typedef std::map<Reference::Id, Reference::shared_ptr> ReferenceMap;
ReferenceMap references;
- // Reference calls references.erase() and uses handler.
- friend class Reference;
+ // Reference calls references.erase().
+ friend class Reference;
};