diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client/amqp0_10')
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp | 111 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h | 85 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h | 50 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 53 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 55 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h | 34 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 55 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h | 17 |
12 files changed, 415 insertions, 120 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp new file mode 100644 index 0000000000..80be5c56f3 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AcceptTracker.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +void AcceptTracker::State::accept() +{ + unconfirmed.add(unaccepted); + unaccepted.clear(); +} + +void AcceptTracker::State::release() +{ + unaccepted.clear(); +} + +uint32_t AcceptTracker::State::acceptsPending() +{ + return unconfirmed.size(); +} + +void AcceptTracker::State::completed(qpid::framing::SequenceSet& set) +{ + unconfirmed.remove(set); +} + +void AcceptTracker::delivered(const std::string& destination, const qpid::framing::SequenceNumber& id) +{ + aggregateState.unaccepted.add(id); + destinationState[destination].unaccepted.add(id); +} + +void AcceptTracker::accept(qpid::client::AsyncSession& session) +{ + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.accept(); + } + Record record; + record.status = session.messageAccept(aggregateState.unaccepted); + record.accepted = aggregateState.unaccepted; + pending.push_back(record); + aggregateState.accept(); +} + +void AcceptTracker::release(qpid::client::AsyncSession& session) +{ + session.messageRelease(aggregateState.unaccepted); + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.release(); + } + aggregateState.release(); +} + +uint32_t AcceptTracker::acceptsPending() +{ + checkPending(); + return aggregateState.acceptsPending(); +} + +uint32_t AcceptTracker::acceptsPending(const std::string& destination) +{ + checkPending(); + return destinationState[destination].acceptsPending(); +} + +void AcceptTracker::reset() +{ + destinationState.clear(); + aggregateState.unaccepted.clear(); + aggregateState.unconfirmed.clear(); + pending.clear(); +} + +void AcceptTracker::checkPending() +{ + while (!pending.empty() && pending.front().status.isComplete()) { + completed(pending.front().accepted); + pending.pop_front(); + } +} + +void AcceptTracker::completed(qpid::framing::SequenceSet& set) +{ + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.completed(set); + } + aggregateState.completed(set); +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h new file mode 100644 index 0000000000..fb58a3a8c8 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -0,0 +1,85 @@ +#ifndef QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H +#define QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Completion.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceSet.h" +#include <deque> +#include <map> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Tracks the set of messages requiring acceptance, and those for + * which an accept has been issued but is yet to be confirmed + * complete. + */ +class AcceptTracker +{ + public: + void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); + void accept(qpid::client::AsyncSession&); + void release(qpid::client::AsyncSession&); + uint32_t acceptsPending(); + uint32_t acceptsPending(const std::string& destination); + void reset(); + private: + struct State + { + /** + * ids of messages that have been delivered but not yet + * accepted + */ + qpid::framing::SequenceSet unaccepted; + /** + * ids of messages for which an accpet has been issued but not + * yet confirmed as completed + */ + qpid::framing::SequenceSet unconfirmed; + + void accept(); + void release(); + uint32_t acceptsPending(); + void completed(qpid::framing::SequenceSet&); + }; + typedef std::map<std::string, State> StateMap; + struct Record + { + qpid::client::Completion status; + qpid::framing::SequenceSet accepted; + }; + typedef std::deque<Record> Records; + + State aggregateState; + StateMap destinationState; + Records pending; + + void checkPending(); + void completed(qpid::framing::SequenceSet&); +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp deleted file mode 100644 index 52b623b65c..0000000000 --- a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "CompletionTracker.h" - -namespace qpid { -namespace client { -namespace amqp0_10 { - -using qpid::framing::SequenceNumber; - -void CompletionTracker::track(SequenceNumber command, void* token) -{ - tokens[command] = token; -} - -void CompletionTracker::completedTo(SequenceNumber command) -{ - Tokens::iterator i = tokens.lower_bound(command); - if (i != tokens.end()) { - lastCompleted = i->second; - tokens.erase(tokens.begin(), ++i); - } -} - -void* CompletionTracker::getLastCompletedToken() -{ - return lastCompleted; -} - -}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h deleted file mode 100644 index 6147c5682e..0000000000 --- a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H -#define QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/SequenceNumber.h" -#include <map> - -namespace qpid { -namespace client { -namespace amqp0_10 { - -/** - * Provides a mapping from command ids to application supplied - * 'tokens', and is used to determine when the sending or - * acknowledging of a specific message is complete. - */ -class CompletionTracker -{ - public: - void track(qpid::framing::SequenceNumber command, void* token); - void completedTo(qpid::framing::SequenceNumber command); - void* getLastCompletedToken(); - private: - typedef std::map<qpid::framing::SequenceNumber, void*> Tokens; - Tokens tokens; - void* lastCompleted; -}; -}}} // namespace qpid::client::amqp0_10 - -#endif /*!QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index b0a16674e1..d22208368b 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -81,12 +81,31 @@ struct MatchAndTrack } } }; + +struct Match +{ + const std::string destination; + uint32_t matched; + + Match(const std::string& d) : destination(d), matched(0) {} + + bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command) + { + if (command->as<MessageTransferBody>()->getDestination() == destination) { + ++matched; + return true; + } else { + return false; + } + } +}; } void IncomingMessages::setSession(qpid::client::AsyncSession s) { session = s; incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault(); + acceptTracker.reset(); } bool IncomingMessages::get(Handler& handler, Duration timeout) @@ -106,8 +125,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout) void IncomingMessages::accept() { - session.messageAccept(unaccepted); - unaccepted.clear(); + acceptTracker.accept(session); } void IncomingMessages::releaseAll() @@ -121,8 +139,7 @@ void IncomingMessages::releaseAll() GetAny handler; while (process(&handler, 0)) ; //now release all messages - session.messageRelease(unaccepted); - unaccepted.clear(); + acceptTracker.release(session); } void IncomingMessages::releasePending(const std::string& destination) @@ -166,6 +183,32 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) return false; } +uint32_t IncomingMessages::pendingAccept() +{ + return acceptTracker.acceptsPending(); +} +uint32_t IncomingMessages::pendingAccept(const std::string& destination) +{ + return acceptTracker.acceptsPending(destination); +} + +uint32_t IncomingMessages::available() +{ + //first pump all available messages from incoming to received... + while (process(0, 0)) {} + //return the count of received messages + return received.size(); +} + +uint32_t IncomingMessages::available(const std::string& destination) +{ + //first pump all available messages from incoming to received... + while (process(0, 0)) {} + + //count all messages for this destination from received list + return std::for_each(received.begin(), received.end(), Match(destination)).matched; +} + void populate(qpid::messaging::Message& message, FrameSet& command); /** @@ -180,7 +223,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m } const MessageTransferBody* transfer = command->as<MessageTransferBody>(); if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { - unaccepted.add(command->getId()); + acceptTracker.delivered(transfer->getDestination(), command->getId()); } session.markCompleted(command->getId(), false, false); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index 5e28877305..e84cd18892 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -27,6 +27,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/sys/BlockingQueue.h" #include "qpid/sys/Time.h" +#include "qpid/client/amqp0_10/AcceptTracker.h" namespace qpid { @@ -74,13 +75,19 @@ class IncomingMessages void accept(); void releaseAll(); void releasePending(const std::string& destination); + + uint32_t pendingAccept(); + uint32_t pendingAccept(const std::string& destination); + + uint32_t available(); + uint32_t available(const std::string& destination); private: typedef std::deque<FrameSetPtr> FrameSetQueue; qpid::client::AsyncSession session; - qpid::framing::SequenceSet unaccepted; boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming; FrameSetQueue received; + AcceptTracker acceptTracker; bool process(Handler*, qpid::sys::Duration); void retrieve(FrameSetPtr, qpid::messaging::Message*); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 31efff38a6..da91c4a160 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -120,6 +120,21 @@ qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; const std::string& ReceiverImpl::getName() const { return destination; } +uint32_t ReceiverImpl::getCapacity() +{ + return capacity; +} + +uint32_t ReceiverImpl::available() +{ + return parent.available(destination); +} + +uint32_t ReceiverImpl::pendingAck() +{ + return parent.pendingAck(destination); +} + ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, const qpid::messaging::Address& a, const qpid::messaging::Filter* f, diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index 509c784513..b941348fc8 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -62,6 +62,9 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl void stop(); const std::string& getName() const; void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t available(); + uint32_t pendingAck(); void setListener(qpid::messaging::MessageListener* listener); qpid::messaging::MessageListener* getListener(); void received(qpid::messaging::Message& message); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index c619d1226a..4cd2dc0521 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -32,11 +32,12 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, const qpid::messaging::Address& _address, const qpid::messaging::Variant::Map& _options) : parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED), - capacity(50), window(0) {} + capacity(50), window(0), flushed(false) {} -void SenderImpl::send(const qpid::messaging::Message& m) +void SenderImpl::send(const qpid::messaging::Message& message) { - execute1<Send>(&m); + Send f(*this, &message); + while (f.repeat) parent.execute(f); } void SenderImpl::cancel() @@ -44,6 +45,20 @@ void SenderImpl::cancel() execute<Cancel>(); } +void SenderImpl::setCapacity(uint32_t c) +{ + bool flush = c < capacity; + capacity = c; + execute1<CheckPendingSends>(flush); +} +uint32_t SenderImpl::getCapacity() { return capacity; } +uint32_t SenderImpl::pending() +{ + CheckPendingSends f(*this, false); + parent.execute(f); + return f.pending; +} + void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { session = s; @@ -60,18 +75,31 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) } } +void SenderImpl::waitForCapacity() +{ + //TODO: add option to throw exception rather than blocking? + if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) { + //Initial implementation is very basic. As outgoing is + //currently only reduced on receiving completions and we are + //blocking anyway we may as well sync(). If successful that + //should clear all outstanding sends. + session.sync(); + checkPendingSends(false); + } + //flush periodically and check for conmpleted sends + if (++window > (capacity / 4)) {//TODO: make this configurable? + checkPendingSends(true); + window = 0; + } +} + void SenderImpl::sendImpl(const qpid::messaging::Message& m) { - //TODO: make recoding for replay optional + //TODO: make recording for replay optional (would still want to track completion however) std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); msg->convert(m); outgoing.push_back(msg.release()); sink->send(session, name, outgoing.back()); - if (++window > (capacity / 2)) {//TODO: make this configurable? - session.flush(); - checkPendingSends(); - window = 0; - } } void SenderImpl::replay() @@ -81,11 +109,18 @@ void SenderImpl::replay() } } -void SenderImpl::checkPendingSends() +uint32_t SenderImpl::checkPendingSends(bool flush) { + if (flush) { + session.flush(); + flushed = true; + } else { + flushed = false; + } while (!outgoing.empty() && outgoing.front().status.isComplete()) { outgoing.pop_front(); } + return outgoing.size(); } void SenderImpl::cancelImpl() diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h index 4ba793d71c..028d26bda7 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -51,6 +51,9 @@ class SenderImpl : public qpid::messaging::SenderImpl const qpid::messaging::Variant::Map& options); void send(const qpid::messaging::Message&); void cancel(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t pending(); void init(qpid::client::AsyncSession, AddressResolution&); private: @@ -69,14 +72,17 @@ class SenderImpl : public qpid::messaging::SenderImpl OutgoingMessages outgoing; uint32_t capacity; uint32_t window; + bool flushed; - void checkPendingSends(); + uint32_t checkPendingSends(bool flush); void replay(); + void waitForCapacity(); //logic for application visible methods: void sendImpl(const qpid::messaging::Message&); void cancelImpl(); + //functors for application visible methods (allowing locking and //retry to be centralised): struct Command @@ -89,9 +95,17 @@ class SenderImpl : public qpid::messaging::SenderImpl struct Send : Command { const qpid::messaging::Message* message; - - Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} - void operator()() { impl.sendImpl(*message); } + bool repeat; + + Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {} + void operator()() + { + impl.waitForCapacity(); + //from this point message will be recorded if there is any + //failure (and replayed) so need not repeat the call + repeat = false; + impl.sendImpl(*message); + } }; struct Cancel : Command @@ -100,6 +114,14 @@ class SenderImpl : public qpid::messaging::SenderImpl void operator()() { impl.cancelImpl(); } }; + struct CheckPendingSends : Command + { + bool flush; + uint32_t pending; + CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {} + void operator()() { pending = impl.checkPendingSends(flush); } + }; + //helper templates for some common patterns template <class F> void execute() { @@ -107,10 +129,10 @@ class SenderImpl : public qpid::messaging::SenderImpl parent.execute(f); } - template <class F, class P> void execute1(P p) + template <class F, class P> bool execute1(P p) { F f(*this, p); - parent.execute(f); + return parent.execute(f); } }; }}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 0e6c430d89..bc6289d84b 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -298,6 +298,61 @@ bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration t } } +uint32_t SessionImpl::available() +{ + return get1<Available, uint32_t>((const std::string*) 0); +} +uint32_t SessionImpl::available(const std::string& destination) +{ + return get1<Available, uint32_t>(&destination); +} + +struct SessionImpl::Available : Command +{ + const std::string* destination; + uint32_t result; + + Available(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} + void operator()() { result = impl.availableImpl(destination); } +}; + +uint32_t SessionImpl::availableImpl(const std::string* destination) +{ + if (destination) { + return incoming.available(*destination); + } else { + return incoming.available(); + } +} + +uint32_t SessionImpl::pendingAck() +{ + return get1<PendingAck, uint32_t>((const std::string*) 0); +} + +uint32_t SessionImpl::pendingAck(const std::string& destination) +{ + return get1<PendingAck, uint32_t>(&destination); +} + +struct SessionImpl::PendingAck : Command +{ + const std::string* destination; + uint32_t result; + + PendingAck(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} + void operator()() { result = impl.pendingAckImpl(destination); } +}; + +uint32_t SessionImpl::pendingAckImpl(const std::string* destination) +{ + if (destination) { + return incoming.pendingAccept(*destination); + } else { + return incoming.pendingAccept(); + } +} + void SessionImpl::syncImpl() { session.sync(); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 1c7db17bbb..b453f3f08f 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -83,6 +83,12 @@ class SessionImpl : public qpid::messaging::SessionImpl void receiverCancelled(const std::string& name); void senderCancelled(const std::string& name); + uint32_t available(); + uint32_t available(const std::string& destination); + + uint32_t pendingAck(); + uint32_t pendingAck(const std::string& destination); + void setSession(qpid::client::Session); template <class T> bool execute(T& f) @@ -128,6 +134,8 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address, const qpid::messaging::Filter* filter, const qpid::messaging::VariantMap& options); + uint32_t availableImpl(const std::string* destination); + uint32_t pendingAckImpl(const std::string* destination); //functors for public facing methods (allows locking and retry //logic to be centralised) @@ -178,6 +186,8 @@ class SessionImpl : public qpid::messaging::SessionImpl struct CreateSender; struct CreateReceiver; + struct PendingAck; + struct Available; //helper templates for some common patterns template <class F> bool execute() @@ -196,6 +206,13 @@ class SessionImpl : public qpid::messaging::SessionImpl F f(*this, p); return execute(f); } + + template <class F, class R, class P> R get1(P p) + { + F f(*this, p); + while (!execute(f)) {} + return f.result; + } }; }}} // namespace qpid::client::amqp0_10 |