summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp542
1 files changed, 152 insertions, 390 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 4dd8a349dd..c48e9bcfa4 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -20,19 +20,12 @@
*/
#include "qpid/broker/Message.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/MapHandler.h"
#include "qpid/StringUtils.h"
-#include "qpid/framing/frame_functors.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/SendContent.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/TypeFilter.h"
-#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
+#include <algorithm>
+#include <string.h>
#include <time.h>
using boost::intrusive_ptr;
@@ -41,492 +34,261 @@ using qpid::sys::Duration;
using qpid::sys::TIME_MSEC;
using qpid::sys::FAR_FUTURE;
using std::string;
-using namespace qpid::framing;
namespace qpid {
namespace broker {
-TransferAdapter Message::TRANSFER;
-
-Message::Message(const framing::SequenceNumber& id) :
- frames(id), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), dequeueCallback(0),
- inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false)
-{}
-
-Message::~Message() {}
-
-void Message::forcePersistent()
+Message::Message() : deliveryCount(0), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false) {}
+Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p)
+ : encoding(e), persistentContext(p), deliveryCount(0), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false)
{
- sys::Mutex::ScopedLock l(lock);
- // only set forced bit if we actually need to force.
- if (! getAdapter().isPersistent(frames) ){
- forcePersistentPolicy = true;
- }
+ if (persistentContext) persistentContext->setIngressCompletion(e);
}
+Message::~Message() {}
-bool Message::isForcedPersistent()
-{
- return forcePersistentPolicy;
-}
std::string Message::getRoutingKey() const
{
- return getAdapter().getRoutingKey(frames);
-}
-
-std::string Message::getExchangeName() const
-{
- return getAdapter().getExchange(frames);
-}
-
-const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const
-{
- if (!exchange) {
- exchange = registry.get(getExchangeName());
- }
- return exchange;
-}
-
-bool Message::isImmediate() const
-{
- return getAdapter().isImmediate(frames);
-}
-
-const FieldTable* Message::getApplicationHeaders() const
-{
- sys::Mutex::ScopedLock l(lock);
- return getAdapter().getApplicationHeaders(frames);
-}
-
-std::string Message::getAppId() const
-{
- sys::Mutex::ScopedLock l(lock);
- return getAdapter().getAppId(frames);
+ return getEncoding().getRoutingKey();
}
bool Message::isPersistent() const
{
- sys::Mutex::ScopedLock l(lock);
- return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
+ return getEncoding().isPersistent();
}
-bool Message::requiresAccept()
+uint64_t Message::getContentSize() const
{
- return getAdapter().requiresAccept(frames);
+ return getEncoding().getContentSize();
}
-uint32_t Message::getRequiredCredit()
+boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
{
- sys::Mutex::ScopedLock l(lock);
- if (!requiredCredit) {
- //add up payload for all header and content frames in the frameset
- SumBodySize sum;
- frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
- requiredCredit = sum.getSize();
- }
- return requiredCredit;
+ return encoding;
}
-void Message::encode(framing::Buffer& buffer) const
-{
- sys::Mutex::ScopedLock l(lock);
- //encode method and header frames
- EncodeFrame f1(buffer);
- frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
-
- //then encode the payload of each content frame
- framing::EncodeBody f2(buffer);
- frames.map_if(f2, TypeFilter<CONTENT_BODY>());
-}
-
-void Message::encodeContent(framing::Buffer& buffer) const
-{
- sys::Mutex::ScopedLock l(lock);
- //encode the payload of each content frame
- EncodeBody f2(buffer);
- frames.map_if(f2, TypeFilter<CONTENT_BODY>());
-}
-
-uint32_t Message::encodedSize() const
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-uint32_t Message::encodedContentSize() const
-{
- sys::Mutex::ScopedLock l(lock);
- return frames.getContentSize();
-}
-
-uint32_t Message::encodedHeaderSize() const
-{
- sys::Mutex::ScopedLock l(lock); // prevent modifications while computing size
- //add up the size for all method and header frames in the frameset
- SumFrameSize sum;
- frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>());
- return sum.getSize();
-}
-
-void Message::decodeHeader(framing::Buffer& buffer)
+namespace
{
- AMQFrame method;
- method.decode(buffer);
- frames.append(method);
-
- AMQFrame header;
- header.decode(buffer);
- frames.append(header);
+const std::string X_QPID_TRACE("x-qpid.trace");
}
-void Message::decodeContent(framing::Buffer& buffer)
+bool Message::isExcluded(const std::vector<std::string>& excludes) const
{
- if (buffer.available()) {
- //get the data as a string and set that as the content
- //body on a frame then add that frame to the frameset
- AMQFrame frame((AMQContentBody()));
- frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
- frame.setFirstSegment(false);
- frames.append(frame);
- } else {
- //adjust header flags
- MarkLastSegment f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ std::string traceStr = getEncoding().getAnnotationAsString(X_QPID_TRACE);
+ if (traceStr.size()) {
+ std::vector<std::string> trace = split(traceStr, ", ");
+ for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
+ for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
+ if (*i == *j) {
+ return true;
+ }
+ }
+ }
}
- //mark content loaded
- loaded = true;
+ return false;
}
-// Used for testing only
-void Message::tryReleaseContent()
+void Message::addTraceId(const std::string& id)
{
- if (checkContentReleasable()) {
- releaseContent();
+ std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE);
+ if (trace.empty()) {
+ annotations[X_QPID_TRACE] = id;
+ } else if (trace.find(id) == std::string::npos) {
+ trace += ",";
+ trace += id;
+ annotations[X_QPID_TRACE] = trace;
}
+ annotationsChanged();
}
-void Message::releaseContent(MessageStore* s)
+void Message::clearTrace()
{
- //deprecated, use setStore(store); releaseContent(); instead
- if (!store) setStore(s);
- releaseContent();
+ annotations[X_QPID_TRACE] = std::string();
+ annotationsChanged();
}
-void Message::releaseContent()
+void Message::setTimestamp()
{
- sys::Mutex::ScopedLock l(lock);
- if (store) {
- if (!getPersistenceId()) {
- intrusive_ptr<PersistableMessage> pmsg(this);
- store->stage(pmsg);
- staged = true;
- }
- //ensure required credit and size is cached before content frames are released
- getRequiredCredit();
- contentSize();
- //remove any content frames from the frameset
- frames.remove(TypeFilter<CONTENT_BODY>());
- setContentReleased();
- }
+ timestamp = ::time(0); // AMQP-0.10: posix time_t - secs since Epoch
}
-void Message::destroy()
+uint64_t Message::getTimestamp() const
{
- if (staged) {
- if (store) {
- store->destroy(*this);
- } else {
- QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
- }
- }
+ return timestamp;
}
-bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
+uint64_t Message::getTtl() const
{
- intrusive_ptr<const PersistableMessage> pmsg(this);
-
- bool done = false;
- string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(queue, pmsg, data, offset, maxContentSize);
- done = data.size() < maxContentSize;
- frame.setBof(false);
- frame.setEof(true);
- QPID_LOG(debug, "loaded frame" << frame);
- if (offset > 0) {
- frame.setBos(false);
- }
- if (!done) {
- frame.setEos(false);
- } else return false;
- return true;
-}
-
-void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
-{
- sys::Mutex::ScopedLock l(lock);
- if (isContentReleased() && !frames.isComplete()) {
- sys::Mutex::ScopedUnlock u(lock);
- uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
- bool morecontent = true;
- for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
- AMQFrame frame((AMQContentBody()));
- morecontent = getContentFrame(queue, frame, maxContentSize, offset);
- out.handle(frame);
- }
- queue.countLoadedFromDisk(contentSize());
+ uint64_t ttl;
+ if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
+ sys::AbsTime current(
+ expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+ sys::Duration ttl(current, getExpiration());
+ // convert from ns to ms; set to 1 if expired
+ return (int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
} else {
- Count c;
- frames.map_if(c, TypeFilter<CONTENT_BODY>());
-
- SendContent f(out, maxFrameSize, c.getCount());
- frames.map_if(f, TypeFilter<CONTENT_BODY>());
+ return 0;
}
}
-void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const
-{
- sys::Mutex::ScopedLock l(lock);
- Relay f(out);
- frames.map_if(f, TypeFilter<HEADER_BODY>());
- //as frame (and pointer to body) has now been passed to handler,
- //subsequent modifications should use a copy
- copyHeaderOnWrite = true;
-}
-
-// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
-// 0-8/0-9 message differences.
-MessageAdapter& Message::getAdapter() const
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
- if (!adapter) {
- if(frames.isA<MessageTransferBody>()) {
- adapter = &TRANSFER;
- } else {
- const AMQMethodBody* method = frames.getMethod();
- if (!method) throw Exception("Can't adapt message with no method");
- else throw Exception(QPID_MSG("Can't adapt message based on " << *method));
+ //TODO: this is still quite 0-10 specific...
+ uint64_t ttl;
+ if (getEncoding().getTtl(ttl)) {
+ if (e) {
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration duration(std::min(ttl * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(e->getCurrentTime(), duration);
+ setExpiryPolicy(e);
}
}
- return *adapter;
}
-uint64_t Message::contentSize() const
+void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value)
{
- return frames.getContentSize();
+ annotations[key] = value;
+ annotationsChanged();
}
-bool Message::isContentLoaded() const
+void Message::annotationsChanged()
{
- return loaded;
+ if (persistentContext) {
+ persistentContext = persistentContext->merge(annotations);
+ persistentContext->setIngressCompletion(encoding);
+ }
}
-
-namespace
-{
-const std::string X_QPID_TRACE("x-qpid.trace");
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+ expiryPolicy = e;
}
-bool Message::isExcluded(const std::vector<std::string>& excludes) const
+bool Message::hasExpired() const
{
- sys::Mutex::ScopedLock l(lock);
- const FieldTable* headers = getApplicationHeaders();
- if (headers) {
- std::string traceStr = headers->getAsString(X_QPID_TRACE);
- if (traceStr.size()) {
- std::vector<std::string> trace = split(traceStr, ", ");
-
- for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
- for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
- if (*i == *j) {
- return true;
- }
- }
- }
- }
- }
- return false;
+ return expiryPolicy && expiryPolicy->hasExpired(*this);
}
-class CloneHeaderBody
-{
-public:
- void operator()(AMQFrame& f)
- {
- f.cloneBody();
- }
-};
-
-AMQHeaderBody* Message::getHeaderBody()
+uint8_t Message::getPriority() const
{
- // expects lock to be held
- if (copyHeaderOnWrite) {
- CloneHeaderBody f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
- copyHeaderOnWrite = false;
- }
- return frames.getHeaders();
+ return getEncoding().getPriority();
}
-void Message::addTraceId(const std::string& id)
+bool Message::getIsManagementMessage() const { return isManagementMessage; }
+void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
+qpid::framing::SequenceNumber Message::getSequence() const
{
- sys::Mutex::ScopedLock l(lock);
- if (isA<MessageTransferBody>()) {
- FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
- std::string trace = headers.getAsString(X_QPID_TRACE);
- if (trace.empty()) {
- headers.setString(X_QPID_TRACE, id);
- } else if (trace.find(id) == std::string::npos) {
- trace += ",";
- trace += id;
- headers.setString(X_QPID_TRACE, trace);
- }
- }
+ return sequence;
}
-
-void Message::clearTrace()
+void Message::setSequence(const qpid::framing::SequenceNumber& s)
{
- sys::Mutex::ScopedLock l(lock);
- if (isA<MessageTransferBody>()) {
- FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
- std::string trace = headers.getAsString(X_QPID_TRACE);
- if (!trace.empty()) {
- headers.setString(X_QPID_TRACE, "");
- }
- }
+ sequence = s;
}
-void Message::setTimestamp()
+MessageState Message::getState() const
{
- sys::Mutex::ScopedLock l(lock);
- DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
- time_t now = ::time(0);
- props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch
+ return state;
}
-
-void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setState(MessageState s)
{
- sys::Mutex::ScopedLock l(lock);
- DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
- if (props->getTtl()) {
- // AMQP requires setting the expiration property to be posix
- // time_t in seconds. TTL is in milliseconds
- if (!props->getExpiration()) {
- //only set expiration in delivery properties if not already set
- time_t now = ::time(0);
- props->setExpiration(now + (props->getTtl()/1000));
- }
- if (e) {
- // Use higher resolution time for the internal expiry calculation.
- // Prevent overflow as a signed int64_t
- Duration ttl(std::min(props->getTtl() * TIME_MSEC,
- (uint64_t) std::numeric_limits<int64_t>::max()));
- expiration = AbsTime(e->getCurrentTime(), ttl);
- setExpiryPolicy(e);
- }
- }
+ state = s;
}
-void Message::adjustTtl()
+const qpid::types::Variant::Map& Message::getAnnotations() const
{
- sys::Mutex::ScopedLock l(lock);
- DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
- if (props->getTtl()) {
- if (expiration < FAR_FUTURE) {
- sys::AbsTime current(
- expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
- sys::Duration ttl(current, getExpiration());
- // convert from ns to ms; set to 1 if expired
- props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
- }
- }
+ return annotations;
}
-void Message::setRedelivered()
+qpid::types::Variant Message::getAnnotation(const std::string& key) const
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
+ qpid::types::Variant::Map::const_iterator i = annotations.find(key);
+ if (i != annotations.end()) return i->second;
+ //FIXME: modify Encoding interface to allow retrieval of
+ //annotations of different types from the message data as received
+ //off the wire
+ return qpid::types::Variant(getEncoding().getAnnotationAsString(key));
}
-void Message::insertCustomProperty(const std::string& key, int64_t value)
+std::string Message::getUserId() const
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
+ return encoding->getUserId();
}
-void Message::insertCustomProperty(const std::string& key, const std::string& value)
+Message::Encoding& Message::getEncoding()
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
+ return *encoding;
}
-
-void Message::removeCustomProperty(const std::string& key)
+const Message::Encoding& Message::getEncoding() const
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
+ return *encoding;
}
-
-void Message::setExchange(const std::string& exchange)
+Message::operator bool() const
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
+ return encoding;
}
-void Message::clearApplicationHeadersFlag()
+std::string Message::getContent() const
{
- sys::Mutex::ScopedLock l(lock);
- getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
-}
-
-void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
- expiryPolicy = e;
+ return encoding->getContent();
}
-bool Message::hasExpired()
+std::string Message::getPropertyAsString(const std::string& key) const
{
- return expiryPolicy && expiryPolicy->hasExpired(*this);
+ return encoding->getPropertyAsString(key);
}
-
namespace {
-struct ScopedSet {
- sys::Monitor& lock;
- bool& flag;
- ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
- sys::Monitor::ScopedLock sl(lock);
- flag = true;
+class PropertyRetriever : public MapHandler
+{
+ public:
+ PropertyRetriever(const std::string& key) : name(key) {}
+ void handleVoid(const CharSequence&) {}
+ void handleUint8(const CharSequence& key, uint8_t value) { handle(key, value); }
+ void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); }
+ void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); }
+ void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); }
+ void handleInt8(const CharSequence& key, int8_t value) { handle(key, value); }
+ void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); }
+ void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); }
+ void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); }
+ void handleFloat(const CharSequence& key, float value) { handle(key, value); }
+ void handleDouble(const CharSequence& key, double value) { handle(key, value); }
+ void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/)
+ {
+ if (matches(key)) result = std::string(value.data, value.size);
}
- ~ScopedSet(){
- sys::Monitor::ScopedLock sl(lock);
- flag = false;
- lock.notifyAll();
+ qpid::types::Variant getResult() { return result; }
+
+ private:
+ std::string name;
+ qpid::types::Variant result;
+
+ bool matches(const CharSequence& key)
+ {
+ return ::strncmp(key.data, name.data(), std::min(key.size, name.size())) == 0;
}
-};
-}
-void Message::allDequeuesComplete() {
- ScopedSet ss(callbackLock, inCallback);
- MessageCallback* cb = dequeueCallback;
- if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
+ template <typename T> void handle(const CharSequence& key, T value)
+ {
+ if (matches(key)) result = value;
+ }
+};
}
-
-void Message::setDequeueCompleteCallback(MessageCallback& cb) {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- dequeueCallback = &cb;
+qpid::types::Variant Message::getProperty(const std::string& key) const
+{
+ PropertyRetriever r(key);
+ encoding->processProperties(r);
+ return r.getResult();
}
-void Message::resetDequeueCompleteCallback() {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- dequeueCallback = 0;
+boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
+{
+ return persistentContext;
}
-uint8_t Message::getPriority() const {
- sys::Mutex::ScopedLock l(lock);
- return getAdapter().getPriority(frames);
+void Message::processProperties(MapHandler& handler) const
+{
+ encoding->processProperties(handler);
}
-bool Message::getIsManagementMessage() const { return isManagementMessage; }
-void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
-
}} // namespace qpid::broker