summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-06 20:27:33 +0000
committerGordon Sim <gsim@apache.org>2007-09-06 20:27:33 +0000
commitb33a63b36c659a894143382d0a61efe6a598fcc6 (patch)
tree0efc848ae9cc6064d615c6968b1d127e92b231d3 /cpp/src
parent748698e4b8d5bd0c3ccec4ca898d334c13fc0795 (diff)
downloadqpid-python-b33a63b36c659a894143382d0a61efe6a598fcc6.tar.gz
Implementation of execution.result on the client side
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@573359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am13
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp1
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--cpp/src/qpid/broker/DtxAck.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxAck.h4
-rw-r--r--cpp/src/qpid/broker/Session.h4
-rw-r--r--cpp/src/qpid/broker/TxAck.cpp1
-rw-r--r--cpp/src/qpid/broker/TxAck.h6
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp65
-rw-r--r--cpp/src/qpid/client/ClientChannel.h7
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp12
-rw-r--r--cpp/src/qpid/client/Completion.h53
-rw-r--r--cpp/src/qpid/client/CompletionTracker.cpp74
-rw-r--r--cpp/src/qpid/client/CompletionTracker.h31
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp9
-rw-r--r--cpp/src/qpid/client/Execution.h40
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp87
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h37
-rw-r--r--cpp/src/qpid/client/Future.h97
-rw-r--r--cpp/src/qpid/client/FutureCompletion.cpp21
-rw-r--r--cpp/src/qpid/client/FutureCompletion.h10
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp5
-rw-r--r--cpp/src/qpid/client/FutureResponse.h4
-rw-r--r--cpp/src/qpid/client/FutureResult.cpp43
-rw-r--r--cpp/src/qpid/client/FutureResult.h46
-rw-r--r--cpp/src/qpid/client/Response.h19
-rw-r--r--cpp/src/qpid/client/ScopedAssociation.h53
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp94
-rw-r--r--cpp/src/qpid/client/SessionCore.h22
-rw-r--r--cpp/src/qpid/client/TypedResult.h51
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.h4
-rw-r--r--cpp/src/qpid/framing/AccumulatedAck.cpp (renamed from cpp/src/qpid/broker/AccumulatedAck.cpp)22
-rw-r--r--cpp/src/qpid/framing/AccumulatedAck.h (renamed from cpp/src/qpid/broker/AccumulatedAck.h)24
-rw-r--r--cpp/src/qpid/framing/StructHelper.h2
-rw-r--r--cpp/src/tests/AccumulatedAckTest.cpp8
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp71
-rw-r--r--cpp/src/tests/InProcessBroker.h1
-rw-r--r--cpp/src/tests/Makefile.am5
39 files changed, 826 insertions, 228 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index a3c10d53f4..d97265c1d6 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -98,6 +98,7 @@ libqpidcommon_la_LIBADD = \
libqpidcommon_la_SOURCES = \
$(rgen_common_cpp) \
$(platform_src) \
+ qpid/framing/AccumulatedAck.cpp \
qpid/framing/AMQBody.cpp \
qpid/framing/AMQMethodBody.cpp \
qpid/framing/AMQContentBody.cpp \
@@ -155,7 +156,6 @@ libqpidcommon_la_SOURCES = \
libqpidbroker_la_LIBADD = libqpidcommon.la -lboost_iostreams
libqpidbroker_la_SOURCES = \
- qpid/broker/AccumulatedAck.cpp \
qpid/broker/Broker.cpp \
qpid/broker/BrokerAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
@@ -218,14 +218,13 @@ libqpidclient_la_SOURCES = \
qpid/client/ExecutionHandler.cpp \
qpid/client/FutureCompletion.cpp \
qpid/client/FutureResponse.cpp \
- qpid/client/FutureFactory.cpp \
+ qpid/client/FutureResult.cpp \
qpid/client/SessionCore.cpp \
qpid/client/StateManager.cpp
nobase_include_HEADERS = \
$(platform_hdr) \
- qpid/broker/AccumulatedAck.h \
qpid/broker/BrokerExchange.h \
qpid/broker/BrokerQueue.h \
qpid/broker/Consumer.h \
@@ -295,6 +294,7 @@ nobase_include_HEADERS = \
qpid/client/Connection.h \
qpid/client/ConnectionImpl.h \
qpid/client/Connector.h \
+ qpid/client/Completion.h \
qpid/client/MessageListener.h \
qpid/client/BlockingQueue.h \
qpid/client/Correlator.h \
@@ -302,13 +302,18 @@ nobase_include_HEADERS = \
qpid/client/ChannelHandler.h \
qpid/client/ChainableFrameHandler.h \
qpid/client/ConnectionHandler.h \
+ qpid/client/Execution.h \
qpid/client/ExecutionHandler.h \
+ qpid/client/Future.h \
qpid/client/FutureCompletion.h \
qpid/client/FutureResponse.h \
- qpid/client/FutureFactory.h \
+ qpid/client/FutureResult.h \
qpid/client/Response.h \
+ qpid/client/ScopedAssociation.h \
qpid/client/SessionCore.h \
qpid/client/StateManager.h \
+ qpid/client/TypedResult.h \
+ qpid/framing/AccumulatedAck.h \
qpid/framing/AMQBody.h \
qpid/framing/AMQContentBody.h \
qpid/framing/AMQDataBlock.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 1a44b09188..b3a8e135a3 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -171,7 +171,6 @@ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
queue->getSettings(),
queue->getMessageCount(),
queue->getConsumerCount());
-
}
void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 7649715ade..9b33fd5f10 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -65,7 +65,7 @@ bool DeliveryRecord::after(DeliveryId tag) const{
return id > tag;
}
-bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
+bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{
return range->covers(id);
}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 583579ac10..3caac6bf40 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -25,7 +25,7 @@
#include <list>
#include <vector>
#include <ostream>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "BrokerQueue.h"
#include "Consumer.h"
#include "DeliveryId.h"
@@ -56,7 +56,7 @@ class DeliveryRecord{
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
- bool coveredBy(const AccumulatedAck* const range) const;
+ bool coveredBy(const framing::AccumulatedAck* const range) const;
void requeue() const;
void release();
void reject();
diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp
index badf3564e7..25186b4102 100644
--- a/cpp/src/qpid/broker/DtxAck.cpp
+++ b/cpp/src/qpid/broker/DtxAck.cpp
@@ -26,7 +26,7 @@ using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
-DtxAck::DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
{
remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()),
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h
index 84afd00c9c..c61b279c42 100644
--- a/cpp/src/qpid/broker/DtxAck.h
+++ b/cpp/src/qpid/broker/DtxAck.h
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -34,7 +34,7 @@ namespace qpid {
std::list<DeliveryRecord> pending;
public:
- DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h
index 8458f4cabf..6b9d3e9557 100644
--- a/cpp/src/qpid/broker/Session.h
+++ b/cpp/src/qpid/broker/Session.h
@@ -22,7 +22,6 @@
*
*/
-#include "AccumulatedAck.h"
#include "Consumer.h"
#include "Deliverable.h"
#include "DeliveryAdapter.h"
@@ -35,6 +34,7 @@
#include "TxBuffer.h"
#include "SemanticHandler.h" // FIXME aconway 2007-08-31: remove
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "qpid/shared_ptr.h"
#include <boost/ptr_container/ptr_vector.hpp>
@@ -116,7 +116,7 @@ class Session : public framing::FrameHandler::Chains,
TxBuffer::shared_ptr txBuffer;
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
- AccumulatedAck accumulatedAck;
+ framing::AccumulatedAck accumulatedAck;
bool opened;
bool flowActive;
diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp
index 958dbcbec0..05ea755d71 100644
--- a/cpp/src/qpid/broker/TxAck.cpp
+++ b/cpp/src/qpid/broker/TxAck.cpp
@@ -25,6 +25,7 @@ using std::bind1st;
using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
+using qpid::framing::AccumulatedAck;
TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
acked(_acked), unacked(_unacked){
diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h
index 5e6d0a370c..c8383b6314 100644
--- a/cpp/src/qpid/broker/TxAck.h
+++ b/cpp/src/qpid/broker/TxAck.h
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -35,7 +35,7 @@ namespace qpid {
* transactional channel.
*/
class TxAck : public TxOp{
- AccumulatedAck& acked;
+ framing::AccumulatedAck& acked;
std::list<DeliveryRecord>& unacked;
public:
@@ -44,7 +44,7 @@ namespace qpid {
* acks received
* @param unacked the record of delivered messages
*/
- TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ TxAck(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index cc2b7aedc8..1a0fd25bc3 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -56,7 +56,7 @@ class ScopedSync
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
prefetch(_prefetch), transactional(_transactional), running(false),
- uniqueId(true)/*could eventually be the session id*/, nameCounter(0)
+ uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false)
{
}
@@ -65,26 +65,25 @@ Channel::~Channel()
join();
}
-void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s)
+void Channel::open(const Session& s)
{
+ Mutex::ScopedLock l(lock);
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
-
- connection = c;
- sessionCore = s;
- session = auto_ptr<Session>(new Session(c, s));
+ active = true;
+ session = s;
}
bool Channel::isOpen() const {
Mutex::ScopedLock l(lock);
- return connection;
+ return active;
}
void Channel::setQos() {
- session->basicQos(0, getPrefetch(), false);
+ session.basicQos(0, getPrefetch(), false);
if(isTransactional()) {
//I think this is wrong! should only send TxSelect once...
- session->txSelect();
+ session.txSelect();
}
}
@@ -95,13 +94,13 @@ void Channel::setPrefetch(uint16_t _prefetch){
void Channel::declareExchange(Exchange& exchange, bool synch){
FieldTable args;
- ScopedSync s(*session, synch);
- session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
+ ScopedSync s(session, synch);
+ session.exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args);
}
void Channel::deleteExchange(Exchange& exchange, bool synch){
- ScopedSync s(*session, synch);
- session->exchangeDelete(0, exchange.getName(), false);
+ ScopedSync s(session, synch);
+ session.exchangeDelete(0, exchange.getName(), false);
}
void Channel::declareQueue(Queue& queue, bool synch){
@@ -112,30 +111,30 @@ void Channel::declareQueue(Queue& queue, bool synch){
}
FieldTable args;
- ScopedSync s(*session, synch);
- session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
+ ScopedSync s(session, synch);
+ session.queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(),
queue.isExclusive(), queue.isAutoDelete(), args);
}
void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){
- ScopedSync s(*session, synch);
- session->queueDelete(0, queue.getName(), ifunused, ifempty);
+ ScopedSync s(session, synch);
+ session.queueDelete(0, queue.getName(), ifunused, ifempty);
}
void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){
string e = exchange.getName();
string q = queue.getName();
- ScopedSync s(*session, synch);
- session->queueBind(0, q, e, key, args);
+ ScopedSync s(session, synch);
+ session.queueBind(0, q, e, key, args);
}
void Channel::commit(){
- session->txCommit();
+ session.txCommit();
}
void Channel::rollback(){
- session->txRollback();
+ session.txRollback();
}
void Channel::consume(
@@ -155,8 +154,8 @@ void Channel::consume(
c.ackMode = ackMode;
c.lastDeliveryTag = 0;
}
- ScopedSync s(*session, synch);
- session->basicConsume(0, queue.getName(), tag, noLocal,
+ ScopedSync s(session, synch);
+ session.basicConsume(0, queue.getName(), tag, noLocal,
ackMode == NO_ACK, false, !synch,
fields ? *fields : FieldTable());
}
@@ -171,13 +170,13 @@ void Channel::cancel(const std::string& tag, bool synch) {
c = i->second;
consumers.erase(i);
}
- ScopedSync s(*session, synch);
- session->basicCancel(tag);
+ ScopedSync s(session, synch);
+ session.basicCancel(tag);
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK);
- sessionCore->flush();//TODO: need to expose the ability to request completion info through session
+ Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK);
+ session.execution().sendFlushRequest();
if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
@@ -194,19 +193,15 @@ void Channel::publish(const Message& msg, const Exchange& exchange,
const string e = exchange.getName();
string key = routingKey;
- session->basicPublish(0, e, key, mandatory, immediate, msg);
+ session.basicPublish(0, e, key, mandatory, immediate, msg);
}
void Channel::close()
{
- session->close();
+ session.close();
{
Mutex::ScopedLock l(lock);
- if (connection);
- {
- sessionCore.reset();
- connection.reset();
- }
+ active = false;
}
stop();
}
@@ -232,7 +227,7 @@ void Channel::join() {
void Channel::run() {
try {
while (true) {
- FrameSet::shared_ptr content = session->get();
+ FrameSet::shared_ptr content = session.get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index c355fe007a..7ba4b0a246 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -79,18 +79,17 @@ class Channel : private sys::Runnable
bool running;
ConsumerMap consumers;
- ConnectionImpl::shared_ptr connection;
- std::auto_ptr<Session> session;
- SessionCore::shared_ptr sessionCore;
+ Session session;
framing::ChannelId channelId;
BlockingQueue<framing::FrameSet::shared_ptr> gets;
framing::Uuid uniqueId;
uint32_t nameCounter;
+ bool active;
void stop();
void setQos();
- void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr);
+ void open(const Session& session);
void closeInternal();
void join();
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index e1581503f9..8c5f83f9f5 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -25,6 +25,7 @@
#include "Connection.h"
#include "ClientChannel.h"
#include "ClientMessage.h"
+#include "ScopedAssociation.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
@@ -66,18 +67,15 @@ void Connection::open(
}
void Connection::openChannel(Channel& channel) {
- ChannelId id = ++channelIdCounter;
- SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
- impl->allocated(session);
- channel.open(impl, session);
- session->open();
+ channel.open(newSession());
}
Session Connection::newSession() {
ChannelId id = ++channelIdCounter;
SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size));
- impl->allocated(session);
- return Session(impl, session);
+ ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl));
+ session->open();
+ return Session(assoc);
}
void Connection::close()
diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h
new file mode 100644
index 0000000000..000bba2138
--- /dev/null
+++ b/cpp/src/qpid/client/Completion.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Completion_
+#define _Completion_
+
+#include <boost/shared_ptr.hpp>
+#include "Future.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+class Completion
+{
+protected:
+ Future future;
+ SessionCore::shared_ptr session;
+
+public:
+ Completion(Future f, SessionCore::shared_ptr s) : future(f), session(s) {}
+
+ void sync()
+ {
+ future.sync(*session);
+ }
+
+ bool isComplete() {
+ return future.isComplete();
+ }
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp
index 996971dbd2..46a7384ac2 100644
--- a/cpp/src/qpid/client/CompletionTracker.cpp
+++ b/cpp/src/qpid/client/CompletionTracker.cpp
@@ -20,45 +20,101 @@
*/
#include "CompletionTracker.h"
+#include <algorithm>
using qpid::client::CompletionTracker;
using namespace qpid::framing;
using namespace boost;
+namespace
+{
+const std::string empty;
+}
+
CompletionTracker::CompletionTracker() {}
CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {}
+void CompletionTracker::close()
+{
+ sys::Mutex::ScopedLock l(lock);
+ while (!listeners.empty()) {
+ Record r(listeners.front());
+ {
+ sys::Mutex::ScopedUnlock u(lock);
+ r.completed();
+ }
+ listeners.pop_front();
+ }
+}
void CompletionTracker::completed(const SequenceNumber& _mark)
{
sys::Mutex::ScopedLock l(lock);
mark = _mark;
- while (!listeners.empty() && !(listeners.front().first > mark)) {
- Listener f(listeners.front().second);
+ while (!listeners.empty() && !(listeners.front().id > mark)) {
+ Record r(listeners.front());
{
sys::Mutex::ScopedUnlock u(lock);
- f();
+ r.completed();
}
- listeners.pop();
+ listeners.pop_front();
+ }
+}
+
+void CompletionTracker::received(const SequenceNumber& id, const std::string& result)
+{
+ sys::Mutex::ScopedLock l(lock);
+ Listeners::iterator i = seek(id);
+ if (i != listeners.end() && i->id == id) {
+ i->received(result);
+ listeners.erase(i);
}
}
-void CompletionTracker::listen(const SequenceNumber& point, Listener listener)
+void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener)
{
- if (!add(point, listener)) {
+ if (!add(Record(point, listener))) {
listener();
}
}
-bool CompletionTracker::add(const SequenceNumber& point, Listener listener)
+void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener)
+{
+ if (!add(Record(point, listener))) {
+ listener(empty);
+ }
+}
+
+bool CompletionTracker::add(const Record& record)
{
sys::Mutex::ScopedLock l(lock);
- if (point < mark) {
+ if (record.id < mark) {
return false;
} else {
- listeners.push(make_pair(point, listener));
+ //insert at the correct position
+ Listeners::iterator i = seek(record.id);
+ if (i == listeners.end()) i = listeners.begin();
+ listeners.insert(i, record);
+
return true;
}
}
+CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point)
+{
+ Listeners::iterator i = listeners.begin();
+ while (i != listeners.end() && i->id < point) i++;
+ return i;
+}
+
+void CompletionTracker::Record::completed()
+{
+ if (f) f();
+ else if(g) g(empty);//won't get a result if command is now complete
+}
+
+void CompletionTracker::Record::received(const std::string& result)
+{
+ if (g) g(result);
+}
diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h
index 30999b4184..05cdc45c9f 100644
--- a/cpp/src/qpid/client/CompletionTracker.h
+++ b/cpp/src/qpid/client/CompletionTracker.h
@@ -19,7 +19,7 @@
*
*/
-#include <queue>
+#include <list>
#include <boost/function.hpp>
#include "qpid/framing/amqp_framing.h"
#include "qpid/framing/SequenceNumber.h"
@@ -34,19 +34,40 @@ namespace client {
class CompletionTracker
{
public:
- typedef boost::function<void()> Listener;
+ //typedef boost::function<void()> CompletionListener;
+ typedef boost::function0<void> CompletionListener;
+ typedef boost::function<void(const std::string&)> ResultListener;
CompletionTracker();
CompletionTracker(const framing::SequenceNumber& mark);
void completed(const framing::SequenceNumber& mark);
- void listen(const framing::SequenceNumber& point, Listener l);
+ void received(const framing::SequenceNumber& id, const std::string& result);
+ void listenForCompletion(const framing::SequenceNumber& point, CompletionListener l);
+ void listenForResult(const framing::SequenceNumber& point, ResultListener l);
+ void close();
private:
+ struct Record
+ {
+ framing::SequenceNumber id;
+ CompletionListener f;
+ ResultListener g;
+
+ Record(const framing::SequenceNumber& _id, CompletionListener l) : id(_id), f(l) {}
+ Record(const framing::SequenceNumber& _id, ResultListener l) : id(_id), g(l) {}
+ void completed();
+ void received(const std::string& result);
+
+ };
+
+ typedef std::list<Record> Listeners;
+
sys::Mutex lock;
framing::SequenceNumber mark;
- std::queue< std::pair<framing::SequenceNumber, Listener> > listeners;
+ Listeners listeners;
- bool add(const framing::SequenceNumber& point, Listener l);
+ bool add(const Record& r);
+ Listeners::iterator seek(const framing::SequenceNumber&);
};
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index b4d2156c31..5ff34cde4e 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -60,11 +60,11 @@ void ConnectionImpl::handle(framing::AMQFrame& frame)
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
uint16_t id = frame.getChannel();
- SessionCore::shared_ptr session = sessions[id];
- if (!session) {
+ SessionMap::iterator i = sessions.find(id);
+ if (i == sessions.end()) {
throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
}
- session->handle(frame);
+ i->second->handle(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
@@ -111,7 +111,8 @@ void ConnectionImpl::idleOut()
connector->send(frame);
}
-void ConnectionImpl::shutdown() {
+void ConnectionImpl::shutdown()
+{
//this indicates that the socket to the server has closed
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
i->second->closed(0, "Unexpected socket closure.");
diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h
new file mode 100644
index 0000000000..1e8c48734d
--- /dev/null
+++ b/cpp/src/qpid/client/Execution.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Execution_
+#define _Execution_
+
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace client {
+
+class Execution
+{
+public:
+ virtual ~Execution() {}
+ virtual void sendSyncRequest() = 0;
+ virtual void sendFlushRequest() = 0;
+ virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0;
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index d10b3d3fe8..1520ba2272 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -97,8 +97,7 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra
void ExecutionHandler::flush()
{
- //send completion
- incoming.lwm = incoming.hwm;
+ sendCompletion();
}
void ExecutionHandler::noop()
@@ -106,48 +105,88 @@ void ExecutionHandler::noop()
//do nothing
}
-void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+void ExecutionHandler::result(uint32_t command, const std::string& data)
{
- //TODO: need to signal the result to the appropriate listener
+ completion.received(command, data);
}
void ExecutionHandler::sync()
{
- //TODO: implement (the application is in charge of completion of
- //some commands, so need to track completion for them).
+ //TODO: implement - need to note the mark requested and then
+ //remember to send a response when that point is reached
+}
- //This shouldn't ever need to be called by the server (in my
- //opinion) as the server never needs to synchronise with the
- //clients execution
+void ExecutionHandler::flushTo(const framing::SequenceNumber& point)
+{
+ if (point > outgoing.lwm) {
+ sendFlushRequest();
+ }
}
-void ExecutionHandler::sendFlush()
+void ExecutionHandler::sendFlushRequest()
{
AMQFrame frame(0, ExecutionFlushBody());
- out(frame);
+ out(frame);
}
-void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g)
+void ExecutionHandler::syncTo(const framing::SequenceNumber& point)
{
- //allocate id:
- ++outgoing.hwm;
- //register listeners if necessary:
- if (f) {
- completion.listen(outgoing.hwm, f);
- }
- if (g) {
- correlation.listen(g);
+ if (point > outgoing.lwm) {
+ sendSyncRequest();
+ }
+}
+
+
+void ExecutionHandler::sendSyncRequest()
+{
+ AMQFrame frame(0, ExecutionSyncBody());
+ out(frame);
+}
+
+void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send)
+{
+ if (id > completionStatus.mark) {
+ if (cumulative) {
+ completionStatus.update(completionStatus.mark, id);
+ } else {
+ completionStatus.update(id, id);
+ }
}
+ if (send) {
+ sendCompletion();
+ }
+}
- AMQFrame frame(0/*id will be filled in be channel handler*/, command);
+
+void ExecutionHandler::sendCompletion()
+{
+ SequenceNumberSet range;
+ completionStatus.collectRanges(range);
+ AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range));
+ out(frame);
+}
+
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
+{
+ SequenceNumber id = ++outgoing.hwm;
+ if(l) {
+ completion.listenForResult(id, l);
+ }
+ AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
out(frame);
+ return id;
}
-void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data,
- CompletionTracker::Listener f, Correlator::Listener g)
+SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
+ CompletionTracker::ResultListener l)
{
- send(command, f, g);
+ SequenceNumber id = send(command, l);
+ sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData());
+ return id;
+}
+void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data)
+{
AMQHeaderBody header;
BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index f740e14185..a42697e26a 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -22,28 +22,34 @@
#define _ExecutionHandler_
#include <queue>
+#include "qpid/framing/AccumulatedAck.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "BlockingQueue.h"
#include "ChainableFrameHandler.h"
#include "CompletionTracker.h"
#include "Correlator.h"
+#include "Execution.h"
namespace qpid {
namespace client {
class ExecutionHandler :
private framing::AMQP_ServerOperations::ExecutionHandler,
- public ChainableFrameHandler
+ public ChainableFrameHandler,
+ public Execution
{
framing::Window incoming;
framing::Window outgoing;
framing::FrameSet::shared_ptr arriving;
Correlator correlation;
CompletionTracker completion;
+ BlockingQueue<framing::FrameSet::shared_ptr> received;
framing::ProtocolVersion version;
uint64_t maxFrameSize;
+ framing::AccumulatedAck completionStatus;
void complete(uint32_t mark, const framing::SequenceNumberSet& range);
void flush();
@@ -51,22 +57,29 @@ class ExecutionHandler :
void result(uint32_t command, const std::string& data);
void sync();
+ void sendCompletion();
+
+ void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data);
+
public:
- BlockingQueue<framing::FrameSet::shared_ptr> received;
+ typedef CompletionTracker::ResultListener ResultListener;
ExecutionHandler(uint64_t maxFrameSize = 65536);
- void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
-
void handle(framing::AMQFrame& frame);
- void send(const framing::AMQBody& command,
- CompletionTracker::Listener f = CompletionTracker::Listener(),
- Correlator::Listener g = Correlator::Listener());
- void sendContent(const framing::AMQBody& command,
- const framing::BasicHeaderProperties& headers, const std::string& data,
- CompletionTracker::Listener f = CompletionTracker::Listener(),
- Correlator::Listener g = Correlator::Listener());
- void sendFlush();
+ framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener());
+ framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content,
+ ResultListener=ResultListener());
+ void sendSyncRequest();
+ void sendFlushRequest();
+ void completed(const framing::SequenceNumber& id, bool cumulative, bool send);
+ void syncTo(const framing::SequenceNumber& point);
+ void flushTo(const framing::SequenceNumber& point);
+
+ void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
+ Correlator& getCorrelator() { return correlation; }
+ CompletionTracker& getCompletionTracker() { return completion; }
+ BlockingQueue<framing::FrameSet::shared_ptr>& getReceived() { return received; }
};
}}
diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h
new file mode 100644
index 0000000000..c2f3b426da
--- /dev/null
+++ b/cpp/src/qpid/client/Future.h
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _Future_
+#define _Future_
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include "qpid/Exception.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/StructHelper.h"
+#include "FutureCompletion.h"
+#include "FutureResponse.h"
+#include "FutureResult.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+class Future : private framing::StructHelper
+{
+ framing::SequenceNumber command;
+ boost::shared_ptr<FutureResponse> response;
+ boost::shared_ptr<FutureResult> result;
+ bool complete;
+
+public:
+ Future() : complete(false) {}
+ Future(const framing::SequenceNumber& id) : command(id), complete(false) {}
+
+ void sync(SessionCore& session)
+ {
+ if (!complete) {
+ FutureCompletion callback;
+ session.getExecution().flushTo(command);
+ session.getExecution().getCompletionTracker().listenForCompletion(
+ command,
+ boost::bind(&FutureCompletion::completed, &callback)
+ );
+ callback.waitForCompletion();
+ session.checkClosed();
+ complete = true;
+ }
+ }
+
+ framing::AMQMethodBody* getResponse(SessionCore& session)
+ {
+ if (response) {
+ session.getExecution().getCompletionTracker().listenForCompletion(
+ command,
+ boost::bind(&FutureResponse::completed, response)
+ );
+ return response->getResponse(session);
+ } else {
+ throw Exception("Response not expected");
+ }
+ }
+
+ template <class T> void decodeResult(T& value, SessionCore& session)
+ {
+ if (result) {
+ decode(value, result->getResult(session));
+ } else {
+ throw Exception("Result not expected");
+ }
+ }
+
+ bool isComplete() {
+ return complete;
+ }
+
+ void setCommandId(const framing::SequenceNumber& id) { command = id; }
+ void setFutureResponse(boost::shared_ptr<FutureResponse> r) { response = r; }
+ void setFutureResult(boost::shared_ptr<FutureResult> r) { result = r; }
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/FutureCompletion.cpp b/cpp/src/qpid/client/FutureCompletion.cpp
index 6fc3d5f088..130c65d6aa 100644
--- a/cpp/src/qpid/client/FutureCompletion.cpp
+++ b/cpp/src/qpid/client/FutureCompletion.cpp
@@ -24,9 +24,9 @@
using namespace qpid::client;
using namespace qpid::sys;
-FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {}
+FutureCompletion::FutureCompletion() : complete(false) {}
-bool FutureCompletion::isComplete()
+bool FutureCompletion::isComplete() const
{
Monitor::ScopedLock l(lock);
return complete;
@@ -39,23 +39,10 @@ void FutureCompletion::completed()
lock.notifyAll();
}
-void FutureCompletion::waitForCompletion()
+void FutureCompletion::waitForCompletion() const
{
Monitor::ScopedLock l(lock);
- while (!complete && !closed) {
+ while (!complete) {
lock.wait();
}
- if (closed) {
- throw ChannelException(code, text);
- }
-}
-
-void FutureCompletion::close(uint16_t _code, const std::string& _text)
-{
- Monitor::ScopedLock l(lock);
- complete = true;
- closed = true;
- code = _code;
- text = _text;
- lock.notifyAll();
}
diff --git a/cpp/src/qpid/client/FutureCompletion.h b/cpp/src/qpid/client/FutureCompletion.h
index 3487a0910a..1897230230 100644
--- a/cpp/src/qpid/client/FutureCompletion.h
+++ b/cpp/src/qpid/client/FutureCompletion.h
@@ -31,19 +31,15 @@ namespace client {
class FutureCompletion
{
protected:
- sys::Monitor lock;
+ mutable sys::Monitor lock;
bool complete;
- bool closed;
- uint16_t code;
- std::string text;
public:
FutureCompletion();
virtual ~FutureCompletion(){}
- bool isComplete();
- void waitForCompletion();
+ bool isComplete() const;
+ void waitForCompletion() const;
void completed();
- void close(uint16_t code, const std::string& text);
};
}}
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
index e63dc9c192..afdd35c5eb 100644
--- a/cpp/src/qpid/client/FutureResponse.cpp
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -21,14 +21,17 @@
#include "FutureResponse.h"
+#include "SessionCore.h"
+
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
-AMQMethodBody* FutureResponse::getResponse()
+AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
{
waitForCompletion();
+ session.checkClosed();
return response.get();
}
diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h
index 75b1f72c04..1e8a7eb456 100644
--- a/cpp/src/qpid/client/FutureResponse.h
+++ b/cpp/src/qpid/client/FutureResponse.h
@@ -29,11 +29,13 @@
namespace qpid {
namespace client {
+class SessionCore;
+
class FutureResponse : public FutureCompletion
{
framing::MethodHolder response;
public:
- framing::AMQMethodBody* getResponse();
+ framing::AMQMethodBody* getResponse(SessionCore& session);
void received(framing::AMQMethodBody* response);
};
diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp
new file mode 100644
index 0000000000..a523129206
--- /dev/null
+++ b/cpp/src/qpid/client/FutureResult.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FutureResult.h"
+
+#include "SessionCore.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+const std::string& FutureResult::getResult(SessionCore& session) const
+{
+ waitForCompletion();
+ session.checkClosed();
+ return result;
+}
+
+void FutureResult::received(const std::string& r)
+{
+ Monitor::ScopedLock l(lock);
+ result = r;
+ complete = true;
+ lock.notifyAll();
+}
diff --git a/cpp/src/qpid/client/FutureResult.h b/cpp/src/qpid/client/FutureResult.h
new file mode 100644
index 0000000000..3117b63802
--- /dev/null
+++ b/cpp/src/qpid/client/FutureResult.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _FutureResult_
+#define _FutureResult_
+
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "FutureCompletion.h"
+
+namespace qpid {
+namespace client {
+
+class SessionCore;
+
+class FutureResult : public FutureCompletion
+{
+ std::string result;
+public:
+ const std::string& getResult(SessionCore& session) const;
+ void received(const std::string& result);
+};
+
+}}
+
+
+
+#endif
diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h
index 7866df916c..f9a9f97b75 100644
--- a/cpp/src/qpid/client/Response.h
+++ b/cpp/src/qpid/client/Response.h
@@ -24,34 +24,27 @@
#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_framing.h"
-#include "FutureResponse.h"
+#include "Completion.h"
namespace qpid {
namespace client {
-class Response
+class Response : public Completion
{
- boost::shared_ptr<FutureResponse> future;
-
public:
- Response(boost::shared_ptr<FutureResponse> f) : future(f) {}
+ Response(Future f, SessionCore::shared_ptr s) : Completion(f, s) {}
template <class T> T& as()
{
- framing::AMQMethodBody* response(future->getResponse());
- assert(response);
+ framing::AMQMethodBody* response(future.getResponse(*session));
return *boost::polymorphic_downcast<T*>(response);
}
+
template <class T> bool isA()
{
- framing::AMQMethodBody* response(future->getResponse());
+ framing::AMQMethodBody* response(future.getResponse(*session));
return response && response->isA<T>();
}
-
- void sync()
- {
- return future->waitForCompletion();
- }
};
}}
diff --git a/cpp/src/qpid/client/ScopedAssociation.h b/cpp/src/qpid/client/ScopedAssociation.h
new file mode 100644
index 0000000000..861a28c0f8
--- /dev/null
+++ b/cpp/src/qpid/client/ScopedAssociation.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ScopedAssociation_
+#define _ScopedAssociation_
+
+#include "ConnectionImpl.h"
+#include "SessionCore.h"
+
+namespace qpid {
+namespace client {
+
+struct ScopedAssociation
+{
+ typedef boost::shared_ptr<ScopedAssociation> shared_ptr;
+
+ SessionCore::shared_ptr session;
+ ConnectionImpl::shared_ptr connection;
+
+ ScopedAssociation() {}
+
+ ScopedAssociation(SessionCore::shared_ptr s, ConnectionImpl::shared_ptr c) : session(s), connection(c)
+ {
+ connection->allocated(session);
+ }
+
+ ~ScopedAssociation()
+ {
+ if (connection && session) connection->released(session);
+ }
+};
+
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 1b04e74af4..8dfe42989b 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -21,12 +21,15 @@
#include "SessionCore.h"
#include <boost/bind.hpp>
+#include "Future.h"
+#include "FutureResponse.h"
+#include "FutureResult.h"
using namespace qpid::client;
using namespace qpid::framing;
SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out,
- uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false)
+ uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false)
{
l2.out = boost::bind(&FrameHandler::handle, out, _1);
l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1);
@@ -39,47 +42,15 @@ void SessionCore::open()
l2.open(id);
}
-void SessionCore::flush()
+ExecutionHandler& SessionCore::getExecution()
{
- l3.sendFlush();
-}
-
-Response SessionCore::send(const AMQMethodBody& method, bool expectResponse)
-{
- boost::shared_ptr<FutureResponse> f(futures.createResponse());
- if (expectResponse) {
- l3.send(method, boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));
- } else {
- l3.send(method, boost::bind(&FutureResponse::completed, f));
- }
- if (sync) {
- flush();
- f->waitForCompletion();
- }
- return Response(f);
-}
-
-Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse)
-{
- //TODO: lots of duplication between these two send methods; refactor
- boost::shared_ptr<FutureResponse> f(futures.createResponse());
- if (expectResponse) {
- l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(),
- boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1));
- } else {
- l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(),
- boost::bind(&FutureResponse::completed, f));
- }
- if (sync) {
- flush();
- f->waitForCompletion();
- }
- return Response(f);
+ checkClosed();
+ return l3;
}
FrameSet::shared_ptr SessionCore::get()
{
- return l3.received.pop();
+ return l3.getReceived().pop();
}
void SessionCore::setSync(bool s)
@@ -95,12 +66,13 @@ bool SessionCore::isSync()
void SessionCore::close()
{
l2.close();
- l3.received.close();
+ stop();
}
void SessionCore::stop()
{
- l3.received.close();
+ l3.getReceived().close();
+ l3.getCompletionTracker().close();
}
void SessionCore::handle(AMQFrame& frame)
@@ -110,6 +82,46 @@ void SessionCore::handle(AMQFrame& frame)
void SessionCore::closed(uint16_t code, const std::string& text)
{
- l3.received.close();
- futures.close(code, text);
+ stop();
+
+ isClosed = true;
+ reason.code = code;
+ reason.text = text;
+}
+
+void SessionCore::checkClosed()
+{
+ if (isClosed) {
+ throw ChannelException(reason.code, reason.text);
+ }
+}
+
+Future SessionCore::send(const AMQBody& command)
+{
+ Future f;
+ //any result/response listeners must be set before the command is sent
+ if (command.getMethod()->resultExpected()) {
+ boost::shared_ptr<FutureResult> r(new FutureResult());
+ f.setFutureResult(r);
+ //result listener is tied to command id, and is set when that
+ //is allocated by the execution handler, so pass it to send
+ f.setCommandId(l3.send(command, boost::bind(&FutureResult::received, r, _1)));
+ } else {
+ if (command.getMethod()->responseExpected()) {
+ boost::shared_ptr<FutureResponse> r(new FutureResponse());
+ f.setFutureResponse(r);
+ l3.getCorrelator().listen(boost::bind(&FutureResponse::received, r, _1));
+ }
+
+ f.setCommandId(l3.send(command));
+ }
+ return f;
+}
+
+Future SessionCore::send(const AMQBody& command, const MethodContent& content)
+{
+ //content bearing methods don't currently have responses or
+ //results, if that changes should follow procedure for the other
+ //send method impl:
+ return Future(l3.send(command, content));
}
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index 0febb956b9..80fe13715f 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -22,6 +22,7 @@
#ifndef _SessionCore_
#define _SessionCore_
+#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/FrameHandler.h"
@@ -29,35 +30,44 @@
#include "qpid/framing/MethodContent.h"
#include "ChannelHandler.h"
#include "ExecutionHandler.h"
-#include "FutureFactory.h"
-#include "Response.h"
namespace qpid {
namespace client {
+class Future;
+
class SessionCore : public framing::FrameHandler
{
+ struct Reason
+ {
+ uint16_t code;
+ std::string text;
+ };
+
ExecutionHandler l3;
ChannelHandler l2;
- FutureFactory futures;
const uint16_t id;
bool sync;
+ bool isClosed;
+ Reason reason;
public:
typedef boost::shared_ptr<SessionCore> shared_ptr;
SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
- Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
- Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
framing::FrameSet::shared_ptr get();
uint16_t getId() const { return id; }
void setSync(bool);
bool isSync();
- void flush();
void open();
void close();
void stop();
void closed(uint16_t code, const std::string& text);
+ void checkClosed();
+ ExecutionHandler& getExecution();
+
+ Future send(const framing::AMQBody& command);
+ Future send(const framing::AMQBody& command, const framing::MethodContent& content);
//for incoming frames:
void handle(framing::AMQFrame& frame);
diff --git a/cpp/src/qpid/client/TypedResult.h b/cpp/src/qpid/client/TypedResult.h
new file mode 100644
index 0000000000..38892c42bd
--- /dev/null
+++ b/cpp/src/qpid/client/TypedResult.h
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef _TypedResult_
+#define _TypedResult_
+
+#include "Completion.h"
+
+namespace qpid {
+namespace client {
+
+template <class T> class TypedResult : public Completion
+{
+ T result;
+ bool decoded;
+
+public:
+ TypedResult(Future f, SessionCore::shared_ptr s) : Completion(f, s), decoded(false) {}
+
+ T& get()
+ {
+ if (!decoded) {
+ future.decodeResult(result, *session);
+ decoded = true;
+ }
+
+ return result;
+ }
+};
+
+}}
+
+#endif
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h
index a5c14a37e9..f0043d9d3b 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/cpp/src/qpid/framing/AMQMethodBody.h
@@ -50,7 +50,9 @@ class AMQMethodBody : public AMQBody {
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
virtual bool isContentBearing() const = 0;
-
+ virtual bool resultExpected() const = 0;
+ virtual bool responseExpected() const = 0;
+
void invoke(AMQP_ServerOperations&);
bool invoke(Invocable*);
diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/framing/AccumulatedAck.cpp
index 5603f39410..9daae5494c 100644
--- a/cpp/src/qpid/broker/AccumulatedAck.cpp
+++ b/cpp/src/qpid/framing/AccumulatedAck.cpp
@@ -26,9 +26,9 @@
using std::list;
using std::max;
using std::min;
-using namespace qpid::broker;
+using namespace qpid::framing;
-void AccumulatedAck::update(DeliveryId first, DeliveryId last){
+void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){
assert(first <= last);
if (last < mark) return;
@@ -84,7 +84,7 @@ void AccumulatedAck::clear(){
ranges.clear();
}
-bool AccumulatedAck::covers(DeliveryId tag) const{
+bool AccumulatedAck::covers(SequenceNumber tag) const{
if (tag <= mark) return true;
for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
if (i->contains(tag)) return true;
@@ -92,7 +92,15 @@ bool AccumulatedAck::covers(DeliveryId tag) const{
return false;
}
-bool Range::contains(DeliveryId i) const
+void AccumulatedAck::collectRanges(SequenceNumberSet& set) const
+{
+ for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ set.push_back(i->start);
+ set.push_back(i->end);
+ }
+}
+
+bool Range::contains(SequenceNumber i) const
{
return i >= start && i <= end;
}
@@ -113,7 +121,7 @@ bool Range::merge(const Range& r)
}
}
-bool Range::mergeable(const DeliveryId& s) const
+bool Range::mergeable(const SequenceNumber& s) const
{
if (contains(s) || start - s == 1) {
return true;
@@ -122,11 +130,11 @@ bool Range::mergeable(const DeliveryId& s) const
}
}
-Range::Range(DeliveryId s, DeliveryId e) : start(s), end(e) {}
+Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {}
namespace qpid{
-namespace broker{
+namespace framing{
std::ostream& operator<<(std::ostream& out, const Range& r)
{
out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]";
diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/framing/AccumulatedAck.h
index 9c7cc3d887..f75842968f 100644
--- a/cpp/src/qpid/broker/AccumulatedAck.h
+++ b/cpp/src/qpid/framing/AccumulatedAck.h
@@ -25,21 +25,22 @@
#include <functional>
#include <list>
#include <ostream>
-#include "DeliveryId.h"
+#include "SequenceNumber.h"
+#include "SequenceNumberSet.h"
namespace qpid {
- namespace broker {
+ namespace framing {
struct Range
{
- DeliveryId start;
- DeliveryId end;
+ SequenceNumber start;
+ SequenceNumber end;
- Range(DeliveryId s, DeliveryId e);
- bool contains(DeliveryId i) const;
+ Range(SequenceNumber s, SequenceNumber e);
+ bool contains(SequenceNumber i) const;
bool intersect(const Range& r) const;
bool merge(const Range& r);
- bool mergeable(const DeliveryId& r) const;
+ bool mergeable(const SequenceNumber& r) const;
};
/**
* Keeps an accumulated record of acked messages (by delivery
@@ -50,18 +51,19 @@ namespace qpid {
/**
* Everything up to this value has been acked.
*/
- DeliveryId mark;
+ SequenceNumber mark;
/**
* List of individually acked messages greater than the
* 'mark'.
*/
std::list<Range> ranges;
- explicit AccumulatedAck(DeliveryId r) : mark(r) {}
- void update(DeliveryId firstTag, DeliveryId lastTag);
+ explicit AccumulatedAck(SequenceNumber r = SequenceNumber()) : mark(r) {}
+ void update(SequenceNumber firstTag, SequenceNumber lastTag);
void consolidate();
void clear();
- bool covers(DeliveryId tag) const;
+ bool covers(SequenceNumber tag) const;
+ void collectRanges(SequenceNumberSet& set) const;
};
std::ostream& operator<<(std::ostream&, const Range&);
std::ostream& operator<<(std::ostream&, const AccumulatedAck&);
diff --git a/cpp/src/qpid/framing/StructHelper.h b/cpp/src/qpid/framing/StructHelper.h
index 753a593523..6b111e1f9e 100644
--- a/cpp/src/qpid/framing/StructHelper.h
+++ b/cpp/src/qpid/framing/StructHelper.h
@@ -44,7 +44,7 @@ public:
rbuffer.getRawData(data, size);
}
- template <class T> void decode(T t, std::string& data) {
+ template <class T> void decode(T& t, const std::string& data) {
char* bytes = static_cast<char*>(::alloca(data.length()));
Buffer wbuffer(bytes, data.length());
wbuffer.putRawData(data);
diff --git a/cpp/src/tests/AccumulatedAckTest.cpp b/cpp/src/tests/AccumulatedAckTest.cpp
index 601af532fa..62245e463b 100644
--- a/cpp/src/tests/AccumulatedAckTest.cpp
+++ b/cpp/src/tests/AccumulatedAckTest.cpp
@@ -19,13 +19,13 @@
* under the License.
*
*/
-#include "qpid/broker/AccumulatedAck.h"
+#include "qpid/framing/AccumulatedAck.h"
#include "qpid_test_plugin.h"
#include <iostream>
#include <list>
using std::list;
-using namespace qpid::broker;
+using namespace qpid::framing;
class AccumulatedAckTest : public CppUnit::TestCase
{
@@ -44,12 +44,12 @@ class AccumulatedAckTest : public CppUnit::TestCase
public:
bool covers(const AccumulatedAck& ack, int i)
{
- return ack.covers(DeliveryId(i));
+ return ack.covers(SequenceNumber(i));
}
void update(AccumulatedAck& ack, int start, int end)
{
- ack.update(DeliveryId(start), DeliveryId(end));
+ ack.update(SequenceNumber(start), SequenceNumber(end));
}
void testGeneral()
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
new file mode 100644
index 0000000000..1acac9c980
--- /dev/null
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <vector>
+#include "qpid_test_plugin.h"
+#include "InProcessBroker.h"
+#include "qpid/client/Session.h"
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+class ClientSessionTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(ClientSessionTest);
+ CPPUNIT_TEST(testQueueQuery);;
+ CPPUNIT_TEST_SUITE_END();
+
+ boost::shared_ptr<Connector> broker;
+ Connection connection;
+ Session session;
+
+ public:
+
+ ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker)
+ {
+ connection.open("");
+ session = connection.newSession();
+ }
+
+ void testQueueQuery()
+ {
+ std::string name("my-queue");
+ std::string alternate("amq.fanout");
+ session.queueDeclare(0, name, alternate, false, false, true, true, FieldTable());
+ TypedResult<QueueQueryResult> result = session.queueQuery(name);
+ CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
+ CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive());
+ CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange());
+ }
+
+ void testCompletion()
+ {
+ std::string queue("my-queue");
+ std::string dest("my-dest");
+ session.queueDeclare(0, queue, "", false, false, true, true, FieldTable());
+ //subcribe to the queue with confirm_mode = 1
+ session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable());
+ //publish some messages
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ClientSessionTest);
diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h
index 0e5f3895f9..531ebd8fa7 100644
--- a/cpp/src/tests/InProcessBroker.h
+++ b/cpp/src/tests/InProcessBroker.h
@@ -101,6 +101,7 @@ class InProcessBroker : public client::Connector {
) : sender(sender_), conversation(conversation_), in(ih) {}
void send(framing::AMQFrame& frame) {
+ //std::cout << (sender == CLIENT ? "C->S: " : "S->C: ") << frame << std::endl;
conversation.push_back(TaggedFrame(sender, frame));
in->received(frame);
}
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 874a9a448a..de7a12c027 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -94,10 +94,11 @@ broker_unit_tests = \
TxPublishTest \
ValueTest \
MessageHandlerTest \
- MessageBuilderTest
+ MessageBuilderTest \
+ ClientSessionTest
#client_unit_tests = \
- ClientChannelTest
+ ClientChannelTest
framing_unit_tests = \
FieldTableTest \