summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client')
-rw-r--r--qpid/cpp/src/qpid/client/Dispatcher.cpp11
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp111
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h85
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp48
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h50
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp53
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h9
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp15
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h3
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp55
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h34
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp55
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h17
13 files changed, 424 insertions, 122 deletions
diff --git a/qpid/cpp/src/qpid/client/Dispatcher.cpp b/qpid/cpp/src/qpid/client/Dispatcher.cpp
index 43cbf3aa4d..a715c623bf 100644
--- a/qpid/cpp/src/qpid/client/Dispatcher.cpp
+++ b/qpid/cpp/src/qpid/client/Dispatcher.cpp
@@ -29,7 +29,14 @@
#include "qpid/client/Message.h"
#include "qpid/client/MessageImpl.h"
-#include <boost/state_saver.hpp>
+#include <boost/version.hpp>
+#if (BOOST_VERSION >= 104000)
+# include <boost/serialization/state_saver.hpp>
+ using boost::serialization::state_saver;
+#else
+# include <boost/state_saver.hpp>
+ using boost::state_saver;
+#endif /* BOOST_VERSION */
using qpid::framing::FrameSet;
using qpid::framing::MessageTransferBody;
@@ -65,7 +72,7 @@ void Dispatcher::run()
Mutex::ScopedLock l(lock);
if (running)
throw Exception("Dispatcher is already running.");
- boost::state_saver<bool> reset(running); // Reset to false on exit.
+ state_saver<bool> reset(running); // Reset to false on exit.
running = true;
try {
while (!queue->isClosed()) {
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
new file mode 100644
index 0000000000..80be5c56f3
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AcceptTracker.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+void AcceptTracker::State::accept()
+{
+ unconfirmed.add(unaccepted);
+ unaccepted.clear();
+}
+
+void AcceptTracker::State::release()
+{
+ unaccepted.clear();
+}
+
+uint32_t AcceptTracker::State::acceptsPending()
+{
+ return unconfirmed.size();
+}
+
+void AcceptTracker::State::completed(qpid::framing::SequenceSet& set)
+{
+ unconfirmed.remove(set);
+}
+
+void AcceptTracker::delivered(const std::string& destination, const qpid::framing::SequenceNumber& id)
+{
+ aggregateState.unaccepted.add(id);
+ destinationState[destination].unaccepted.add(id);
+}
+
+void AcceptTracker::accept(qpid::client::AsyncSession& session)
+{
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.accept();
+ }
+ Record record;
+ record.status = session.messageAccept(aggregateState.unaccepted);
+ record.accepted = aggregateState.unaccepted;
+ pending.push_back(record);
+ aggregateState.accept();
+}
+
+void AcceptTracker::release(qpid::client::AsyncSession& session)
+{
+ session.messageRelease(aggregateState.unaccepted);
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.release();
+ }
+ aggregateState.release();
+}
+
+uint32_t AcceptTracker::acceptsPending()
+{
+ checkPending();
+ return aggregateState.acceptsPending();
+}
+
+uint32_t AcceptTracker::acceptsPending(const std::string& destination)
+{
+ checkPending();
+ return destinationState[destination].acceptsPending();
+}
+
+void AcceptTracker::reset()
+{
+ destinationState.clear();
+ aggregateState.unaccepted.clear();
+ aggregateState.unconfirmed.clear();
+ pending.clear();
+}
+
+void AcceptTracker::checkPending()
+{
+ while (!pending.empty() && pending.front().status.isComplete()) {
+ completed(pending.front().accepted);
+ pending.pop_front();
+ }
+}
+
+void AcceptTracker::completed(qpid::framing::SequenceSet& set)
+{
+ for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
+ i->second.completed(set);
+ }
+ aggregateState.completed(set);
+}
+
+}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
new file mode 100644
index 0000000000..fb58a3a8c8
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
@@ -0,0 +1,85 @@
+#ifndef QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+#define QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Completion.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
+#include <deque>
+#include <map>
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+/**
+ * Tracks the set of messages requiring acceptance, and those for
+ * which an accept has been issued but is yet to be confirmed
+ * complete.
+ */
+class AcceptTracker
+{
+ public:
+ void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
+ void accept(qpid::client::AsyncSession&);
+ void release(qpid::client::AsyncSession&);
+ uint32_t acceptsPending();
+ uint32_t acceptsPending(const std::string& destination);
+ void reset();
+ private:
+ struct State
+ {
+ /**
+ * ids of messages that have been delivered but not yet
+ * accepted
+ */
+ qpid::framing::SequenceSet unaccepted;
+ /**
+ * ids of messages for which an accpet has been issued but not
+ * yet confirmed as completed
+ */
+ qpid::framing::SequenceSet unconfirmed;
+
+ void accept();
+ void release();
+ uint32_t acceptsPending();
+ void completed(qpid::framing::SequenceSet&);
+ };
+ typedef std::map<std::string, State> StateMap;
+ struct Record
+ {
+ qpid::client::Completion status;
+ qpid::framing::SequenceSet accepted;
+ };
+ typedef std::deque<Record> Records;
+
+ State aggregateState;
+ StateMap destinationState;
+ Records pending;
+
+ void checkPending();
+ void completed(qpid::framing::SequenceSet&);
+};
+}}} // namespace qpid::client::amqp0_10
+
+#endif /*!QPID_CLIENT_AMQP0_10_ACCEPTTRACKER_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
deleted file mode 100644
index 52b623b65c..0000000000
--- a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.cpp
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "CompletionTracker.h"
-
-namespace qpid {
-namespace client {
-namespace amqp0_10 {
-
-using qpid::framing::SequenceNumber;
-
-void CompletionTracker::track(SequenceNumber command, void* token)
-{
- tokens[command] = token;
-}
-
-void CompletionTracker::completedTo(SequenceNumber command)
-{
- Tokens::iterator i = tokens.lower_bound(command);
- if (i != tokens.end()) {
- lastCompleted = i->second;
- tokens.erase(tokens.begin(), ++i);
- }
-}
-
-void* CompletionTracker::getLastCompletedToken()
-{
- return lastCompleted;
-}
-
-}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h b/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
deleted file mode 100644
index 6147c5682e..0000000000
--- a/qpid/cpp/src/qpid/client/amqp0_10/CompletionTracker.h
+++ /dev/null
@@ -1,50 +0,0 @@
-#ifndef QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
-#define QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/SequenceNumber.h"
-#include <map>
-
-namespace qpid {
-namespace client {
-namespace amqp0_10 {
-
-/**
- * Provides a mapping from command ids to application supplied
- * 'tokens', and is used to determine when the sending or
- * acknowledging of a specific message is complete.
- */
-class CompletionTracker
-{
- public:
- void track(qpid::framing::SequenceNumber command, void* token);
- void completedTo(qpid::framing::SequenceNumber command);
- void* getLastCompletedToken();
- private:
- typedef std::map<qpid::framing::SequenceNumber, void*> Tokens;
- Tokens tokens;
- void* lastCompleted;
-};
-}}} // namespace qpid::client::amqp0_10
-
-#endif /*!QPID_CLIENT_AMQP0_10_COMPLETIONTRACKER_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index b0a16674e1..d22208368b 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -81,12 +81,31 @@ struct MatchAndTrack
}
}
};
+
+struct Match
+{
+ const std::string destination;
+ uint32_t matched;
+
+ Match(const std::string& d) : destination(d), matched(0) {}
+
+ bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+ {
+ if (command->as<MessageTransferBody>()->getDestination() == destination) {
+ ++matched;
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
}
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
session = s;
incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
+ acceptTracker.reset();
}
bool IncomingMessages::get(Handler& handler, Duration timeout)
@@ -106,8 +125,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
void IncomingMessages::accept()
{
- session.messageAccept(unaccepted);
- unaccepted.clear();
+ acceptTracker.accept(session);
}
void IncomingMessages::releaseAll()
@@ -121,8 +139,7 @@ void IncomingMessages::releaseAll()
GetAny handler;
while (process(&handler, 0)) ;
//now release all messages
- session.messageRelease(unaccepted);
- unaccepted.clear();
+ acceptTracker.release(session);
}
void IncomingMessages::releasePending(const std::string& destination)
@@ -166,6 +183,32 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
return false;
}
+uint32_t IncomingMessages::pendingAccept()
+{
+ return acceptTracker.acceptsPending();
+}
+uint32_t IncomingMessages::pendingAccept(const std::string& destination)
+{
+ return acceptTracker.acceptsPending(destination);
+}
+
+uint32_t IncomingMessages::available()
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0)) {}
+ //return the count of received messages
+ return received.size();
+}
+
+uint32_t IncomingMessages::available(const std::string& destination)
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0)) {}
+
+ //count all messages for this destination from received list
+ return std::for_each(received.begin(), received.end(), Match(destination)).matched;
+}
+
void populate(qpid::messaging::Message& message, FrameSet& command);
/**
@@ -180,7 +223,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m
}
const MessageTransferBody* transfer = command->as<MessageTransferBody>();
if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
- unaccepted.add(command->getId());
+ acceptTracker.delivered(transfer->getDestination(), command->getId());
}
session.markCompleted(command->getId(), false, false);
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index 5e28877305..e84cd18892 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -27,6 +27,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/BlockingQueue.h"
#include "qpid/sys/Time.h"
+#include "qpid/client/amqp0_10/AcceptTracker.h"
namespace qpid {
@@ -74,13 +75,19 @@ class IncomingMessages
void accept();
void releaseAll();
void releasePending(const std::string& destination);
+
+ uint32_t pendingAccept();
+ uint32_t pendingAccept(const std::string& destination);
+
+ uint32_t available();
+ uint32_t available(const std::string& destination);
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
qpid::client::AsyncSession session;
- qpid::framing::SequenceSet unaccepted;
boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
FrameSetQueue received;
+ AcceptTracker acceptTracker;
bool process(Handler*, qpid::sys::Duration);
void retrieve(FrameSetPtr, qpid::messaging::Message*);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 31efff38a6..da91c4a160 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -120,6 +120,21 @@ qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener;
const std::string& ReceiverImpl::getName() const { return destination; }
+uint32_t ReceiverImpl::getCapacity()
+{
+ return capacity;
+}
+
+uint32_t ReceiverImpl::available()
+{
+ return parent.available(destination);
+}
+
+uint32_t ReceiverImpl::pendingAck()
+{
+ return parent.pendingAck(destination);
+}
+
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
const qpid::messaging::Address& a,
const qpid::messaging::Filter* f,
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index 509c784513..b941348fc8 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -62,6 +62,9 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
void stop();
const std::string& getName() const;
void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t available();
+ uint32_t pendingAck();
void setListener(qpid::messaging::MessageListener* listener);
qpid::messaging::MessageListener* getListener();
void received(qpid::messaging::Message& message);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index c619d1226a..4cd2dc0521 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -32,11 +32,12 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
const qpid::messaging::Address& _address,
const qpid::messaging::Variant::Map& _options) :
parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
- capacity(50), window(0) {}
+ capacity(50), window(0), flushed(false) {}
-void SenderImpl::send(const qpid::messaging::Message& m)
+void SenderImpl::send(const qpid::messaging::Message& message)
{
- execute1<Send>(&m);
+ Send f(*this, &message);
+ while (f.repeat) parent.execute(f);
}
void SenderImpl::cancel()
@@ -44,6 +45,20 @@ void SenderImpl::cancel()
execute<Cancel>();
}
+void SenderImpl::setCapacity(uint32_t c)
+{
+ bool flush = c < capacity;
+ capacity = c;
+ execute1<CheckPendingSends>(flush);
+}
+uint32_t SenderImpl::getCapacity() { return capacity; }
+uint32_t SenderImpl::pending()
+{
+ CheckPendingSends f(*this, false);
+ parent.execute(f);
+ return f.pending;
+}
+
void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
session = s;
@@ -60,18 +75,31 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
}
}
+void SenderImpl::waitForCapacity()
+{
+ //TODO: add option to throw exception rather than blocking?
+ if (capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) {
+ //Initial implementation is very basic. As outgoing is
+ //currently only reduced on receiving completions and we are
+ //blocking anyway we may as well sync(). If successful that
+ //should clear all outstanding sends.
+ session.sync();
+ checkPendingSends(false);
+ }
+ //flush periodically and check for conmpleted sends
+ if (++window > (capacity / 4)) {//TODO: make this configurable?
+ checkPendingSends(true);
+ window = 0;
+ }
+}
+
void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
- //TODO: make recoding for replay optional
+ //TODO: make recording for replay optional (would still want to track completion however)
std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
msg->convert(m);
outgoing.push_back(msg.release());
sink->send(session, name, outgoing.back());
- if (++window > (capacity / 2)) {//TODO: make this configurable?
- session.flush();
- checkPendingSends();
- window = 0;
- }
}
void SenderImpl::replay()
@@ -81,11 +109,18 @@ void SenderImpl::replay()
}
}
-void SenderImpl::checkPendingSends()
+uint32_t SenderImpl::checkPendingSends(bool flush)
{
+ if (flush) {
+ session.flush();
+ flushed = true;
+ } else {
+ flushed = false;
+ }
while (!outgoing.empty() && outgoing.front().status.isComplete()) {
outgoing.pop_front();
}
+ return outgoing.size();
}
void SenderImpl::cancelImpl()
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index 4ba793d71c..028d26bda7 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -51,6 +51,9 @@ class SenderImpl : public qpid::messaging::SenderImpl
const qpid::messaging::Variant::Map& options);
void send(const qpid::messaging::Message&);
void cancel();
+ void setCapacity(uint32_t);
+ uint32_t getCapacity();
+ uint32_t pending();
void init(qpid::client::AsyncSession, AddressResolution&);
private:
@@ -69,14 +72,17 @@ class SenderImpl : public qpid::messaging::SenderImpl
OutgoingMessages outgoing;
uint32_t capacity;
uint32_t window;
+ bool flushed;
- void checkPendingSends();
+ uint32_t checkPendingSends(bool flush);
void replay();
+ void waitForCapacity();
//logic for application visible methods:
void sendImpl(const qpid::messaging::Message&);
void cancelImpl();
+
//functors for application visible methods (allowing locking and
//retry to be centralised):
struct Command
@@ -89,9 +95,17 @@ class SenderImpl : public qpid::messaging::SenderImpl
struct Send : Command
{
const qpid::messaging::Message* message;
-
- Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {}
- void operator()() { impl.sendImpl(*message); }
+ bool repeat;
+
+ Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m), repeat(true) {}
+ void operator()()
+ {
+ impl.waitForCapacity();
+ //from this point message will be recorded if there is any
+ //failure (and replayed) so need not repeat the call
+ repeat = false;
+ impl.sendImpl(*message);
+ }
};
struct Cancel : Command
@@ -100,6 +114,14 @@ class SenderImpl : public qpid::messaging::SenderImpl
void operator()() { impl.cancelImpl(); }
};
+ struct CheckPendingSends : Command
+ {
+ bool flush;
+ uint32_t pending;
+ CheckPendingSends(SenderImpl& i, bool f) : Command(i), flush(f), pending(0) {}
+ void operator()() { pending = impl.checkPendingSends(flush); }
+ };
+
//helper templates for some common patterns
template <class F> void execute()
{
@@ -107,10 +129,10 @@ class SenderImpl : public qpid::messaging::SenderImpl
parent.execute(f);
}
- template <class F, class P> void execute1(P p)
+ template <class F, class P> bool execute1(P p)
{
F f(*this, p);
- parent.execute(f);
+ return parent.execute(f);
}
};
}}} // namespace qpid::client::amqp0_10
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 0e6c430d89..bc6289d84b 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -298,6 +298,61 @@ bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration t
}
}
+uint32_t SessionImpl::available()
+{
+ return get1<Available, uint32_t>((const std::string*) 0);
+}
+uint32_t SessionImpl::available(const std::string& destination)
+{
+ return get1<Available, uint32_t>(&destination);
+}
+
+struct SessionImpl::Available : Command
+{
+ const std::string* destination;
+ uint32_t result;
+
+ Available(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+ void operator()() { result = impl.availableImpl(destination); }
+};
+
+uint32_t SessionImpl::availableImpl(const std::string* destination)
+{
+ if (destination) {
+ return incoming.available(*destination);
+ } else {
+ return incoming.available();
+ }
+}
+
+uint32_t SessionImpl::pendingAck()
+{
+ return get1<PendingAck, uint32_t>((const std::string*) 0);
+}
+
+uint32_t SessionImpl::pendingAck(const std::string& destination)
+{
+ return get1<PendingAck, uint32_t>(&destination);
+}
+
+struct SessionImpl::PendingAck : Command
+{
+ const std::string* destination;
+ uint32_t result;
+
+ PendingAck(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+ void operator()() { result = impl.pendingAckImpl(destination); }
+};
+
+uint32_t SessionImpl::pendingAckImpl(const std::string* destination)
+{
+ if (destination) {
+ return incoming.pendingAccept(*destination);
+ } else {
+ return incoming.pendingAccept();
+ }
+}
+
void SessionImpl::syncImpl()
{
session.sync();
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 1c7db17bbb..b453f3f08f 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -83,6 +83,12 @@ class SessionImpl : public qpid::messaging::SessionImpl
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
+ uint32_t available();
+ uint32_t available(const std::string& destination);
+
+ uint32_t pendingAck();
+ uint32_t pendingAck(const std::string& destination);
+
void setSession(qpid::client::Session);
template <class T> bool execute(T& f)
@@ -128,6 +134,8 @@ class SessionImpl : public qpid::messaging::SessionImpl
qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address,
const qpid::messaging::Filter* filter,
const qpid::messaging::VariantMap& options);
+ uint32_t availableImpl(const std::string* destination);
+ uint32_t pendingAckImpl(const std::string* destination);
//functors for public facing methods (allows locking and retry
//logic to be centralised)
@@ -178,6 +186,8 @@ class SessionImpl : public qpid::messaging::SessionImpl
struct CreateSender;
struct CreateReceiver;
+ struct PendingAck;
+ struct Available;
//helper templates for some common patterns
template <class F> bool execute()
@@ -196,6 +206,13 @@ class SessionImpl : public qpid::messaging::SessionImpl
F f(*this, p);
return execute(f);
}
+
+ template <class F, class R, class P> R get1(P p)
+ {
+ F f(*this, p);
+ while (!execute(f)) {}
+ return f.result;
+ }
};
}}} // namespace qpid::client::amqp0_10