summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
committerAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
commit8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch)
tree1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/src/qpid/broker/Message.cpp
parent9a808fb13aba243d41bbdab75158dae5939a80a4 (diff)
downloadqpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp100
1 files changed, 100 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
new file mode 100644
index 0000000000..7210ecc2f2
--- /dev/null
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -0,0 +1,100 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "./qpid/concurrent/MonitorImpl.h"
+#include "./qpid/broker/Message.h"
+#include "./qpid/broker/ExchangeRegistry.h"
+#include <iostream>
+
+using namespace std::tr1;//for *_pointer_cast methods
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+
+Message::Message(const ConnectionToken* const _publisher,
+ const string& _exchange, const string& _routingKey,
+ bool _mandatory, bool _immediate) : publisher(_publisher),
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate),
+ redelivered(false),
+ size(0){
+
+}
+
+Message::~Message(){
+}
+
+void Message::setHeader(AMQHeaderBody::shared_ptr _header){
+ this->header = _header;
+}
+
+void Message::addContent(AMQContentBody::shared_ptr data){
+ content.push_back(data);
+ size += data->size();
+}
+
+bool Message::isComplete(){
+ return header.get() && (header->getContentSize() == contentSize());
+}
+
+void Message::redeliver(){
+ redelivered = true;
+}
+
+void Message::deliver(OutputHandler* out, int channel,
+ const string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize){
+
+ out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendGetOk(OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize){
+
+ out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
+ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+ out->send(new AMQFrame(channel, headerBody));
+ for(content_iterator i = content.begin(); i != content.end(); i++){
+ if((*i)->size() > framesize){
+ //TODO: need to split it
+ std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
+ }else{
+ AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+ out->send(new AMQFrame(channel, contentBody));
+ }
+ }
+}
+
+BasicHeaderProperties* Message::getHeaderProperties(){
+ return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+const ConnectionToken* const Message::getPublisher(){
+ return publisher;
+}
+