summaryrefslogtreecommitdiff
path: root/Final/cpp/lib/broker/BrokerMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Final/cpp/lib/broker/BrokerMessage.cpp')
-rw-r--r--Final/cpp/lib/broker/BrokerMessage.cpp223
1 files changed, 0 insertions, 223 deletions
diff --git a/Final/cpp/lib/broker/BrokerMessage.cpp b/Final/cpp/lib/broker/BrokerMessage.cpp
deleted file mode 100644
index 6ba2131a74..0000000000
--- a/Final/cpp/lib/broker/BrokerMessage.cpp
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <BrokerMessage.h>
-#include <iostream>
-
-#include <InMemoryContent.h>
-#include <LazyLoadedContent.h>
-#include <MessageStore.h>
-#include <BasicDeliverBody.h>
-#include <BasicGetOkBody.h>
-
-using namespace boost;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-Message::Message(const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate) : publisher(_publisher),
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- redelivered(false),
- size(0),
- persistenceId(0) {}
-
-Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
- publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
-
- decode(buffer, headersOnly, contentChunkSize);
-}
-
-Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
-
-Message::~Message(){
- if (content.get()) content->destroy();
-}
-
-void Message::setHeader(AMQHeaderBody::shared_ptr _header){
- this->header = _header;
-}
-
-void Message::addContent(AMQContentBody::shared_ptr data){
- if (!content.get()) {
- content = std::auto_ptr<Content>(new InMemoryContent());
- }
- content->add(data);
- size += data->size();
-}
-
-bool Message::isComplete(){
- return header.get() && (header->getContentSize() == contentSize());
-}
-
-void Message::redeliver(){
- redelivered = true;
-}
-
-void Message::deliver(OutputHandler* out, int channel,
- const string& consumerTag, u_int64_t deliveryTag,
- u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey)));
- sendContent(out, channel, framesize, version);
-}
-
-void Message::sendGetOk(OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount)));
- sendContent(out, channel, framesize, version);
-}
-
-void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
- AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
- out->send(new AMQFrame(*version, channel, headerBody));
-
- Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->send(*version, out, channel, framesize);
-}
-
-BasicHeaderProperties* Message::getHeaderProperties(){
- return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
-}
-
-const ConnectionToken* const Message::getPublisher(){
- return publisher;
-}
-
-bool Message::isPersistent()
-{
- if(!header) return false;
- BasicHeaderProperties* props = getHeaderProperties();
- return props && props->getDeliveryMode() == PERSISTENT;
-}
-
-void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
-{
- decodeHeader(buffer);
- if (!headersOnly) decodeContent(buffer, contentChunkSize);
-}
-
-void Message::decodeHeader(Buffer& buffer)
-{
- buffer.getShortString(exchange);
- buffer.getShortString(routingKey);
-
- u_int32_t headerSize = buffer.getLong();
- AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
- headerBody->decode(buffer, headerSize);
- setHeader(headerBody);
-}
-
-void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
-{
- u_int64_t expected = expectedContentSize();
- if (expected != buffer.available()) {
- std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl;
- throw Exception("Cannot decode content, buffer not large enough.");
- }
-
- if (!chunkSize || chunkSize > expected) {
- chunkSize = expected;
- }
-
- u_int64_t total = 0;
- while (total < expectedContentSize()) {
- u_int64_t remaining = expected - total;
- AMQContentBody::shared_ptr contentBody(new AMQContentBody());
- contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
- addContent(contentBody);
- total += chunkSize;
- }
-}
-
-void Message::encode(Buffer& buffer)
-{
- encodeHeader(buffer);
- encodeContent(buffer);
-}
-
-void Message::encodeHeader(Buffer& buffer)
-{
- buffer.putShortString(exchange);
- buffer.putShortString(routingKey);
- buffer.putLong(header->size());
- header->encode(buffer);
-}
-
-void Message::encodeContent(Buffer& buffer)
-{
- Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->encode(buffer);
-}
-
-u_int32_t Message::encodedSize()
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-u_int32_t Message::encodedContentSize()
-{
- Mutex::ScopedLock locker(contentLock);
- return content.get() ? content->size() : 0;
-}
-
-u_int32_t Message::encodedHeaderSize()
-{
- return exchange.size() + 1
- + routingKey.size() + 1
- + header->size() + 4;//4 extra bytes for size
-}
-
-u_int64_t Message::expectedContentSize()
-{
- return header.get() ? header->getContentSize() : 0;
-}
-
-void Message::releaseContent(MessageStore* store)
-{
- Mutex::ScopedLock locker(contentLock);
- if (!isPersistent() && persistenceId == 0) {
- store->stage(this);
- }
- if (!content.get() || content->size() > 0) {
- //set content to lazy loading mode (but only if there is stored content):
-
- //Note: the LazyLoadedContent instance contains a raw pointer to the message, however it is
- // then set as a member of that message so its lifetime is guaranteed to be no longer than
- // that of the message itself
- content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, expectedContentSize()));
- }
-}
-
-void Message::setContent(std::auto_ptr<Content>& _content)
-{
- Mutex::ScopedLock locker(contentLock);
- content = _content;
-}