diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-08 18:46:53 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-08 18:46:53 +0000 |
commit | d9ba9eae91290581b62f67239300897a96827e83 (patch) | |
tree | 67ea6a18828a42cafea5fce43b36b8b5f7c5becf /cpp | |
parent | 10a794c9d2fede2a0db9cf80e95f19d56e931196 (diff) | |
download | qpid-python-d9ba9eae91290581b62f67239300897a96827e83.tar.gz |
- enable the ability to lazy load from async store
- the ci has a raw ptr for Queue in QueuedMessage, if any has
any concerns, ping me and I will convert it to an auto_ptr
Carl.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@593251 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
21 files changed, 56 insertions, 36 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index 8c57d7d2b8..fb60fc88a8 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -21,6 +21,11 @@ #ifndef _Consumer_ #define _Consumer_ +namespace qpid { + namespace broker { + class Queue; +}} + #include "Message.h" namespace qpid { @@ -30,9 +35,11 @@ namespace qpid { { Message::shared_ptr payload; framing::SequenceNumber position; - - QueuedMessage(Message::shared_ptr msg, framing::SequenceNumber sn) : payload(msg), position(sn) {} - QueuedMessage() {} + Queue* queue; + + QueuedMessage(Queue* q, Message::shared_ptr msg, framing::SequenceNumber sn) : + payload(msg), position(sn), queue(q) {} + QueuedMessage(Queue* q) : queue(q) {} }; diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h index 28e2e3eb93..4c2b2f615f 100644 --- a/cpp/src/qpid/broker/DeliveryAdapter.h +++ b/cpp/src/qpid/broker/DeliveryAdapter.h @@ -42,7 +42,7 @@ namespace broker { class DeliveryAdapter { public: - virtual DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) = 0; + virtual DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) = 0; virtual ~DeliveryAdapter(){} }; diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 176209cd4d..f20aff1f23 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -89,7 +89,7 @@ void DeliveryRecord::redeliver(SemanticState* const session) { requeue(); }else{ msg.payload->redeliver();//mark as redelivered - id = session->redeliver(msg.payload, token); + id = session->redeliver(msg, token); } } } diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 6e3e6a55f7..4e075e73a3 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -150,7 +150,7 @@ void Message::releaseContent(MessageStore* _store) setContentReleased(); } -void Message::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const +void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const { if (isContentReleased()) { //load content from store in chunks of maxContentSize @@ -162,7 +162,7 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) con AMQFrame frame(0, AMQContentBody()); string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(*this, data, offset, + store->loadContent(queue, *this, data, offset, remaining > maxContentSize ? maxContentSize : remaining); frame.setBof(false); frame.setEof(true); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index d043d50ad0..f706a65e52 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -41,6 +41,7 @@ class ConnectionToken; class Exchange; class ExchangeRegistry; class MessageStore; +class Queue; class Message : public PersistableMessage { public: @@ -114,7 +115,7 @@ public: */ void releaseContent(MessageStore* store); - void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const; + void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const; void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const; bool isContentLoaded() const; diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index e65db391b5..42448babb5 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -54,7 +54,7 @@ void MessageBuilder::handle(AMQFrame& frame) message->getFrames().append(frame); //have we reached the staging limit? if so stage message and release content if (state == CONTENT && stagingThreshold && message->getFrames().getContentSize() >= stagingThreshold) { - message->releaseContent(store); + message->releaseContent(store); staging = true; } } diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index 2eea97ced0..b29850f9e1 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -113,7 +113,7 @@ DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::st return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode)); } -void MessageDelivery::deliver(Message::shared_ptr msg, +void MessageDelivery::deliver(QueuedMessage& msg, framing::FrameHandler& handler, DeliveryId id, DeliveryToken::shared_ptr token, @@ -124,9 +124,9 @@ void MessageDelivery::deliver(Message::shared_ptr msg, //have one content class for 0-10 proper boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); - AMQFrame method = t->sendMethod(msg, id); + AMQFrame method = t->sendMethod(msg.payload, id); method.setEof(false); handler.handle(method); - msg->sendHeader(handler, framesize); - msg->sendContent(handler, framesize); + msg.payload->sendHeader(handler, framesize); + msg.payload->sendContent(*(msg.queue), handler, framesize); } diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h index 3beb268ca7..ac7818feed 100644 --- a/cpp/src/qpid/broker/MessageDelivery.h +++ b/cpp/src/qpid/broker/MessageDelivery.h @@ -23,6 +23,7 @@ */ #include <boost/shared_ptr.hpp> #include "DeliveryId.h" +#include "Consumer.h" #include "qpid/framing/FrameHandler.h" namespace qpid { @@ -43,7 +44,7 @@ public: u_int8_t confirmMode, u_int8_t acquireMode); - static void deliver(boost::shared_ptr<Message> msg, framing::FrameHandler& out, + static void deliver(QueuedMessage& msg, framing::FrameHandler& out, DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize); }; diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index b88abc277a..04dbb22376 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -85,9 +85,16 @@ public: * point). If the message has not yet been stored it will * store the headers as well as any content passed in. A * persistence id will be set on the message which can be - * used to load the content or to append to it. + * used to load the content or to append to it. + * + * TODO ::If it is know + * which queue the message is to be staged/ release to in cases + * of flowing tmp messages to disk for memory conservation set + * the queue ptr. This allows the store to optimize the read/writes + * for that queue and avoid searching based on id. Set queue = 0 for + * large message staging when the queue is not known. */ - virtual void stage(PersistableMessage& msg) = 0; + virtual void stage( PersistableMessage& msg) = 0; /** * Destroys a previously staged message. This only needs @@ -110,7 +117,8 @@ public: * content should be loaded, not the headers or related * meta-data). */ - virtual void loadContent(const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0; + virtual void loadContent(const qpid::broker::PersistableQueue& queue, + const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0; /** * Enqueues a message, storing the message if it has not diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index 1ddad65c56..adb41f6094 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -71,7 +71,7 @@ void MessageStoreModule::recover(RecoveryManager& registry) store->recover(registry); } -void MessageStoreModule::stage(PersistableMessage& msg) +void MessageStoreModule::stage( PersistableMessage& msg) { store->stage(msg); } @@ -86,9 +86,10 @@ void MessageStoreModule::appendContent(const PersistableMessage& msg, const std: store->appendContent(msg, data); } -void MessageStoreModule::loadContent(const PersistableMessage& msg, string& data, uint64_t offset, uint32_t length) +void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue, + const PersistableMessage& msg, string& data, uint64_t offset, uint32_t length) { - store->loadContent(msg, data, offset, length); + store->loadContent(queue, msg, data, offset, length); } void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 1d256b972b..6738f0e539 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -58,7 +58,8 @@ public: void stage(PersistableMessage& msg); void destroy(PersistableMessage& msg); void appendContent(const PersistableMessage& msg, const std::string& data); - void loadContent(const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); + void loadContent(const qpid::broker::PersistableQueue& queue, + const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 2b7e8ff32d..eb20ab6936 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -93,7 +93,7 @@ void NullMessageStore::appendContent(const PersistableMessage&, const string&) QPID_LOG(info, "Can't append content. Persistence not enabled."); } -void NullMessageStore::loadContent(const PersistableMessage&, string&, uint64_t, uint32_t) +void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, const PersistableMessage&, string&, uint64_t, uint32_t) { QPID_LOG(info, "Can't load content. Persistence not enabled."); } diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index b58bd20ab3..caf018655c 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -59,7 +59,8 @@ public: virtual void stage(PersistableMessage& msg); virtual void destroy(PersistableMessage& msg); virtual void appendContent(const PersistableMessage& msg, const std::string& data); - virtual void loadContent(const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); + virtual void loadContent(const qpid::broker::PersistableQueue& queue, + const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 1484fe464e..5745c85331 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -212,7 +212,7 @@ bool Queue::getNextMessage(QueuedMessage& msg) void Queue::dispatch() { - QueuedMessage msg; + QueuedMessage msg(this); while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){ if (dispatch(msg)) { pop(); @@ -242,7 +242,7 @@ void Queue::serviceAllBrowsers() void Queue::serviceBrowser(Consumer::ptr browser) { - QueuedMessage msg; + QueuedMessage msg(this); while (seek(msg, browser->position) && browser->deliver(msg)) { browser->position = msg.position; } @@ -318,7 +318,7 @@ void Queue::cancel(Consumer::ptr c, Consumers& consumers) QueuedMessage Queue::dequeue(){ Mutex::ScopedLock locker(messageLock); - QueuedMessage msg; + QueuedMessage msg(this); if(!messages.empty()){ msg = messages.front(); @@ -350,7 +350,7 @@ void Queue::pop(){ void Queue::push(Message::shared_ptr& msg){ Mutex::ScopedLock locker(messageLock); - messages.push_back(QueuedMessage(msg, ++sequence)); + messages.push_back(QueuedMessage(this, msg, ++sequence)); if (policy.get()) { policy->enqueued(msg->contentSize()); if (policy->limitExceeded()) { diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 3a20a2d41c..69dd1fd67e 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -163,7 +163,7 @@ void SemanticHandler::handleContent(AMQFrame& frame) } } -DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) +DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); MessageDelivery::deliver( diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 9380708ec5..8b27bc53c3 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -75,7 +75,7 @@ class SemanticHandler : public DeliveryAdapter, void sendCompletion(); //delivery adapter methods: - DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token); + DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } Connection& getConnection() { return session.getConnection(); } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index d844cc5086..fa2ea38333 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -277,7 +277,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) Mutex::ScopedLock locker(parent->deliveryLock); DeliveryId deliveryTag = - parent->deliveryAdapter.deliver(msg.payload, token); + parent->deliveryAdapter.deliver(msg, token); if (windowing || ackExpected) { parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); } else if (acquire && !ackExpected) { @@ -471,7 +471,7 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue QueuedMessage msg = queue->dequeue(); if(msg.payload){ Mutex::ScopedLock locker(deliveryLock); - DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg.payload, token); + DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -481,7 +481,7 @@ bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue } } -DeliveryId SemanticState::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) +DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock locker(deliveryLock); return deliveryAdapter.deliver(msg, token); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index bb126287a3..d0e3eed8e1 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -183,7 +183,7 @@ class SemanticState : public framing::FrameHandler::Chains, void ackRange(DeliveryId deliveryTag, DeliveryId endTag); void recover(bool requeue); void flow(bool active); - DeliveryId redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token); + DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired); void release(DeliveryId first, DeliveryId last); void reject(DeliveryId first, DeliveryId last); diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp index 612a9fc8bc..f0ccf73189 100644 --- a/cpp/src/tests/BrokerChannelTest.cpp +++ b/cpp/src/tests/BrokerChannelTest.cpp @@ -251,7 +251,7 @@ class BrokerChannelTest : public CppUnit::TestCase policy.update(settings); store.expect(); - store.stage(*msg3); + store.stage(0, *msg3); store.test(); Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp index 011d8bf694..9487f743d6 100644 --- a/cpp/src/tests/DeliveryRecordTest.cpp +++ b/cpp/src/tests/DeliveryRecordTest.cpp @@ -51,7 +51,7 @@ public: list<DeliveryRecord> records; for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) { - records.push_back(DeliveryRecord(QueuedMessage(), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false)); + records.push_back(DeliveryRecord(QueuedMessage(0), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false)); } records.sort(); diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp index 73628f25b5..dd57736a0c 100644 --- a/cpp/src/tests/TxAckTest.cpp +++ b/cpp/src/tests/TxAckTest.cpp @@ -76,7 +76,7 @@ public: msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key"); messages.push_back(msg); - QueuedMessage qm; + QueuedMessage qm(queue.get()); qm.payload = msg; deliveries.push_back(DeliveryRecord(qm, queue, "xyz", DeliveryToken::shared_ptr(), (i+1), true)); } |