diff options
author | Alan Conway <aconway@apache.org> | 2008-10-16 21:05:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-10-16 21:05:45 +0000 |
commit | 8245f06869fce0b75e04aba5a1827d0a165fa19d (patch) | |
tree | 6dc9eba4bf558900105489edd6e2129c87c88501 | |
parent | de9dd5021a7837355798d1a7d5e8e01a878dc41a (diff) | |
download | qpid-python-8245f06869fce0b75e04aba5a1827d0a165fa19d.tar.gz |
Added missing message.subscribe arguments to SemanticState::ConsumerImpl for replication (and future use.)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@705359 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 30 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/DumpClient.cpp | 11 |
4 files changed, 45 insertions, 27 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 26aea36b8a..177157bbb6 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -86,11 +86,11 @@ bool SemanticState::exists(const string& consumerTag){ void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool nolocal, bool ackRequired, bool acquire, - bool exclusive, const FieldTable*) + bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire)); + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire, exclusive, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception outputTasks.addOutputTask(c.get()); consumers[tagInOut] = c; @@ -233,13 +233,19 @@ void SemanticState::record(const DeliveryRecord& delivery) } SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - DeliveryToken::shared_ptr _token, - const string& _name, - Queue::shared_ptr _queue, - bool ack, - bool _nolocal, - bool _acquire - ) : + DeliveryToken::shared_ptr _token, + const string& _name, + Queue::shared_ptr _queue, + bool ack, + bool _nolocal, + bool _acquire, + bool _exclusive, + const string& _resumeId, + uint64_t _resumeTtl, + const framing::FieldTable& _arguments + + +) : Consumer(_acquire), parent(_parent), token(_token), @@ -249,7 +255,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, nolocal(_nolocal), acquire(_acquire), blocked(true), - windowing(true), + windowing(true), + exclusive(_exclusive), + resumeId(_resumeId), + resumeTtl(_resumeTtl), + arguments(_arguments), msgCredit(0), byteCredit(0), notifyEnabled(true) {} diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 4d23220692..7b72921210 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -73,6 +73,10 @@ class SemanticState : public sys::OutputTask, const bool acquire; bool blocked; bool windowing; + bool exclusive; + string resumeId; + uint64_t resumeTtl; + framing::FieldTable arguments; uint32_t msgCredit; uint32_t byteCredit; bool notifyEnabled; @@ -85,7 +89,8 @@ class SemanticState : public sys::OutputTask, ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, const string& name, Queue::shared_ptr queue, - bool ack, bool nolocal, bool acquire); + bool ack, bool nolocal, bool acquire, bool exclusive, + const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); ~ConsumerImpl(); OwnershipToken* getSession(); bool deliver(QueuedMessage& msg); @@ -114,8 +119,12 @@ class SemanticState : public sys::OutputTask, bool isAckExpected() const { return ackExpected; } bool isAcquire() const { return acquire; } bool isWindowing() const { return windowing; } + bool isExclusive() const { return exclusive; } uint32_t getMsgCredit() const { return msgCredit; } uint32_t getByteCredit() const { return byteCredit; } + std::string getResumeId() const { return resumeId; }; + uint64_t getResumeTtl() const { return resumeTtl; } + const framing::FieldTable& getArguments() const { return arguments; } }; private: @@ -168,7 +177,9 @@ class SemanticState : public sys::OutputTask, *@param tagInOut - if empty it is updated with the generated token. */ void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, - bool nolocal, bool ackRequired, bool acquire, bool exclusive, const framing::FieldTable* = 0); + bool nolocal, bool ackRequired, bool acquire, bool exclusive, + const string& resumeId=string(), uint64_t resumeTtl=0, + const framing::FieldTable& = framing::FieldTable()); void cancel(const string& tag); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 9c23abeba9..e09c94398b 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -457,13 +457,13 @@ void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, b void SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, - const string& destination, - uint8_t acceptMode, - uint8_t acquireMode, - bool exclusive, - const string& /*resumeId*/,//TODO implement resume behaviour. Need to update cluster. - uint64_t /*resumeTtl*/, - const FieldTable& arguments) + const string& destination, + uint8_t acceptMode, + uint8_t acquireMode, + bool exclusive, + const string& resumeId, + uint64_t resumeTtl, + const FieldTable& arguments) { AclModule* acl = getBroker().getAcl(); @@ -481,7 +481,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, string tag = destination; state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode), tag, queue, false, //TODO get rid of no-local - acceptMode == 0, acquireMode == 0, exclusive, &arguments); + acceptMode == 0, acquireMode == 0, exclusive, resumeId, resumeTtl, arguments); ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); if (agent) diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.cpp b/qpid/cpp/src/qpid/cluster/DumpClient.cpp index 853eb689c6..58aa14655c 100644 --- a/qpid/cpp/src/qpid/cluster/DumpClient.cpp +++ b/qpid/cpp/src/qpid/cluster/DumpClient.cpp @@ -219,13 +219,10 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) { arg::destination = ci->getName(), arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, - arg::exclusive = false , // FIXME aconway 2008-09-23: duplicate from consumer - - // TODO aconway 2008-09-23: remaining args not used by current broker. - // Update this code when they are. - arg::resumeId=std::string(), - arg::resumeTtl=0, - arg::arguments=FieldTable() + arg::exclusive = ci->isExclusive(), + arg::resumeId = ci->getResumeId(), + arg::resumeTtl = ci->getResumeTtl(), + arg::arguments = ci->getArguments() ); shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); |