summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-11-08 18:46:53 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-11-08 18:46:53 +0000
commitd9ba9eae91290581b62f67239300897a96827e83 (patch)
tree67ea6a18828a42cafea5fce43b36b8b5f7c5becf /cpp
parent10a794c9d2fede2a0db9cf80e95f19d56e931196 (diff)
downloadqpid-python-d9ba9eae91290581b62f67239300897a96827e83.tar.gz
- enable the ability to lazy load from async store
- the ci has a raw ptr for Queue in QueuedMessage, if any has any concerns, ping me and I will convert it to an auto_ptr Carl. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@593251 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Consumer.h13
-rw-r--r--cpp/src/qpid/broker/DeliveryAdapter.h2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/Message.cpp4
-rw-r--r--cpp/src/qpid/broker/Message.h3
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp2
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp8
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.h3
-rw-r--r--cpp/src/qpid/broker/MessageStore.h14
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp7
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.h3
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp2
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h3
-rw-r--r--cpp/src/qpid/broker/Queue.cpp8
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp6
-rw-r--r--cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp2
-rw-r--r--cpp/src/tests/DeliveryRecordTest.cpp2
-rw-r--r--cpp/src/tests/TxAckTest.cpp2
21 files changed, 56 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 8c57d7d2b8..fb60fc88a8 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -21,6 +21,11 @@
#ifndef _Consumer_
#define _Consumer_
+namespace qpid {
+ namespace broker {
+ class Queue;
+}}
+
#include "Message.h"
namespace qpid {
@@ -30,9 +35,11 @@ namespace qpid {
{
Message::shared_ptr payload;
framing::SequenceNumber position;
-
- QueuedMessage(Message::shared_ptr msg, framing::SequenceNumber sn) : payload(msg), position(sn) {}
- QueuedMessage() {}
+ Queue* queue;
+
+ QueuedMessage(Queue* q, Message::shared_ptr msg, framing::SequenceNumber sn) :
+ payload(msg), position(sn), queue(q) {}
+ QueuedMessage(Queue* q) : queue(q) {}
};
diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h
index 28e2e3eb93..4c2b2f615f 100644
--- a/cpp/src/qpid/broker/DeliveryAdapter.h
+++ b/cpp/src/qpid/broker/DeliveryAdapter.h
@@ -42,7 +42,7 @@ namespace broker {
class DeliveryAdapter
{
public:
- virtual DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) = 0;
+ virtual DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) = 0;
virtual ~DeliveryAdapter(){}
};
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 176209cd4d..f20aff1f23 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -89,7 +89,7 @@ void DeliveryRecord::redeliver(SemanticState* const session) {
requeue();
}else{
msg.payload->redeliver();//mark as redelivered
- id = session->redeliver(msg.payload, token);
+ id = session->redeliver(msg, token);
}
}
}
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 6e3e6a55f7..4e075e73a3 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -150,7 +150,7 @@ void Message::releaseContent(MessageStore* _store)
setContentReleased();
}
-void Message::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const
+void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
{
if (isContentReleased()) {
//load content from store in chunks of maxContentSize
@@ -162,7 +162,7 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) con
AMQFrame frame(0, AMQContentBody());
string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(*this, data, offset,
+ store->loadContent(queue, *this, data, offset,
remaining > maxContentSize ? maxContentSize : remaining);
frame.setBof(false);
frame.setEof(true);
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index d043d50ad0..f706a65e52 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -41,6 +41,7 @@ class ConnectionToken;
class Exchange;
class ExchangeRegistry;
class MessageStore;
+class Queue;
class Message : public PersistableMessage {
public:
@@ -114,7 +115,7 @@ public:
*/
void releaseContent(MessageStore* store);
- void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const;
+ void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
bool isContentLoaded() const;
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index e65db391b5..42448babb5 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -54,7 +54,7 @@ void MessageBuilder::handle(AMQFrame& frame)
message->getFrames().append(frame);
//have we reached the staging limit? if so stage message and release content
if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) {
- message->releaseContent(store);
+ message->releaseContent(store);
staging = true;
}
}
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index 2eea97ced0..b29850f9e1 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -113,7 +113,7 @@ DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::st
return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode));
}
-void MessageDelivery::deliver(Message::shared_ptr msg,
+void MessageDelivery::deliver(QueuedMessage& msg,
framing::FrameHandler& handler,
DeliveryId id,
DeliveryToken::shared_ptr token,
@@ -124,9 +124,9 @@ void MessageDelivery::deliver(Message::shared_ptr msg,
//have one content class for 0-10 proper
boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
- AMQFrame method = t->sendMethod(msg, id);
+ AMQFrame method = t->sendMethod(msg.payload, id);
method.setEof(false);
handler.handle(method);
- msg->sendHeader(handler, framesize);
- msg->sendContent(handler, framesize);
+ msg.payload->sendHeader(handler, framesize);
+ msg.payload->sendContent(*(msg.queue), handler, framesize);
}
diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h
index 3beb268ca7..ac7818feed 100644
--- a/cpp/src/qpid/broker/MessageDelivery.h
+++ b/cpp/src/qpid/broker/MessageDelivery.h
@@ -23,6 +23,7 @@
*/
#include <boost/shared_ptr.hpp>
#include "DeliveryId.h"
+#include "Consumer.h"
#include "qpid/framing/FrameHandler.h"
namespace qpid {
@@ -43,7 +44,7 @@ public:
u_int8_t confirmMode,
u_int8_t acquireMode);
- static void deliver(boost::shared_ptr<Message> msg, framing::FrameHandler& out,
+ static void deliver(QueuedMessage& msg, framing::FrameHandler& out,
DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize);
};
diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h
index b88abc277a..04dbb22376 100644
--- a/cpp/src/qpid/broker/MessageStore.h
+++ b/cpp/src/qpid/broker/MessageStore.h
@@ -85,9 +85,16 @@ public:
* point). If the message has not yet been stored it will
* store the headers as well as any content passed in. A
* persistence id will be set on the message which can be
- * used to load the content or to append to it.
+ * used to load the content or to append to it.
+ *
+ * TODO ::If it is know
+ * which queue the message is to be staged/ release to in cases
+ * of flowing tmp messages to disk for memory conservation set
+ * the queue ptr. This allows the store to optimize the read/writes
+ * for that queue and avoid searching based on id. Set queue = 0 for
+ * large message staging when the queue is not known.
*/
- virtual void stage(PersistableMessage& msg) = 0;
+ virtual void stage( PersistableMessage& msg) = 0;
/**
* Destroys a previously staged message. This only needs
@@ -110,7 +117,8 @@ public:
* content should be loaded, not the headers or related
* meta-data).
*/
- virtual void loadContent(const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0;
+ virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+ const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0;
/**
* Enqueues a message, storing the message if it has not
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index 1ddad65c56..adb41f6094 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -71,7 +71,7 @@ void MessageStoreModule::recover(RecoveryManager& registry)
store->recover(registry);
}
-void MessageStoreModule::stage(PersistableMessage& msg)
+void MessageStoreModule::stage( PersistableMessage& msg)
{
store->stage(msg);
}
@@ -86,9 +86,10 @@ void MessageStoreModule::appendContent(const PersistableMessage& msg, const std:
store->appendContent(msg, data);
}
-void MessageStoreModule::loadContent(const PersistableMessage& msg, string& data, uint64_t offset, uint32_t length)
+void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue,
+ const PersistableMessage& msg, string& data, uint64_t offset, uint32_t length)
{
- store->loadContent(msg, data, offset, length);
+ store->loadContent(queue, msg, data, offset, length);
}
void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue)
diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h
index 1d256b972b..6738f0e539 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.h
+++ b/cpp/src/qpid/broker/MessageStoreModule.h
@@ -58,7 +58,8 @@ public:
void stage(PersistableMessage& msg);
void destroy(PersistableMessage& msg);
void appendContent(const PersistableMessage& msg, const std::string& data);
- void loadContent(const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
+ void loadContent(const qpid::broker::PersistableQueue& queue,
+ const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 2b7e8ff32d..eb20ab6936 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -93,7 +93,7 @@ void NullMessageStore::appendContent(const PersistableMessage&, const string&)
QPID_LOG(info, "Can't append content. Persistence not enabled.");
}
-void NullMessageStore::loadContent(const PersistableMessage&, string&, uint64_t, uint32_t)
+void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, const PersistableMessage&, string&, uint64_t, uint32_t)
{
QPID_LOG(info, "Can't load content. Persistence not enabled.");
}
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index b58bd20ab3..caf018655c 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -59,7 +59,8 @@ public:
virtual void stage(PersistableMessage& msg);
virtual void destroy(PersistableMessage& msg);
virtual void appendContent(const PersistableMessage& msg, const std::string& data);
- virtual void loadContent(const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
+ virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+ const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length);
virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 1484fe464e..5745c85331 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -212,7 +212,7 @@ bool Queue::getNextMessage(QueuedMessage& msg)
void Queue::dispatch()
{
- QueuedMessage msg;
+ QueuedMessage msg(this);
while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
if (dispatch(msg)) {
pop();
@@ -242,7 +242,7 @@ void Queue::serviceAllBrowsers()
void Queue::serviceBrowser(Consumer::ptr browser)
{
- QueuedMessage msg;
+ QueuedMessage msg(this);
while (seek(msg, browser->position) && browser->deliver(msg)) {
browser->position = msg.position;
}
@@ -318,7 +318,7 @@ void Queue::cancel(Consumer::ptr c, Consumers& consumers)
QueuedMessage Queue::dequeue(){
Mutex::ScopedLock locker(messageLock);
- QueuedMessage msg;
+ QueuedMessage msg(this);
if(!messages.empty()){
msg = messages.front();
@@ -350,7 +350,7 @@ void Queue::pop(){
void Queue::push(Message::shared_ptr& msg){
Mutex::ScopedLock locker(messageLock);
- messages.push_back(QueuedMessage(msg, ++sequence));
+ messages.push_back(QueuedMessage(this, msg, ++sequence));
if (policy.get()) {
policy->enqueued(msg->contentSize());
if (policy->limitExceeded()) {
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 3a20a2d41c..69dd1fd67e 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -163,7 +163,7 @@ void SemanticHandler::handleContent(AMQFrame& frame)
}
}
-DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
+DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
MessageDelivery::deliver(
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 9380708ec5..8b27bc53c3 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -75,7 +75,7 @@ class SemanticHandler : public DeliveryAdapter,
void sendCompletion();
//delivery adapter methods:
- DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
+ DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
Connection& getConnection() { return session.getConnection(); }
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index d844cc5086..fa2ea38333 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -277,7 +277,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
Mutex::ScopedLock locker(parent->deliveryLock);
DeliveryId deliveryTag =
- parent->deliveryAdapter.deliver(msg.payload, token);
+ parent->deliveryAdapter.deliver(msg, token);
if (windowing || ackExpected) {
parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
} else if (acquire && !ackExpected) {
@@ -471,7 +471,7 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue
QueuedMessage msg = queue->dequeue();
if(msg.payload){
Mutex::ScopedLock locker(deliveryLock);
- DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg.payload, token);
+ DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -481,7 +481,7 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue
}
}
-DeliveryId SemanticState::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
+DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock locker(deliveryLock);
return deliveryAdapter.deliver(msg, token);
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index bb126287a3..d0e3eed8e1 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -183,7 +183,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
void recover(bool requeue);
void flow(bool active);
- DeliveryId redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
+ DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired);
void release(DeliveryId first, DeliveryId last);
void reject(DeliveryId first, DeliveryId last);
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index 612a9fc8bc..f0ccf73189 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -251,7 +251,7 @@ class BrokerChannelTest : public CppUnit::TestCase
policy.update(settings);
store.expect();
- store.stage(*msg3);
+ store.stage(0, *msg3);
store.test();
Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0));
diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp
index 011d8bf694..9487f743d6 100644
--- a/cpp/src/tests/DeliveryRecordTest.cpp
+++ b/cpp/src/tests/DeliveryRecordTest.cpp
@@ -51,7 +51,7 @@ public:
list<DeliveryRecord> records;
for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
- records.push_back(DeliveryRecord(QueuedMessage(), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false));
+ records.push_back(DeliveryRecord(QueuedMessage(0), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false));
}
records.sort();
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index 73628f25b5..dd57736a0c 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -76,7 +76,7 @@ public:
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key");
messages.push_back(msg);
- QueuedMessage qm;
+ QueuedMessage qm(queue.get());
qm.payload = msg;
deliveries.push_back(DeliveryRecord(qm, queue, "xyz", DeliveryToken::shared_ptr(), (i+1), true));
}