summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp219
1 files changed, 177 insertions, 42 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 331bb5e716..47ca7a7ae8 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -19,8 +19,9 @@
*
*/
-#include "Message.h"
-#include "ExchangeRegistry.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/StringUtils.h"
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
@@ -30,17 +31,43 @@
#include "qpid/framing/TypeFilter.h"
#include "qpid/log/Statement.h"
+#include <time.h>
+
using boost::intrusive_ptr;
-using namespace qpid::broker;
-using namespace qpid::framing;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_MSEC;
+using qpid::sys::FAR_FUTURE;
using std::string;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace broker {
TransferAdapter Message::TRANSFER;
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
+Message::Message(const framing::SequenceNumber& id) :
+ frames(id), persistenceId(0), redelivered(false), loaded(false),
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
Message::~Message()
{
+ if (expiryPolicy)
+ expiryPolicy->forget(*this);
+}
+
+void Message::forcePersistent()
+{
+ // only set forced bit if we actually need to force.
+ if (! getAdapter().isPersistent(frames) ){
+ forcePersistentPolicy = true;
+ }
+}
+
+bool Message::isForcedPersistent()
+{
+ return forcePersistentPolicy;
}
std::string Message::getRoutingKey() const
@@ -71,9 +98,9 @@ const FieldTable* Message::getApplicationHeaders() const
return getAdapter().getApplicationHeaders(frames);
}
-bool Message::isPersistent()
+bool Message::isPersistent() const
{
- return getAdapter().isPersistent(frames);
+ return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
}
bool Message::requiresAccept()
@@ -81,12 +108,16 @@ bool Message::requiresAccept()
return getAdapter().requiresAccept(frames);
}
-uint32_t Message::getRequiredCredit() const
+uint32_t Message::getRequiredCredit()
{
- //add up payload for all header and content frames in the frameset
- SumBodySize sum;
- frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
- return sum.getSize();
+ sys::Mutex::ScopedLock l(lock);
+ if (!requiredCredit) {
+ //add up payload for all header and content frames in the frameset
+ SumBodySize sum;
+ frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
+ requiredCredit = sum.getSize();
+ }
+ return requiredCredit;
}
void Message::encode(framing::Buffer& buffer) const
@@ -96,7 +127,7 @@ void Message::encode(framing::Buffer& buffer) const
frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
//then encode the payload of each content frame
- EncodeBody f2(buffer);
+ framing::EncodeBody f2(buffer);
frames.map_if(f2, TypeFilter<CONTENT_BODY>());
}
@@ -141,9 +172,9 @@ void Message::decodeContent(framing::Buffer& buffer)
if (buffer.available()) {
//get the data as a string and set that as the content
//body on a frame then add that frame to the frameset
- AMQFrame frame;
- frame.setBody(AMQContentBody());
+ AMQFrame frame((AMQContentBody()));
frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+ frame.setFirstSegment(false);
frames.append(frame);
} else {
//adjust header flags
@@ -154,17 +185,31 @@ void Message::decodeContent(framing::Buffer& buffer)
loaded = true;
}
-void Message::releaseContent(MessageStore* _store)
+void Message::tryReleaseContent()
{
- if (!store) {
- store = _store;
+ if (checkContentReleasable()) {
+ releaseContent();
}
+}
+
+void Message::releaseContent(MessageStore* s)
+{
+ //deprecated, use setStore(store); releaseContent(); instead
+ if (!store) setStore(s);
+ releaseContent();
+}
+
+void Message::releaseContent()
+{
+ sys::Mutex::ScopedLock l(lock);
if (store) {
if (!getPersistenceId()) {
intrusive_ptr<PersistableMessage> pmsg(this);
store->stage(pmsg);
staged = true;
}
+ //ensure required credit is cached before content frames are released
+ getRequiredCredit();
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
setContentReleased();
@@ -182,30 +227,37 @@ void Message::destroy()
}
}
-void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
+bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
+{
+ intrusive_ptr<const PersistableMessage> pmsg(this);
+
+ bool done = false;
+ string& data = frame.castBody<AMQContentBody>()->getData();
+ store->loadContent(queue, pmsg, data, offset, maxContentSize);
+ done = data.size() < maxContentSize;
+ frame.setBof(false);
+ frame.setEof(true);
+ QPID_LOG(debug, "loaded frame" << frame);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
+ if (!done) {
+ frame.setEos(false);
+ } else return false;
+ return true;
+}
+
+void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
{
- if (isContentReleased()) {
- //load content from store in chunks of maxContentSize
+ sys::Mutex::ScopedLock l(lock);
+ if (isContentReleased() && !frames.isComplete()) {
+ sys::Mutex::ScopedUnlock u(lock);
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
- intrusive_ptr<const PersistableMessage> pmsg(this);
-
- bool done = false;
- for (uint64_t offset = 0; !done; offset += maxContentSize)
+ bool morecontent = true;
+ for (uint64_t offset = 0; morecontent; offset += maxContentSize)
{
- AMQFrame frame(in_place<AMQContentBody>());
- string& data = frame.castBody<AMQContentBody>()->getData();
-
- store->loadContent(queue, pmsg, data, offset, maxContentSize);
- done = data.size() < maxContentSize;
- frame.setBof(false);
- frame.setEof(true);
- if (offset > 0) {
- frame.setBos(false);
- }
- if (!done) {
- frame.setEos(false);
- }
- QPID_LOG(debug, "loaded frame for delivery: " << frame);
+ AMQFrame frame((AMQContentBody()));
+ morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
}
} else {
@@ -253,14 +305,14 @@ bool Message::isContentLoaded() const
namespace
{
- const std::string X_QPID_TRACE("x-qpid.trace");
+const std::string X_QPID_TRACE("x-qpid.trace");
}
bool Message::isExcluded(const std::vector<std::string>& excludes) const
{
const FieldTable* headers = getApplicationHeaders();
if (headers) {
- std::string traceStr = headers->getString(X_QPID_TRACE);
+ std::string traceStr = headers->getAsString(X_QPID_TRACE);
if (traceStr.size()) {
std::vector<std::string> trace = split(traceStr, ", ");
@@ -281,7 +333,7 @@ void Message::addTraceId(const std::string& id)
sys::Mutex::ScopedLock l(lock);
if (isA<MessageTransferBody>()) {
FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
- std::string trace = headers.getString(X_QPID_TRACE);
+ std::string trace = headers.getAsString(X_QPID_TRACE);
if (trace.empty()) {
headers.setString(X_QPID_TRACE, id);
} else if (trace.find(id) == std::string::npos) {
@@ -291,3 +343,86 @@ void Message::addTraceId(const std::string& id)
}
}
}
+
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+{
+ DeliveryProperties* props = getProperties<DeliveryProperties>();
+ if (props->getTtl()) {
+ // AMQP requires setting the expiration property to be posix
+ // time_t in seconds. TTL is in milliseconds
+ if (!props->getExpiration()) {
+ //only set expiration in delivery properties if not already set
+ time_t now = ::time(0);
+ props->setExpiration(now + (props->getTtl()/1000));
+ }
+ // Use higher resolution time for the internal expiry calculation.
+ expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
+ setExpiryPolicy(e);
+ }
+}
+
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+ expiryPolicy = e;
+ if (expiryPolicy)
+ expiryPolicy->willExpire(*this);
+}
+
+bool Message::hasExpired()
+{
+ return expiryPolicy && expiryPolicy->hasExpired(*this);
+}
+
+boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
+{
+ sys::Mutex::ScopedLock l(lock);
+ Replacement::iterator i = replacement.find(qfor);
+ if (i != replacement.end()){
+ return i->second;
+ }
+ return empty;
+}
+
+void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor)
+{
+ sys::Mutex::ScopedLock l(lock);
+ replacement[qfor] = msg;
+}
+
+void Message::allEnqueuesComplete() {
+ sys::Mutex::ScopedLock l(callbackLock);
+ MessageCallback* cb = enqueueCallback;
+ if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
+}
+
+void Message::allDequeuesComplete() {
+ sys::Mutex::ScopedLock l(callbackLock);
+ MessageCallback* cb = dequeueCallback;
+ if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
+}
+
+void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
+ sys::Mutex::ScopedLock l(callbackLock);
+ enqueueCallback = &cb;
+}
+
+void Message::resetEnqueueCompleteCallback() {
+ sys::Mutex::ScopedLock l(callbackLock);
+ enqueueCallback = 0;
+}
+
+void Message::setDequeueCompleteCallback(MessageCallback& cb) {
+ sys::Mutex::ScopedLock l(callbackLock);
+ dequeueCallback = &cb;
+}
+
+void Message::resetDequeueCompleteCallback() {
+ sys::Mutex::ScopedLock l(callbackLock);
+ dequeueCallback = 0;
+}
+
+framing::FieldTable& Message::getOrInsertHeaders()
+{
+ return getProperties<MessageProperties>()->getApplicationHeaders();
+}
+
+}} // namespace qpid::broker