diff options
author | Gordon Sim <gsim@apache.org> | 2009-09-14 10:21:49 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2009-09-14 10:21:49 +0000 |
commit | 1dc1518686b9829d5bb60deb985b4bef74ff786b (patch) | |
tree | 6683e7adebd4783a71b3dde43ccfedd7d207021b /qpid/cpp | |
parent | 251066fa400f831c7c1d231ff9ff95d4056bd004 (diff) | |
download | qpid-python-1dc1518686b9829d5bb60deb985b4bef74ff786b.tar.gz |
Added available and pendingAck properties to Receiver; added capacity and pending properties to Sender.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@814562 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
25 files changed, 569 insertions, 149 deletions
diff --git a/qpid/cpp/include/qpid/messaging/Receiver.h b/qpid/cpp/include/qpid/messaging/Receiver.h index e51e1093d1..a4fdd7a34b 100644 --- a/qpid/cpp/include/qpid/messaging/Receiver.h +++ b/qpid/cpp/include/qpid/messaging/Receiver.h @@ -40,7 +40,7 @@ class MessageListener; class ReceiverImpl; /** - * A pull style interface for message retrieval. + * Interface through which messages are received. */ class Receiver : public qpid::client::Handle<ReceiverImpl> { @@ -75,7 +75,7 @@ class Receiver : public qpid::client::Handle<ReceiverImpl> QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); /** * Retrieves a message for this receivers subscription or waits - * for upto the specified timeout for one to become + * for up to the specified timeout for one to become * available. Unlike get() this method will check with the server * that there is no message for the subscription this receiver is * serving before throwing an exception. @@ -87,8 +87,8 @@ class Receiver : public qpid::client::Handle<ReceiverImpl> */ QPID_CLIENT_EXTERN void start(); /** - * Stops the message flow for this receiver (without actually - * cancelling the subscription). + * Stops the message flow for this receiver (but does not cancel + * the subscription). */ QPID_CLIENT_EXTERN void stop(); /** @@ -97,14 +97,35 @@ class Receiver : public qpid::client::Handle<ReceiverImpl> * requested by a client via fetch() (or pushed to a listener). */ QPID_CLIENT_EXTERN void setCapacity(uint32_t); + /** + * Returns the capacity of the receiver. The capacity determines + * how many incoming messages can be held in the receiver before + * being requested by a client via fetch() (or pushed to a + * listener). + */ + QPID_CLIENT_EXTERN uint32_t getCapacity(); + /** + * Returns the number of messages received and waiting to be + * fetched. + */ + QPID_CLIENT_EXTERN uint32_t available(); + /** + * Returns a count of the number of messages received on this + * receiver that have been acknowledged, but for which that + * acknowledgement has not yet been confirmed as processed by the + * server. + */ + QPID_CLIENT_EXTERN uint32_t pendingAck(); /** - * Cancels this receiver + * Cancels this receiver. */ QPID_CLIENT_EXTERN void cancel(); /** - * Set a message listener for receiving messages asynchronously. + * Set a message listener for this receiver. + * + * @see Session::dispatch() */ QPID_CLIENT_EXTERN void setListener(MessageListener* listener); private: diff --git a/qpid/cpp/include/qpid/messaging/Sender.h b/qpid/cpp/include/qpid/messaging/Sender.h index 45ec659ecf..9b83a04d60 100644 --- a/qpid/cpp/include/qpid/messaging/Sender.h +++ b/qpid/cpp/include/qpid/messaging/Sender.h @@ -23,6 +23,7 @@ */ #include "qpid/client/ClientImportExport.h" #include "qpid/client/Handle.h" +#include "qpid/sys/IntegerTypes.h" namespace qpid { namespace client { @@ -49,6 +50,24 @@ class Sender : public qpid::client::Handle<SenderImpl> QPID_CLIENT_EXTERN void send(const Message& message); QPID_CLIENT_EXTERN void cancel(); + + /** + * Sets the capacity for the sender. The capacity determines how + * many outgoing messages can be held pending confirmation of + * receipt by the broker. + */ + QPID_CLIENT_EXTERN void setCapacity(uint32_t); + /** + * Returns the capacity of the sender. + * @see setCapacity + */ + QPID_CLIENT_EXTERN uint32_t getCapacity(); + /** + * Returns the number of sent messages pending confirmation of + * receipt by the broker. (These are the 'in-doubt' messages). + */ + QPID_CLIENT_EXTERN uint32_t pending(); + private: friend class qpid::client::PrivateImplRef<Sender>; }; diff --git a/qpid/cpp/include/qpid/messaging/Session.h b/qpid/cpp/include/qpid/messaging/Session.h index 1d88882db6..979e27adae 100644 --- a/qpid/cpp/include/qpid/messaging/Session.h +++ b/qpid/cpp/include/qpid/messaging/Session.h @@ -75,6 +75,17 @@ class Session : public qpid::client::Handle<SessionImpl> QPID_CLIENT_EXTERN void sync(); QPID_CLIENT_EXTERN void flush(); + /** + * Returns the number of messages received and waiting to be + * fetched. + */ + QPID_CLIENT_EXTERN uint32_t available(); + /** + * Returns a count of the number of messages received this session + * that have been acknowledged, but for which that acknowledgement + * has not yet been confirmed as processed by the server. + */ + QPID_CLIENT_EXTERN uint32_t pendingAck(); QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); QPID_CLIENT_EXTERN bool dispatch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); @@ -88,9 +99,6 @@ class Session : public qpid::client::Handle<SessionImpl> QPID_CLIENT_EXTERN Receiver createReceiver(const std::string& address, const Filter& filter, const VariantMap& options = VariantMap()); QPID_CLIENT_EXTERN Address createTempQueue(const std::string& baseName = std::string()); - - QPID_CLIENT_EXTERN void* getLastConfirmedSent(); - QPID_CLIENT_EXTERN void* getLastConfirmedAcknowledged(); private: friend class qpid::client::PrivateImplRef<Session>; }; diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index ff0188890b..786facced9 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -537,12 +537,12 @@ set (libqpidclient_SOURCES qpid/messaging/Sender.cpp qpid/messaging/SenderImpl.h qpid/messaging/Variant.cpp + qpid/client/amqp0_10/AcceptTracker.h + qpid/client/amqp0_10/AcceptTracker.cpp qpid/client/amqp0_10/AddressResolution.h qpid/client/amqp0_10/AddressResolution.cpp qpid/client/amqp0_10/Codecs.cpp qpid/client/amqp0_10/CodecsInternal.h - qpid/client/amqp0_10/CompletionTracker.h - qpid/client/amqp0_10/CompletionTracker.cpp qpid/client/amqp0_10/ConnectionImpl.h qpid/client/amqp0_10/ConnectionImpl.cpp qpid/client/amqp0_10/IncomingMessages.h diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index aa074560d9..daae675749 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -698,14 +698,14 @@ libqpidclient_la_SOURCES = \ qpid/messaging/SenderImpl.h \ qpid/messaging/ReceiverImpl.h \ qpid/messaging/SessionImpl.h \ + qpid/client/amqp0_10/AcceptTracker.h \ + qpid/client/amqp0_10/AcceptTracker.cpp \ qpid/client/amqp0_10/AddressResolution.h \ qpid/client/amqp0_10/AddressResolution.cpp \ qpid/client/amqp0_10/Codecs.cpp \ qpid/client/amqp0_10/CodecsInternal.h \ qpid/client/amqp0_10/ConnectionImpl.h \ qpid/client/amqp0_10/ConnectionImpl.cpp \ - qpid/client/amqp0_10/CompletionTracker.h \ - qpid/client/amqp0_10/CompletionTracker.cpp \ qpid/client/amqp0_10/IncomingMessages.h \ qpid/client/amqp0_10/IncomingMessages.cpp \ qpid/client/amqp0_10/MessageSink.h \ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp new file mode 100644 index 0000000000..80be5c56f3 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AcceptTracker.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +void AcceptTracker::State::accept() +{ + unconfirmed.add(unaccepted); + unaccepted.clear(); +} + +void AcceptTracker::State::release() +{ + unaccepted.clear(); +} + +uint32_t AcceptTracker::State::acceptsPending() +{ + return unconfirmed.size(); +} + +void AcceptTracker::State::completed(qpid::framing::SequenceSet& set) +{ + unconfirmed.remove(set); +} + +void AcceptTracker::delivered(const std::string& destination, const qpid::framing::SequenceNumber& id) +{ + aggregateState.unaccepted.add(id); + destinationState[destination].unaccepted.add(id); +} + +void AcceptTracker::accept(qpid::client::AsyncSession& session) +{ + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.accept(); + } + Record record; + record.status = session.messageAccept(aggregateState.unaccepted); + record.accepted = aggregateState.unaccepted; + pending.push_back(record); + aggregateState.accept(); +} + +void AcceptTracker::release(qpid::client::AsyncSession& session) +{ + session.messageRelease(aggregateState.unaccepted); + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.release(); + } + aggregateState.release(); +} + +uint32_t AcceptTracker::acceptsPending() +{ + checkPending(); + return aggregateState.acceptsPending(); +} + +uint32_t AcceptTracker::acceptsPending(const std::string& destination) +{ + checkPending(); + return destinationState[destination].acceptsPending(); +} + +void AcceptTracker::reset() +{ + destinationState.clear(); + aggregateState.unaccepted.clear(); + aggregateState.unconfirmed.clear(); + pending.clear(); +} + +void AcceptTracker::checkPending() +{ + while (!pending.empty() && pending.front().status.isComplete()) { + completed(pending.front().accepted); + pending.pop_front(); + } +} + +void AcceptTracker::completed(qpid::framing::SequenceSet& set) +{ + for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) { + i->second.completed(set); + } + aggregateState.completed(set); +} + +}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h new file mode 100644 index 0000000000..fb58a3a8c8 --- /dev/null +++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h @@ -0,0 +1,85 @@ +#ifndef QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H +#define QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Completion.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceSet.h" +#include <deque> +#include <map> + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Tracks the set of messages requiring acceptance, and those for + * which an accept has been issued but is yet to be confirmed + * complete. + */ +class AcceptTracker +{ + public: + void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id); + void accept(qpid::client::AsyncSession&); + void release(qpid::client::AsyncSession&); + uint32_t acceptsPending(); + uint32_t acceptsPending(const std::string& destination); + void reset(); + private: + struct State + { + /** + * ids of messages that have been delivered but not yet + * accepted + */ + qpid::framing::SequenceSet unaccepted; + /** + * ids of messages for which an accpet has been issued but not + * yet confirmed as completed + */ + qpid::framing::SequenceSet unconfirmed; + + void accept(); + void release(); + uint32_t acceptsPending(); + void completed(qpid::framing::SequenceSet&); + }; + typedef std::map<std::string, State> StateMap; + struct Record + { + qpid::client::Completion status; + qpid::framing::SequenceSet accepted; + }; + typedef std::deque<Record> Records; + + State aggregateState; + StateMap destinationState; + Records pending; + + void checkPending(); + void completed(qpid::framing::SequenceSet&); +}; +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp deleted file mode 100644 index 52b623b65c..0000000000 --- a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "CompletionTracker.h" - -namespace qpid { -namespace client { -namespace amqp0_10 { - -using qpid::framing::SequenceNumber; - -void CompletionTracker::track(SequenceNumber command, void* token) -{ - tokens[command] = token; -} - -void CompletionTracker::completedTo(SequenceNumber command) -{ - Tokens::iterator i = tokens.lower_bound(command); - if (i != tokens.end()) { - lastCompleted = i->second; - tokens.erase(tokens.begin(), ++i); - } -} - -void* CompletionTracker::getLastCompletedToken() -{ - return lastCompleted; -} - -}}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h deleted file mode 100644 index 6147c5682e..0000000000 --- a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H -#define QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/SequenceNumber.h" -#include <map> - -namespace qpid { -namespace client { -namespace amqp0_10 { - -/** - * Provides a mapping from command ids to application supplied - * 'tokens', and is used to determine when the sending or - * acknowledging of a specific message is complete. - */ -class CompletionTracker -{ - public: - void track(qpid::framing::SequenceNumber command, void* token); - void completedTo(qpid::framing::SequenceNumber command); - void* getLastCompletedToken(); - private: - typedef std::map<qpid::framing::SequenceNumber, void*> Tokens; - Tokens tokens; - void* lastCompleted; -}; -}}} // namespace qpid::client::amqp0_10 - -#endif /*!QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index b0a16674e1..d22208368b 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -81,12 +81,31 @@ struct MatchAndTrack } } }; + +struct Match +{ + const std::string destination; + uint32_t matched; + + Match(const std::string& d) : destination(d), matched(0) {} + + bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command) + { + if (command->as<MessageTransferBody>()->getDestination() == destination) { + ++matched; + return true; + } else { + return false; + } + } +}; } void IncomingMessages::setSession(qpid::client::AsyncSession s) { session = s; incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault(); + acceptTracker.reset(); } bool IncomingMessages::get(Handler& handler, Duration timeout) @@ -106,8 +125,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout) void IncomingMessages::accept() { - session.messageAccept(unaccepted); - unaccepted.clear(); + acceptTracker.accept(session); } void IncomingMessages::releaseAll() @@ -121,8 +139,7 @@ void IncomingMessages::releaseAll() GetAny handler; while (process(&handler, 0)) ; //now release all messages - session.messageRelease(unaccepted); - unaccepted.clear(); + acceptTracker.release(session); } void IncomingMessages::releasePending(const std::string& destination) @@ -166,6 +183,32 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) return false; } +uint32_t IncomingMessages::pendingAccept() +{ + return acceptTracker.acceptsPending(); +} +uint32_t IncomingMessages::pendingAccept(const std::string& destination) +{ + return acceptTracker.acceptsPending(destination); +} + +uint32_t IncomingMessages::available() +{ + //first pump all available messages from incoming to received... + while (process(0, 0)) {} + //return the count of received messages + return received.size(); +} + +uint32_t IncomingMessages::available(const std::string& destination) +{ + //first pump all available messages from incoming to received... + while (process(0, 0)) {} + + //count all messages for this destination from received list + return std::for_each(received.begin(), received.end(), Match(destination)).matched; +} + void populate(qpid::messaging::Message& message, FrameSet& command); /** @@ -180,7 +223,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m } const MessageTransferBody* transfer = command->as<MessageTransferBody>(); if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { - unaccepted.add(command->getId()); + acceptTracker.delivered(transfer->getDestination(), command->getId()); } session.markCompleted(command->getId(), false, false); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index 5e28877305..e84cd18892 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -27,6 +27,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/sys/BlockingQueue.h" #include "qpid/sys/Time.h" +#include "qpid/client/amqp0_10/AcceptTracker.h" namespace qpid { @@ -74,13 +75,19 @@ class IncomingMessages void accept(); void releaseAll(); void releasePending(const std::string& destination); + + uint32_t pendingAccept(); + uint32_t pendingAccept(const std::string& destination); + + uint32_t available(); + uint32_t available(const std::string& destination); private: typedef std::deque<FrameSetPtr> FrameSetQueue; qpid::client::AsyncSession session; - qpid::framing::SequenceSet unaccepted; boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming; FrameSetQueue received; + AcceptTracker acceptTracker; bool process(Handler*, qpid::sys::Duration); void retrieve(FrameSetPtr, qpid::messaging::Message*); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 31efff38a6..da91c4a160 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -120,6 +120,21 @@ qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; const std::string& ReceiverImpl::getName() const { return destination; } +uint32_t ReceiverImpl::getCapacity() +{ + return capacity; +} + +uint32_t ReceiverImpl::available() +{ + return parent.available(destination); +} + +uint32_t ReceiverImpl::pendingAck() +{ + return parent.pendingAck(destination); +} + ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, const qpid::messaging::Address& a, const qpid::messaging::Filter* f, diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index 509c784513..b941348fc8 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -62,6 +62,9 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl void stop(); const std::string& getName() const; void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t available(); + uint32_t pendingAck(); void setListener(qpid::messaging::MessageListener* listener); qpid::messaging::MessageListener* getListener(); void received(qpid::messaging::Message& message); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index c619d1226a..4cd2dc0521 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -32,11 +32,12 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, const qpid::messaging::Address& _address, const qpid::messaging::Variant::Map& _options) : parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED), - capacity(50), window(0) {} + capacity(50), window(0), flushed(false) {} -void SenderImpl::send(const qpid::messaging::Message& m) +void SenderImpl::send(const qpid::messaging::Message& message) { - execute1<Send>(&m); + Send f(*this, &message); + while (f.repeat) parent.execute(f); } void SenderImpl::cancel() @@ -44,6 +45,20 @@ void SenderImpl::cancel() execute<Cancel>(); } +void SenderImpl::setCapacity(uint32_t c) +{ + bool flush = c < capacity; + capacity = c; + execute1<CheckPendingSends>(flush); +} +uint32_t SenderImpl::getCapacity() { return capacity; } +uint32_t SenderImpl::pending() +{ + CheckPendingSends f(*this, false); + parent.execute(f); + return f.pending; +} + void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { session = s; @@ -60,18 +75,31 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) } } +void SenderImpl::waitForCapacity() +{ + //TODO: add option to throw exception rather than blocking? + if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) { + //Initial implementation is very basic. As outgoing is + //currently only reduced on receiving completions and we are + //blocking anyway we may as well sync(). If successful that + //should clear all outstanding sends. + session.sync(); + checkPendingSends(false); + } + //flush periodically and check for conmpleted sends + if (++window > (capacity / 4)) {//TODO: make this configurable? + checkPendingSends(true); + window = 0; + } +} + void SenderImpl::sendImpl(const qpid::messaging::Message& m) { - //TODO: make recoding for replay optional + //TODO: make recording for replay optional (would still want to track completion however) std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); msg->convert(m); outgoing.push_back(msg.release()); sink->send(session, name, outgoing.back()); - if (++window > (capacity / 2)) {//TODO: make this configurable? - session.flush(); - checkPendingSends(); - window = 0; - } } void SenderImpl::replay() @@ -81,11 +109,18 @@ void SenderImpl::replay() } } -void SenderImpl::checkPendingSends() +uint32_t SenderImpl::checkPendingSends(bool flush) { + if (flush) { + session.flush(); + flushed = true; + } else { + flushed = false; + } while (!outgoing.empty() && outgoing.front().status.isComplete()) { outgoing.pop_front(); } + return outgoing.size(); } void SenderImpl::cancelImpl() diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h index 4ba793d71c..028d26bda7 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -51,6 +51,9 @@ class SenderImpl : public qpid::messaging::SenderImpl const qpid::messaging::Variant::Map& options); void send(const qpid::messaging::Message&); void cancel(); + void setCapacity(uint32_t); + uint32_t getCapacity(); + uint32_t pending(); void init(qpid::client::AsyncSession, AddressResolution&); private: @@ -69,14 +72,17 @@ class SenderImpl : public qpid::messaging::SenderImpl OutgoingMessages outgoing; uint32_t capacity; uint32_t window; + bool flushed; - void checkPendingSends(); + uint32_t checkPendingSends(bool flush); void replay(); + void waitForCapacity(); //logic for application visible methods: void sendImpl(const qpid::messaging::Message&); void cancelImpl(); + //functors for application visible methods (allowing locking and //retry to be centralised): struct Command @@ -89,9 +95,17 @@ class SenderImpl : public qpid::messaging::SenderImpl struct Send : Command { const qpid::messaging::Message* message; - - Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} - void operator()() { impl.sendImpl(*message); } + bool repeat; + + Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {} + void operator()() + { + impl.waitForCapacity(); + //from this point message will be recorded if there is any + //failure (and replayed) so need not repeat the call + repeat = false; + impl.sendImpl(*message); + } }; struct Cancel : Command @@ -100,6 +114,14 @@ class SenderImpl : public qpid::messaging::SenderImpl void operator()() { impl.cancelImpl(); } }; + struct CheckPendingSends : Command + { + bool flush; + uint32_t pending; + CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {} + void operator()() { pending = impl.checkPendingSends(flush); } + }; + //helper templates for some common patterns template <class F> void execute() { @@ -107,10 +129,10 @@ class SenderImpl : public qpid::messaging::SenderImpl parent.execute(f); } - template <class F, class P> void execute1(P p) + template <class F, class P> bool execute1(P p) { F f(*this, p); - parent.execute(f); + return parent.execute(f); } }; }}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 0e6c430d89..bc6289d84b 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -298,6 +298,61 @@ bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration t } } +uint32_t SessionImpl::available() +{ + return get1<Available, uint32_t>((const std::string*) 0); +} +uint32_t SessionImpl::available(const std::string& destination) +{ + return get1<Available, uint32_t>(&destination); +} + +struct SessionImpl::Available : Command +{ + const std::string* destination; + uint32_t result; + + Available(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} + void operator()() { result = impl.availableImpl(destination); } +}; + +uint32_t SessionImpl::availableImpl(const std::string* destination) +{ + if (destination) { + return incoming.available(*destination); + } else { + return incoming.available(); + } +} + +uint32_t SessionImpl::pendingAck() +{ + return get1<PendingAck, uint32_t>((const std::string*) 0); +} + +uint32_t SessionImpl::pendingAck(const std::string& destination) +{ + return get1<PendingAck, uint32_t>(&destination); +} + +struct SessionImpl::PendingAck : Command +{ + const std::string* destination; + uint32_t result; + + PendingAck(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} + void operator()() { result = impl.pendingAckImpl(destination); } +}; + +uint32_t SessionImpl::pendingAckImpl(const std::string* destination) +{ + if (destination) { + return incoming.pendingAccept(*destination); + } else { + return incoming.pendingAccept(); + } +} + void SessionImpl::syncImpl() { session.sync(); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 1c7db17bbb..b453f3f08f 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -83,6 +83,12 @@ class SessionImpl : public qpid::messaging::SessionImpl void receiverCancelled(const std::string& name); void senderCancelled(const std::string& name); + uint32_t available(); + uint32_t available(const std::string& destination); + + uint32_t pendingAck(); + uint32_t pendingAck(const std::string& destination); + void setSession(qpid::client::Session); template <class T> bool execute(T& f) @@ -128,6 +134,8 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address, const qpid::messaging::Filter* filter, const qpid::messaging::VariantMap& options); + uint32_t availableImpl(const std::string* destination); + uint32_t pendingAckImpl(const std::string* destination); //functors for public facing methods (allows locking and retry //logic to be centralised) @@ -178,6 +186,8 @@ class SessionImpl : public qpid::messaging::SessionImpl struct CreateSender; struct CreateReceiver; + struct PendingAck; + struct Available; //helper templates for some common patterns template <class F> bool execute() @@ -196,6 +206,13 @@ class SessionImpl : public qpid::messaging::SessionImpl F f(*this, p); return execute(f); } + + template <class F, class R, class P> R get1(P p) + { + F f(*this, p); + while (!execute(f)) {} + return f.result; + } }; }}} // namespace qpid::client::amqp0_10 diff --git a/qpid/cpp/src/qpid/messaging/Address.cpp b/qpid/cpp/src/qpid/messaging/Address.cpp index ed35054a00..813a8e1377 100644 --- a/qpid/cpp/src/qpid/messaging/Address.cpp +++ b/qpid/cpp/src/qpid/messaging/Address.cpp @@ -21,9 +21,6 @@ #include "qpid/messaging/Address.h" namespace qpid { -namespace client { -} - namespace messaging { Address::Address() {} diff --git a/qpid/cpp/src/qpid/messaging/Receiver.cpp b/qpid/cpp/src/qpid/messaging/Receiver.cpp index 2e8b89d27f..3290ea98ac 100644 --- a/qpid/cpp/src/qpid/messaging/Receiver.cpp +++ b/qpid/cpp/src/qpid/messaging/Receiver.cpp @@ -45,6 +45,9 @@ Message Receiver::fetch(qpid::sys::Duration timeout) { return impl->fetch(timeou void Receiver::start() { impl->start(); } void Receiver::stop() { impl->stop(); } void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); } +uint32_t Receiver::getCapacity() { return impl->getCapacity(); } +uint32_t Receiver::available() { return impl->available(); } +uint32_t Receiver::pendingAck() { return impl->pendingAck(); } void Receiver::cancel() { impl->cancel(); } void Receiver::setListener(MessageListener* listener) { impl->setListener(listener); } diff --git a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h index 77697b730c..7db20acc29 100644 --- a/qpid/cpp/src/qpid/messaging/ReceiverImpl.h +++ b/qpid/cpp/src/qpid/messaging/ReceiverImpl.h @@ -44,6 +44,9 @@ class ReceiverImpl : public virtual qpid::RefCounted virtual void start() = 0; virtual void stop() = 0; virtual void setCapacity(uint32_t) = 0; + virtual uint32_t getCapacity() = 0; + virtual uint32_t available() = 0; + virtual uint32_t pendingAck() = 0; virtual void cancel() = 0; virtual void setListener(MessageListener*) = 0; }; diff --git a/qpid/cpp/src/qpid/messaging/Sender.cpp b/qpid/cpp/src/qpid/messaging/Sender.cpp index 8db700b060..62b2944701 100644 --- a/qpid/cpp/src/qpid/messaging/Sender.cpp +++ b/qpid/cpp/src/qpid/messaging/Sender.cpp @@ -40,5 +40,8 @@ Sender::~Sender() { PI::dtor(*this); } Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); } void Sender::send(const Message& message) { impl->send(message); } void Sender::cancel() { impl->cancel(); } +void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); } +uint32_t Sender::getCapacity() { return impl->getCapacity(); } +uint32_t Sender::pending() { return impl->pending(); } }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/SenderImpl.h b/qpid/cpp/src/qpid/messaging/SenderImpl.h index 77d2cfaeaf..fa3794ca4e 100644 --- a/qpid/cpp/src/qpid/messaging/SenderImpl.h +++ b/qpid/cpp/src/qpid/messaging/SenderImpl.h @@ -37,6 +37,9 @@ class SenderImpl : public virtual qpid::RefCounted virtual ~SenderImpl() {} virtual void send(const Message& message) = 0; virtual void cancel() = 0; + virtual void setCapacity(uint32_t) = 0; + virtual uint32_t getCapacity() = 0; + virtual uint32_t pending() = 0; private: }; }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/Session.cpp b/qpid/cpp/src/qpid/messaging/Session.cpp index 284b20dacc..62b1ca0dcf 100644 --- a/qpid/cpp/src/qpid/messaging/Session.cpp +++ b/qpid/cpp/src/qpid/messaging/Session.cpp @@ -103,15 +103,7 @@ bool Session::dispatch(qpid::sys::Duration timeout) { return impl->dispatch(timeout); } - -void* Session::getLastConfirmedSent() -{ - return impl->getLastConfirmedSent(); -} - -void* Session::getLastConfirmedAcknowledged() -{ - return impl->getLastConfirmedAcknowledged(); -} +uint32_t Session::available() { return impl->available(); } +uint32_t Session::pendingAck() { return impl->pendingAck(); } }} // namespace qpid::messaging diff --git a/qpid/cpp/src/qpid/messaging/SessionImpl.h b/qpid/cpp/src/qpid/messaging/SessionImpl.h index 9b122a24bc..0933cea9c8 100644 --- a/qpid/cpp/src/qpid/messaging/SessionImpl.h +++ b/qpid/cpp/src/qpid/messaging/SessionImpl.h @@ -56,8 +56,8 @@ class SessionImpl : public virtual qpid::RefCounted virtual Sender createSender(const Address& address, const VariantMap& options) = 0; virtual Receiver createReceiver(const Address& address, const VariantMap& options) = 0; virtual Receiver createReceiver(const Address& address, const Filter& filter, const VariantMap& options) = 0; - virtual void* getLastConfirmedSent() = 0; - virtual void* getLastConfirmedAcknowledged() = 0; + virtual uint32_t available() = 0; + virtual uint32_t pendingAck() = 0; private: }; }} // namespace qpid::messaging diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 239b899dc0..4e69239842 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -198,7 +198,6 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders) Receiver receiver = fix.session.createReceiver(fix.queue); Message in; for (uint i = 0; i < 10; ++i) { - //Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); BOOST_CHECK(receiver.fetch(in, 5 * qpid::sys::TIME_SEC)); BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes()); BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i); @@ -360,6 +359,83 @@ QPID_AUTO_TEST_CASE(testReject) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testAvailable) +{ + MultiQueueFixture fix; + + Receiver r1 = fix.session.createReceiver(fix.queues[0]); + r1.setCapacity(100); + r1.start(); + + Receiver r2 = fix.session.createReceiver(fix.queues[1]); + r2.setCapacity(100); + r2.start(); + + Sender s1 = fix.session.createSender(fix.queues[0]); + Sender s2 = fix.session.createSender(fix.queues[1]); + + for (uint i = 0; i < 10; ++i) { + s1.send(Message((boost::format("A_%1%") % (i+1)).str())); + } + for (uint i = 0; i < 5; ++i) { + s2.send(Message((boost::format("B_%1%") % (i+1)).str())); + } + sleep(1);//is there any avoid an arbitrary sleep while waiting for messages to be dispatched? + for (uint i = 0; i < 5; ++i) { + BOOST_CHECK_EQUAL(fix.session.available(), 15u - 2*i); + BOOST_CHECK_EQUAL(r1.available(), 10u - i); + BOOST_CHECK_EQUAL(r1.fetch().getBytes(), (boost::format("A_%1%") % (i+1)).str()); + BOOST_CHECK_EQUAL(r2.available(), 5u - i); + BOOST_CHECK_EQUAL(r2.fetch().getBytes(), (boost::format("B_%1%") % (i+1)).str()); + fix.session.acknowledge(); + } + for (uint i = 5; i < 10; ++i) { + BOOST_CHECK_EQUAL(fix.session.available(), 10u - i); + BOOST_CHECK_EQUAL(r1.available(), 10u - i); + BOOST_CHECK_EQUAL(r1.fetch().getBytes(), (boost::format("A_%1%") % (i+1)).str()); + } +} + +QPID_AUTO_TEST_CASE(testPendingAck) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + for (uint i = 0; i < 10; ++i) { + sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); + } + Receiver receiver = fix.session.createReceiver(fix.queue); + for (uint i = 0; i < 10; ++i) { + BOOST_CHECK_EQUAL(receiver.fetch().getBytes(), (boost::format("Message_%1%") % (i+1)).str()); + } + BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u); + fix.session.acknowledge(); + BOOST_CHECK_EQUAL(fix.session.pendingAck(), 10u); + fix.session.sync(); + BOOST_CHECK_EQUAL(fix.session.pendingAck(), 0u); +} + +QPID_AUTO_TEST_CASE(testPendingSend) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + for (uint i = 0; i < 10; ++i) { + sender.send(Message((boost::format("Message_%1%") % (i+1)).str())); + } + //Note: this test relies on 'inside knowledge' of the sender + //implementation and the fact that the simple test case makes it + //possible to predict when completion information will be sent to + //the client. TODO: is there a better way of testing this? + BOOST_CHECK_EQUAL(sender.pending(), 10u); + fix.session.sync(); + BOOST_CHECK_EQUAL(sender.pending(), 0u); + + Receiver receiver = fix.session.createReceiver(fix.queue); + for (uint i = 0; i < 10; ++i) { + BOOST_CHECK_EQUAL(receiver.fetch().getBytes(), (boost::format("Message_%1%") % (i+1)).str()); + } + fix.session.acknowledge(); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |