summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-23 16:21:56 +0000
committerAlan Conway <aconway@apache.org>2008-10-23 16:21:56 +0000
commit9d6e5ee84b3c3a53dfe78d3b5b74986495e7abee (patch)
tree257bed543782bbb7ca80a411a6d26b04a9b0784d /cpp/src
parent1b127dfaac12835181f61637fb751380aff78e7e (diff)
downloadqpid-python-9d6e5ee84b3c3a53dfe78d3b5b74986495e7abee.tar.gz
Minor changes to provide access for cluster to replicate delivery records.
- broker::Queue: find message by position, set position. - broker::SemanticState: make record() public, add eachUnacked(), fix typo "NotifyEnabld" - broker::DeliveryRecord: added more public accessors git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707406 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp11
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h26
-rw-r--r--cpp/src/qpid/broker/Queue.cpp27
-rw-r--r--cpp/src/qpid/broker/Queue.h16
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.h12
-rw-r--r--cpp/src/qpid/client/FlowControl.h2
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp2
-rw-r--r--cpp/src/qpid/log/Selector.h2
9 files changed, 77 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 1d6c60b569..65c1f0a1fa 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -31,17 +31,16 @@ using namespace qpid::framing;
using std::string;
DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
- Queue::shared_ptr _queue,
- const std::string _tag,
- bool _acquired, bool accepted,
+ const Queue::shared_ptr& _queue,
+ const std::string& _tag,
+ bool _acquired,
+ bool accepted,
bool _windowing) : msg(_msg),
queue(_queue),
tag(_tag),
acquired(_acquired),
acceptExpected(!accepted),
cancelled(false),
- credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
- size(msg.payload ? msg.payload->contentSize() : 0),
completed(false),
ended(accepted),
windowing(_windowing)
@@ -154,7 +153,7 @@ void DeliveryRecord::reject()
uint32_t DeliveryRecord::getCredit() const
{
- return credit;
+ return msg.payload ? msg.payload->getRequiredCredit() : 0;
}
void DeliveryRecord::acquire(DeliveryIds& results) {
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index d631fe124c..6be6a9249a 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -46,17 +46,21 @@ class DeliveryRecord{
bool acquired;
bool acceptExpected;
bool cancelled;
- const uint32_t credit;
- const uint64_t size;
bool completed;
bool ended;
const bool windowing;
public:
- DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue,
- const std::string tag,
- bool acquired, bool confirmed, bool windowing);
+ DeliveryRecord(
+ const QueuedMessage& msg,
+ const Queue::shared_ptr& queue,
+ const std::string& tag,
+ bool acquired,
+ bool accepted,
+ bool windowing
+ );
+
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
@@ -76,13 +80,21 @@ class DeliveryRecord{
bool isAcquired() const { return acquired; }
bool isComplete() const { return completed; }
bool isRedundant() const { return ended && (!windowing || completed); }
-
+ bool isCancelled() const { return cancelled; }
+ bool isAccepted() const { return !acceptExpected; }
+ bool isEnded() const { return ended; }
+ bool isWindowing() const { return windowing; }
+
uint32_t getCredit() const;
- const std::string& getTag() const { return tag; }
+ const std::string& getTag() const { return tag; }
void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
void setId(DeliveryId _id) { id = _id; }
+ const QueuedMessage& getMessage() const { return msg; }
+ framing::SequenceNumber getId() const { return id; }
+ Queue::shared_ptr getQueue() const { return queue; }
+
friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 1f508a1cc7..52404c826c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -365,6 +365,25 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
return false;
}
+namespace {
+struct PositionEquals {
+ SequenceNumber pos;
+ PositionEquals(SequenceNumber p) : pos(p) {}
+ bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
+};
+}// namespace
+
+bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const {
+ Mutex::ScopedLock locker(messageLock);
+ Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
+ if (i == messages.end())
+ return false;
+ else {
+ msg = *i;
+ return true;
+ }
+}
+
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
@@ -827,3 +846,11 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
return status;
}
+
+void Queue::setPosition(SequenceNumber n) {
+ if (n <= sequence)
+ throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " < " << sequence
+ << " for queue " << name));
+ sequence = n;
+ --sequence; // Decrement so ++sequence will return n.
+}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index d9af63d3d9..6becb77ff5 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -66,7 +66,7 @@ namespace qpid {
typedef std::list<Consumer::shared_ptr> Listeners;
typedef std::deque<QueuedMessage> Messages;
- typedef std::map<string,QueuedMessage*> LVQ;
+ typedef std::map<string,QueuedMessage*> LVQ;
const string name;
const bool autodelete;
@@ -95,7 +95,7 @@ namespace qpid {
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
- RateTracker dequeueTracker;
+RateTracker dequeueTracker;
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -227,6 +227,13 @@ namespace qpid {
*/
QueuedMessage get();
+ /** Get the message at position pos
+ *@param msg out parameter, assigned to the message found.
+ *@param pos position to search for.
+ *@return True if there is a message at pos, false otherwise.
+ */
+ bool find(QueuedMessage& msg, framing::SequenceNumber pos) const;
+
const QueuePolicy* getPolicy();
void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
@@ -264,6 +271,11 @@ namespace qpid {
void popMsg(QueuedMessage& qmsg);
+ /** Set the position sequence number for the next message on the queue.
+ * Must be >= the current sequence number.
+ * Used by cluster to replicate queues.
+ */
+ void setPosition(framing::SequenceNumber pos);
};
}
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 915b7e147c..42ef8030a6 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -610,7 +610,7 @@ void SemanticState::ConsumerImpl::disableNotify()
notifyEnabled = false;
}
-bool SemanticState::ConsumerImpl::isNotifyEnabld() {
+bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
Mutex::ScopedLock l(lock);
return notifyEnabled;
}
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 866ef4c209..dbb3e1d3b6 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -62,7 +62,7 @@ class SemanticState : public sys::OutputTask,
class ConsumerImpl : public Consumer, public sys::OutputTask,
public boost::enable_shared_from_this<ConsumerImpl>
{
- qpid::sys::Mutex lock;
+ mutable qpid::sys::Mutex lock;
SemanticState* const parent;
const string name;
const Queue::shared_ptr queue;
@@ -97,7 +97,7 @@ class SemanticState : public sys::OutputTask,
void disableNotify();
void enableNotify();
void notify();
- bool isNotifyEnabld();
+ bool isNotifyEnabled() const;
void setWindowMode();
void setCreditMode();
@@ -106,7 +106,7 @@ class SemanticState : public sys::OutputTask,
void flush();
void stop();
void complete(DeliveryRecord&);
- Queue::shared_ptr getQueue() { return queue; }
+ Queue::shared_ptr getQueue() const { return queue; }
bool isBlocked() const { return blocked; }
bool setBlocked(bool set) { std::swap(set, blocked); return set; }
@@ -147,7 +147,6 @@ class SemanticState : public sys::OutputTask,
const string userID;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
- void record(const DeliveryRecord& delivery);
void checkDtxTimeout();
void complete(DeliveryRecord&);
@@ -213,8 +212,13 @@ class SemanticState : public sys::OutputTask,
void attached();
void detached();
+ // Used by cluster to re-create replica sessions
static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); }
+
template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
+ template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(), unacked.end(), f); }
+
+ void record(const DeliveryRecord& delivery);
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/client/FlowControl.h b/cpp/src/qpid/client/FlowControl.h
index a4ed9879f4..081061ac02 100644
--- a/cpp/src/qpid/client/FlowControl.h
+++ b/cpp/src/qpid/client/FlowControl.h
@@ -27,7 +27,7 @@ namespace client {
/**
* Flow control works by associating a finite amount of "credit"
- * associated with a subscription.
+ * with a subscription.
*
* Credit includes a message count and a byte count. Each message
* received decreases the message count by one, and the byte count by
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 4bc001b4c6..802019feb1 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -241,7 +241,7 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
ProtocolVersion(),
ci->getName(),
ci->isBlocked(),
- ci->isNotifyEnabld()
+ ci->isNotifyEnabled()
);
client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
diff --git a/cpp/src/qpid/log/Selector.h b/cpp/src/qpid/log/Selector.h
index 2acef4687a..89989ebf92 100644
--- a/cpp/src/qpid/log/Selector.h
+++ b/cpp/src/qpid/log/Selector.h
@@ -56,7 +56,7 @@ class Selector {
/** Enable based on a 'level[+]:file' string */
void enable(const std::string& enableStr);
- /** True if level is enabld for file. */
+ /** True if level is enabled for file. */
bool isEnabled(Level level, const std::string& function);
private: