summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Message.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp452
1 files changed, 452 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
new file mode 100644
index 0000000000..763dc55e40
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -0,0 +1,452 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/StringUtils.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/SendContent.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/TypeFilter.h"
+#include "qpid/log/Statement.h"
+
+#include <time.h>
+
+using boost::intrusive_ptr;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_MSEC;
+using qpid::sys::FAR_FUTURE;
+using std::string;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace broker {
+
+TransferAdapter Message::TRANSFER;
+
+Message::Message(const framing::SequenceNumber& id) :
+ frames(id), persistenceId(0), redelivered(false), loaded(false),
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ expiration(FAR_FUTURE), dequeueCallback(0),
+ inCallback(false), requiredCredit(0), isManagementMessage(false)
+{}
+
+Message::Message(const Message& original) :
+ PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ expiration(original.expiration), dequeueCallback(0),
+ inCallback(false), requiredCredit(0)
+{
+ setExpiryPolicy(original.expiryPolicy);
+}
+
+Message::~Message()
+{
+ if (expiryPolicy)
+ expiryPolicy->forget(*this);
+}
+
+void Message::forcePersistent()
+{
+ // only set forced bit if we actually need to force.
+ if (! getAdapter().isPersistent(frames) ){
+ forcePersistentPolicy = true;
+ }
+}
+
+bool Message::isForcedPersistent()
+{
+ return forcePersistentPolicy;
+}
+
+std::string Message::getRoutingKey() const
+{
+ return getAdapter().getRoutingKey(frames);
+}
+
+std::string Message::getExchangeName() const
+{
+ return getAdapter().getExchange(frames);
+}
+
+const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const
+{
+ if (!exchange) {
+ exchange = registry.get(getExchangeName());
+ }
+ return exchange;
+}
+
+bool Message::isImmediate() const
+{
+ return getAdapter().isImmediate(frames);
+}
+
+const FieldTable* Message::getApplicationHeaders() const
+{
+ return getAdapter().getApplicationHeaders(frames);
+}
+
+std::string Message::getAppId() const
+{
+ return getAdapter().getAppId(frames);
+}
+
+bool Message::isPersistent() const
+{
+ return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
+}
+
+bool Message::requiresAccept()
+{
+ return getAdapter().requiresAccept(frames);
+}
+
+uint32_t Message::getRequiredCredit()
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (!requiredCredit) {
+ //add up payload for all header and content frames in the frameset
+ SumBodySize sum;
+ frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
+ requiredCredit = sum.getSize();
+ }
+ return requiredCredit;
+}
+
+void Message::encode(framing::Buffer& buffer) const
+{
+ //encode method and header frames
+ EncodeFrame f1(buffer);
+ frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
+
+ //then encode the payload of each content frame
+ framing::EncodeBody f2(buffer);
+ frames.map_if(f2, TypeFilter<CONTENT_BODY>());
+}
+
+void Message::encodeContent(framing::Buffer& buffer) const
+{
+ //encode the payload of each content frame
+ EncodeBody f2(buffer);
+ frames.map_if(f2, TypeFilter<CONTENT_BODY>());
+}
+
+uint32_t Message::encodedSize() const
+{
+ return encodedHeaderSize() + encodedContentSize();
+}
+
+uint32_t Message::encodedContentSize() const
+{
+ return frames.getContentSize();
+}
+
+uint32_t Message::encodedHeaderSize() const
+{
+ //add up the size for all method and header frames in the frameset
+ SumFrameSize sum;
+ frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>());
+ return sum.getSize();
+}
+
+void Message::decodeHeader(framing::Buffer& buffer)
+{
+ AMQFrame method;
+ method.decode(buffer);
+ frames.append(method);
+
+ AMQFrame header;
+ header.decode(buffer);
+ frames.append(header);
+}
+
+void Message::decodeContent(framing::Buffer& buffer)
+{
+ if (buffer.available()) {
+ //get the data as a string and set that as the content
+ //body on a frame then add that frame to the frameset
+ AMQFrame frame((AMQContentBody()));
+ frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+ frame.setFirstSegment(false);
+ frames.append(frame);
+ } else {
+ //adjust header flags
+ MarkLastSegment f;
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
+ }
+ //mark content loaded
+ loaded = true;
+}
+
+// Used for testing only
+void Message::tryReleaseContent()
+{
+ if (checkContentReleasable()) {
+ releaseContent();
+ }
+}
+
+void Message::releaseContent(MessageStore* s)
+{
+ //deprecated, use setStore(store); releaseContent(); instead
+ if (!store) setStore(s);
+ releaseContent();
+}
+
+void Message::releaseContent()
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (store) {
+ if (!getPersistenceId()) {
+ intrusive_ptr<PersistableMessage> pmsg(this);
+ store->stage(pmsg);
+ staged = true;
+ }
+ //ensure required credit is cached before content frames are released
+ getRequiredCredit();
+ //remove any content frames from the frameset
+ frames.remove(TypeFilter<CONTENT_BODY>());
+ setContentReleased();
+ }
+}
+
+void Message::destroy()
+{
+ if (staged) {
+ if (store) {
+ store->destroy(*this);
+ } else {
+ QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
+ }
+ }
+}
+
+bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
+{
+ intrusive_ptr<const PersistableMessage> pmsg(this);
+
+ bool done = false;
+ string& data = frame.castBody<AMQContentBody>()->getData();
+ store->loadContent(queue, pmsg, data, offset, maxContentSize);
+ done = data.size() < maxContentSize;
+ frame.setBof(false);
+ frame.setEof(true);
+ QPID_LOG(debug, "loaded frame" << frame);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
+ if (!done) {
+ frame.setEos(false);
+ } else return false;
+ return true;
+}
+
+void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (isContentReleased() && !frames.isComplete()) {
+ sys::Mutex::ScopedUnlock u(lock);
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ bool morecontent = true;
+ for (uint64_t offset = 0; morecontent; offset += maxContentSize)
+ {
+ AMQFrame frame((AMQContentBody()));
+ morecontent = getContentFrame(queue, frame, maxContentSize, offset);
+ out.handle(frame);
+ }
+ } else {
+ Count c;
+ frames.map_if(c, TypeFilter<CONTENT_BODY>());
+
+ SendContent f(out, maxFrameSize, c.getCount());
+ frames.map_if(f, TypeFilter<CONTENT_BODY>());
+ }
+}
+
+void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const
+{
+ sys::Mutex::ScopedLock l(lock);
+ Relay f(out);
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
+}
+
+// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
+// 0-8/0-9 message differences.
+MessageAdapter& Message::getAdapter() const
+{
+ if (!adapter) {
+ if(frames.isA<MessageTransferBody>()) {
+ adapter = &TRANSFER;
+ } else {
+ const AMQMethodBody* method = frames.getMethod();
+ if (!method) throw Exception("Can't adapt message with no method");
+ else throw Exception(QPID_MSG("Can't adapt message based on " << *method));
+ }
+ }
+ return *adapter;
+}
+
+uint64_t Message::contentSize() const
+{
+ return frames.getContentSize();
+}
+
+bool Message::isContentLoaded() const
+{
+ return loaded;
+}
+
+
+namespace
+{
+const std::string X_QPID_TRACE("x-qpid.trace");
+}
+
+bool Message::isExcluded(const std::vector<std::string>& excludes) const
+{
+ const FieldTable* headers = getApplicationHeaders();
+ if (headers) {
+ std::string traceStr = headers->getAsString(X_QPID_TRACE);
+ if (traceStr.size()) {
+ std::vector<std::string> trace = split(traceStr, ", ");
+
+ for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
+ for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
+ if (*i == *j) {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+}
+
+void Message::addTraceId(const std::string& id)
+{
+ sys::Mutex::ScopedLock l(lock);
+ if (isA<MessageTransferBody>()) {
+ FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
+ std::string trace = headers.getAsString(X_QPID_TRACE);
+ if (trace.empty()) {
+ headers.setString(X_QPID_TRACE, id);
+ } else if (trace.find(id) == std::string::npos) {
+ trace += ",";
+ trace += id;
+ headers.setString(X_QPID_TRACE, trace);
+ }
+ }
+}
+
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+{
+ DeliveryProperties* props = getProperties<DeliveryProperties>();
+ if (props->getTtl()) {
+ // AMQP requires setting the expiration property to be posix
+ // time_t in seconds. TTL is in milliseconds
+ if (!props->getExpiration()) {
+ //only set expiration in delivery properties if not already set
+ time_t now = ::time(0);
+ props->setExpiration(now + (props->getTtl()/1000));
+ }
+ // Use higher resolution time for the internal expiry calculation.
+ Duration ttl(std::min(props->getTtl() * TIME_MSEC, (uint64_t) std::numeric_limits<int64_t>::max()));//Prevent overflow
+ expiration = AbsTime(AbsTime::now(), ttl);
+ setExpiryPolicy(e);
+ }
+}
+
+void Message::adjustTtl()
+{
+ DeliveryProperties* props = getProperties<DeliveryProperties>();
+ if (props->getTtl()) {
+ sys::Mutex::ScopedLock l(lock);
+ if (expiration < FAR_FUTURE) {
+ sys::Duration d(sys::AbsTime::now(), getExpiration());
+ props->setTtl(int64_t(d) > 0 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired
+ }
+ }
+}
+
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+ expiryPolicy = e;
+ if (expiryPolicy)
+ expiryPolicy->willExpire(*this);
+}
+
+bool Message::hasExpired()
+{
+ return expiryPolicy && expiryPolicy->hasExpired(*this);
+}
+
+namespace {
+struct ScopedSet {
+ sys::Monitor& lock;
+ bool& flag;
+ ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
+ sys::Monitor::ScopedLock sl(lock);
+ flag = true;
+ }
+ ~ScopedSet(){
+ sys::Monitor::ScopedLock sl(lock);
+ flag = false;
+ lock.notifyAll();
+ }
+};
+}
+
+void Message::allDequeuesComplete() {
+ ScopedSet ss(callbackLock, inCallback);
+ MessageCallback* cb = dequeueCallback;
+ if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
+}
+
+void Message::setDequeueCompleteCallback(MessageCallback& cb) {
+ sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
+ dequeueCallback = &cb;
+}
+
+void Message::resetDequeueCompleteCallback() {
+ sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
+ dequeueCallback = 0;
+}
+
+uint8_t Message::getPriority() const {
+ return getAdapter().getPriority(frames);
+}
+
+framing::FieldTable& Message::getOrInsertHeaders()
+{
+ return getProperties<MessageProperties>()->getApplicationHeaders();
+}
+
+bool Message::getIsManagementMessage() const { return isManagementMessage; }
+void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
+
+}} // namespace qpid::broker