diff options
author | Alan Conway <aconway@apache.org> | 2008-10-23 16:21:56 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-23 16:21:56 +0000 |
commit | 9d6e5ee84b3c3a53dfe78d3b5b74986495e7abee (patch) | |
tree | 257bed543782bbb7ca80a411a6d26b04a9b0784d /cpp/src | |
parent | 1b127dfaac12835181f61637fb751380aff78e7e (diff) | |
download | qpid-python-9d6e5ee84b3c3a53dfe78d3b5b74986495e7abee.tar.gz |
Minor changes to provide access for cluster to replicate delivery records.
- broker::Queue: find message by position, set position.
- broker::SemanticState: make record() public, add eachUnacked(), fix typo "NotifyEnabld"
- broker::DeliveryRecord: added more public accessors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707406 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 27 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 12 | ||||
-rw-r--r-- | cpp/src/qpid/client/FlowControl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/log/Selector.h | 2 |
9 files changed, 77 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 1d6c60b569..65c1f0a1fa 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -31,17 +31,16 @@ using namespace qpid::framing; using std::string; DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, - Queue::shared_ptr _queue, - const std::string _tag, - bool _acquired, bool accepted, + const Queue::shared_ptr& _queue, + const std::string& _tag, + bool _acquired, + bool accepted, bool _windowing) : msg(_msg), queue(_queue), tag(_tag), acquired(_acquired), acceptExpected(!accepted), cancelled(false), - credit(msg.payload ? msg.payload->getRequiredCredit() : 0), - size(msg.payload ? msg.payload->contentSize() : 0), completed(false), ended(accepted), windowing(_windowing) @@ -154,7 +153,7 @@ void DeliveryRecord::reject() uint32_t DeliveryRecord::getCredit() const { - return credit; + return msg.payload ? msg.payload->getRequiredCredit() : 0; } void DeliveryRecord::acquire(DeliveryIds& results) { diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index d631fe124c..6be6a9249a 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -46,17 +46,21 @@ class DeliveryRecord{ bool acquired; bool acceptExpected; bool cancelled; - const uint32_t credit; - const uint64_t size; bool completed; bool ended; const bool windowing; public: - DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, - const std::string tag, - bool acquired, bool confirmed, bool windowing); + DeliveryRecord( + const QueuedMessage& msg, + const Queue::shared_ptr& queue, + const std::string& tag, + bool acquired, + bool accepted, + bool windowing + ); + bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; @@ -76,13 +80,21 @@ class DeliveryRecord{ bool isAcquired() const { return acquired; } bool isComplete() const { return completed; } bool isRedundant() const { return ended && (!windowing || completed); } - + bool isCancelled() const { return cancelled; } + bool isAccepted() const { return !acceptExpected; } + bool isEnded() const { return ended; } + bool isWindowing() const { return windowing; } + uint32_t getCredit() const; - const std::string& getTag() const { return tag; } + const std::string& getTag() const { return tag; } void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize); void setId(DeliveryId _id) { id = _id; } + const QueuedMessage& getMessage() const { return msg; } + framing::SequenceNumber getId() const { return id; } + Queue::shared_ptr getQueue() const { return queue; } + friend bool operator<(const DeliveryRecord&, const DeliveryRecord&); friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 1f508a1cc7..52404c826c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -365,6 +365,25 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { return false; } +namespace { +struct PositionEquals { + SequenceNumber pos; + PositionEquals(SequenceNumber p) : pos(p) {} + bool operator()(const QueuedMessage& msg) const { return msg.position == pos; } +}; +}// namespace + +bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const { + Mutex::ScopedLock locker(messageLock); + Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos)); + if (i == messages.end()) + return false; + else { + msg = *i; + return true; + } +} + void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ Mutex::ScopedLock locker(consumerLock); if(exclusive) { @@ -827,3 +846,11 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str return status; } + +void Queue::setPosition(SequenceNumber n) { + if (n <= sequence) + throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " < " << sequence + << " for queue " << name)); + sequence = n; + --sequence; // Decrement so ++sequence will return n. +} diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index d9af63d3d9..6becb77ff5 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -66,7 +66,7 @@ namespace qpid { typedef std::list<Consumer::shared_ptr> Listeners; typedef std::deque<QueuedMessage> Messages; - typedef std::map<string,QueuedMessage*> LVQ; + typedef std::map<string,QueuedMessage*> LVQ; const string name; const bool autodelete; @@ -95,7 +95,7 @@ namespace qpid { boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; - RateTracker dequeueTracker; +RateTracker dequeueTracker; void push(boost::intrusive_ptr<Message>& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -227,6 +227,13 @@ namespace qpid { */ QueuedMessage get(); + /** Get the message at position pos + *@param msg out parameter, assigned to the message found. + *@param pos position to search for. + *@return True if there is a message at pos, false otherwise. + */ + bool find(QueuedMessage& msg, framing::SequenceNumber pos) const; + const QueuePolicy* getPolicy(); void setAlternateExchange(boost::shared_ptr<Exchange> exchange); @@ -264,6 +271,11 @@ namespace qpid { void popMsg(QueuedMessage& qmsg); + /** Set the position sequence number for the next message on the queue. + * Must be >= the current sequence number. + * Used by cluster to replicate queues. + */ + void setPosition(framing::SequenceNumber pos); }; } } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 915b7e147c..42ef8030a6 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -610,7 +610,7 @@ void SemanticState::ConsumerImpl::disableNotify() notifyEnabled = false; } -bool SemanticState::ConsumerImpl::isNotifyEnabld() { +bool SemanticState::ConsumerImpl::isNotifyEnabled() const { Mutex::ScopedLock l(lock); return notifyEnabled; } diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 866ef4c209..dbb3e1d3b6 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -62,7 +62,7 @@ class SemanticState : public sys::OutputTask, class ConsumerImpl : public Consumer, public sys::OutputTask, public boost::enable_shared_from_this<ConsumerImpl> { - qpid::sys::Mutex lock; + mutable qpid::sys::Mutex lock; SemanticState* const parent; const string name; const Queue::shared_ptr queue; @@ -97,7 +97,7 @@ class SemanticState : public sys::OutputTask, void disableNotify(); void enableNotify(); void notify(); - bool isNotifyEnabld(); + bool isNotifyEnabled() const; void setWindowMode(); void setCreditMode(); @@ -106,7 +106,7 @@ class SemanticState : public sys::OutputTask, void flush(); void stop(); void complete(DeliveryRecord&); - Queue::shared_ptr getQueue() { return queue; } + Queue::shared_ptr getQueue() const { return queue; } bool isBlocked() const { return blocked; } bool setBlocked(bool set) { std::swap(set, blocked); return set; } @@ -147,7 +147,6 @@ class SemanticState : public sys::OutputTask, const string userID; void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy); - void record(const DeliveryRecord& delivery); void checkDtxTimeout(); void complete(DeliveryRecord&); @@ -213,8 +212,13 @@ class SemanticState : public sys::OutputTask, void attached(); void detached(); + // Used by cluster to re-create replica sessions static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); } + template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); } + template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(), unacked.end(), f); } + + void record(const DeliveryRecord& delivery); }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/client/FlowControl.h b/cpp/src/qpid/client/FlowControl.h index a4ed9879f4..081061ac02 100644 --- a/cpp/src/qpid/client/FlowControl.h +++ b/cpp/src/qpid/client/FlowControl.h @@ -27,7 +27,7 @@ namespace client { /** * Flow control works by associating a finite amount of "credit" - * associated with a subscription. + * with a subscription. * * Credit includes a message count and a byte count. Each message * received decreases the message count by one, and the byte count by diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 4bc001b4c6..802019feb1 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -241,7 +241,7 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { ProtocolVersion(), ci->getName(), ci->isBlocked(), - ci->isNotifyEnabld() + ci->isNotifyEnabled() ); client::SessionBase_0_10Access(shadowSession).get()->send(state); QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); diff --git a/cpp/src/qpid/log/Selector.h b/cpp/src/qpid/log/Selector.h index 2acef4687a..89989ebf92 100644 --- a/cpp/src/qpid/log/Selector.h +++ b/cpp/src/qpid/log/Selector.h @@ -56,7 +56,7 @@ class Selector { /** Enable based on a 'level[+]:file' string */ void enable(const std::string& enableStr); - /** True if level is enabld for file. */ + /** True if level is enabled for file. */ bool isEnabled(Level level, const std::string& function); private: |