summaryrefslogtreecommitdiff
path: root/cpp/src/client/MessageMessageChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/client/MessageMessageChannel.cpp')
-rw-r--r--cpp/src/client/MessageMessageChannel.cpp43
1 files changed, 39 insertions, 4 deletions
diff --git a/cpp/src/client/MessageMessageChannel.cpp b/cpp/src/client/MessageMessageChannel.cpp
index 164a1cb426..8d0fdc3189 100644
--- a/cpp/src/client/MessageMessageChannel.cpp
+++ b/cpp/src/client/MessageMessageChannel.cpp
@@ -192,6 +192,17 @@ MessageTransferBody::shared_ptr makeTransfer(
));
}
+// FIXME aconway 2007-04-05: Generated code should provide this.
+/**
+ * Calculate the size of a frame containing the given body type
+ * if all variable-lengths parts are empty.
+ */
+template <class T> size_t overhead() {
+ static AMQFrame frame(
+ ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion())));
+ return frame.size();
+}
+
void MessageMessageChannel::publish(
const Message& msg, const Exchange& exchange,
const std::string& routingKey, bool mandatory, bool immediate)
@@ -201,11 +212,35 @@ void MessageMessageChannel::publish(
msg, exchange.getName(), routingKey, mandatory, immediate);
// Frame itself uses 8 bytes.
u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8;
- if (transfer->size() > frameMax) {
- // FIXME aconway 2007-02-23:
- throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented");
+ if (transfer->size() <= frameMax) {
+ channel.sendAndReceive<MessageOkBody>(transfer);
+ }
+ else {
+ std::string ref = newTag();
+ std::string data = transfer->getBody().getValue();
+ size_t chunk =
+ channel.connection->getMaxFrameSize() -
+ (overhead<MessageAppendBody>() + ref.size());
+ // TODO aconway 2007-04-05: cast around lack of generated setters
+ const_cast<Content&>(transfer->getBody()) = Content(REFERENCE,ref);
+ channel.send(
+ make_shared_ptr(new MessageOpenBody(channel.version, ref)));
+ channel.send(transfer);
+ const char* p = data.data();
+ const char* end = data.data()+data.size();
+ while (p+chunk <= end) {
+ channel.send(
+ make_shared_ptr(
+ new MessageAppendBody(channel.version, ref, std::string(p, chunk))));
+ p += chunk;
+ }
+ if (p < end) {
+ channel.send(
+ make_shared_ptr(
+ new MessageAppendBody(channel.version, ref, std::string(p, end-p))));
+ }
+ channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref)));
}
- channel.sendAndReceive<MessageOkBody>(transfer);
}
void copy(Message& msg, MessageTransferBody& transfer) {