diff options
author | Gordon Sim <gsim@apache.org> | 2007-09-06 20:27:33 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-09-06 20:27:33 +0000 |
commit | b33a63b36c659a894143382d0a61efe6a598fcc6 (patch) | |
tree | 0efc848ae9cc6064d615c6968b1d127e92b231d3 /cpp/src | |
parent | 748698e4b8d5bd0c3ccec4ca898d334c13fc0795 (diff) | |
download | qpid-python-b33a63b36c659a894143382d0a61efe6a598fcc6.tar.gz |
Implementation of execution.result on the client side
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@573359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
39 files changed, 826 insertions, 228 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index a3c10d53f4..d97265c1d6 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -98,6 +98,7 @@ libqpidcommon_la_LIBADD = \ libqpidcommon_la_SOURCES = \ $(rgen_common_cpp) \ $(platform_src) \ + qpid/framing/AccumulatedAck.cpp \ qpid/framing/AMQBody.cpp \ qpid/framing/AMQMethodBody.cpp \ qpid/framing/AMQContentBody.cpp \ @@ -155,7 +156,6 @@ libqpidcommon_la_SOURCES = \ libqpidbroker_la_LIBADD = libqpidcommon.la -lboost_iostreams libqpidbroker_la_SOURCES = \ - qpid/broker/AccumulatedAck.cpp \ qpid/broker/Broker.cpp \ qpid/broker/BrokerAdapter.cpp \ qpid/broker/BrokerSingleton.cpp \ @@ -218,14 +218,13 @@ libqpidclient_la_SOURCES = \ qpid/client/ExecutionHandler.cpp \ qpid/client/FutureCompletion.cpp \ qpid/client/FutureResponse.cpp \ - qpid/client/FutureFactory.cpp \ + qpid/client/FutureResult.cpp \ qpid/client/SessionCore.cpp \ qpid/client/StateManager.cpp nobase_include_HEADERS = \ $(platform_hdr) \ - qpid/broker/AccumulatedAck.h \ qpid/broker/BrokerExchange.h \ qpid/broker/BrokerQueue.h \ qpid/broker/Consumer.h \ @@ -295,6 +294,7 @@ nobase_include_HEADERS = \ qpid/client/Connection.h \ qpid/client/ConnectionImpl.h \ qpid/client/Connector.h \ + qpid/client/Completion.h \ qpid/client/MessageListener.h \ qpid/client/BlockingQueue.h \ qpid/client/Correlator.h \ @@ -302,13 +302,18 @@ nobase_include_HEADERS = \ qpid/client/ChannelHandler.h \ qpid/client/ChainableFrameHandler.h \ qpid/client/ConnectionHandler.h \ + qpid/client/Execution.h \ qpid/client/ExecutionHandler.h \ + qpid/client/Future.h \ qpid/client/FutureCompletion.h \ qpid/client/FutureResponse.h \ - qpid/client/FutureFactory.h \ + qpid/client/FutureResult.h \ qpid/client/Response.h \ + qpid/client/ScopedAssociation.h \ qpid/client/SessionCore.h \ qpid/client/StateManager.h \ + qpid/client/TypedResult.h \ + qpid/framing/AccumulatedAck.h \ qpid/framing/AMQBody.h \ qpid/framing/AMQContentBody.h \ qpid/framing/AMQDataBlock.h \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 1a44b09188..b3a8e135a3 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -171,7 +171,6 @@ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) queue->getSettings(), queue->getMessageCount(), queue->getConsumerCount()); - } void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange, diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 7649715ade..9b33fd5f10 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -65,7 +65,7 @@ bool DeliveryRecord::after(DeliveryId tag) const{ return id > tag; } -bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{ +bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{ return range->covers(id); } diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 583579ac10..3caac6bf40 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -25,7 +25,7 @@ #include <list> #include <vector> #include <ostream> -#include "AccumulatedAck.h" +#include "qpid/framing/AccumulatedAck.h" #include "BrokerQueue.h" #include "Consumer.h" #include "DeliveryId.h" @@ -56,7 +56,7 @@ class DeliveryRecord{ bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; - bool coveredBy(const AccumulatedAck* const range) const; + bool coveredBy(const framing::AccumulatedAck* const range) const; void requeue() const; void release(); void reject(); diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp index badf3564e7..25186b4102 100644 --- a/cpp/src/qpid/broker/DtxAck.cpp +++ b/cpp/src/qpid/broker/DtxAck.cpp @@ -26,7 +26,7 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -DtxAck::DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked) +DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked) { remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h index 84afd00c9c..c61b279c42 100644 --- a/cpp/src/qpid/broker/DtxAck.h +++ b/cpp/src/qpid/broker/DtxAck.h @@ -24,7 +24,7 @@ #include <algorithm> #include <functional> #include <list> -#include "AccumulatedAck.h" +#include "qpid/framing/AccumulatedAck.h" #include "DeliveryRecord.h" #include "TxOp.h" @@ -34,7 +34,7 @@ namespace qpid { std::list<DeliveryRecord> pending; public: - DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h index 8458f4cabf..6b9d3e9557 100644 --- a/cpp/src/qpid/broker/Session.h +++ b/cpp/src/qpid/broker/Session.h @@ -22,7 +22,6 @@ * */ -#include "AccumulatedAck.h" #include "Consumer.h" #include "Deliverable.h" #include "DeliveryAdapter.h" @@ -35,6 +34,7 @@ #include "TxBuffer.h" #include "SemanticHandler.h" // FIXME aconway 2007-08-31: remove #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/AccumulatedAck.h" #include "qpid/shared_ptr.h" #include <boost/ptr_container/ptr_vector.hpp> @@ -116,7 +116,7 @@ class Session : public framing::FrameHandler::Chains, TxBuffer::shared_ptr txBuffer; DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; - AccumulatedAck accumulatedAck; + framing::AccumulatedAck accumulatedAck; bool opened; bool flowActive; diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp index 958dbcbec0..05ea755d71 100644 --- a/cpp/src/qpid/broker/TxAck.cpp +++ b/cpp/src/qpid/broker/TxAck.cpp @@ -25,6 +25,7 @@ using std::bind1st; using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; +using qpid::framing::AccumulatedAck; TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked){ diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h index 5e6d0a370c..c8383b6314 100644 --- a/cpp/src/qpid/broker/TxAck.h +++ b/cpp/src/qpid/broker/TxAck.h @@ -24,7 +24,7 @@ #include <algorithm> #include <functional> #include <list> -#include "AccumulatedAck.h" +#include "qpid/framing/AccumulatedAck.h" #include "DeliveryRecord.h" #include "TxOp.h" @@ -35,7 +35,7 @@ namespace qpid { * transactional channel. */ class TxAck : public TxOp{ - AccumulatedAck& acked; + framing::AccumulatedAck& acked; std::list<DeliveryRecord>& unacked; public: @@ -44,7 +44,7 @@ namespace qpid { * acks received * @param unacked the record of delivered messages */ - TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + TxAck(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index cc2b7aedc8..1a0fd25bc3 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -56,7 +56,7 @@ class ScopedSync Channel::Channel(bool _transactional, u_int16_t _prefetch) : prefetch(_prefetch), transactional(_transactional), running(false), - uniqueId(true)/*could eventually be the session id*/, nameCounter(0) + uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false) { } @@ -65,26 +65,25 @@ Channel::~Channel() join(); } -void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s) +void Channel::open(const Session& s) { + Mutex::ScopedLock l(lock); if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); - - connection = c; - sessionCore = s; - session = auto_ptr<Session>(new Session(c, s)); + active = true; + session = s; } bool Channel::isOpen() const { Mutex::ScopedLock l(lock); - return connection; + return active; } void Channel::setQos() { - session->basicQos(0, getPrefetch(), false); + session.basicQos(0, getPrefetch(), false); if(isTransactional()) { //I think this is wrong! should only send TxSelect once... - session->txSelect(); + session.txSelect(); } } @@ -95,13 +94,13 @@ void Channel::setPrefetch(uint16_t _prefetch){ void Channel::declareExchange(Exchange& exchange, bool synch){ FieldTable args; - ScopedSync s(*session, synch); - session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args); + ScopedSync s(session, synch); + session.exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args); } void Channel::deleteExchange(Exchange& exchange, bool synch){ - ScopedSync s(*session, synch); - session->exchangeDelete(0, exchange.getName(), false); + ScopedSync s(session, synch); + session.exchangeDelete(0, exchange.getName(), false); } void Channel::declareQueue(Queue& queue, bool synch){ @@ -112,30 +111,30 @@ void Channel::declareQueue(Queue& queue, bool synch){ } FieldTable args; - ScopedSync s(*session, synch); - session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), + ScopedSync s(session, synch); + session.queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), args); } void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - ScopedSync s(*session, synch); - session->queueDelete(0, queue.getName(), ifunused, ifempty); + ScopedSync s(session, synch); + session.queueDelete(0, queue.getName(), ifunused, ifempty); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - ScopedSync s(*session, synch); - session->queueBind(0, q, e, key, args); + ScopedSync s(session, synch); + session.queueBind(0, q, e, key, args); } void Channel::commit(){ - session->txCommit(); + session.txCommit(); } void Channel::rollback(){ - session->txRollback(); + session.txRollback(); } void Channel::consume( @@ -155,8 +154,8 @@ void Channel::consume( c.ackMode = ackMode; c.lastDeliveryTag = 0; } - ScopedSync s(*session, synch); - session->basicConsume(0, queue.getName(), tag, noLocal, + ScopedSync s(session, synch); + session.basicConsume(0, queue.getName(), tag, noLocal, ackMode == NO_ACK, false, !synch, fields ? *fields : FieldTable()); } @@ -171,13 +170,13 @@ void Channel::cancel(const std::string& tag, bool synch) { c = i->second; consumers.erase(i); } - ScopedSync s(*session, synch); - session->basicCancel(tag); + ScopedSync s(session, synch); + session.basicCancel(tag); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK); - sessionCore->flush();//TODO: need to expose the ability to request completion info through session + Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK); + session.execution().sendFlushRequest(); if (response.isA<BasicGetEmptyBody>()) { return false; } else { @@ -194,19 +193,15 @@ void Channel::publish(const Message& msg, const Exchange& exchange, const string e = exchange.getName(); string key = routingKey; - session->basicPublish(0, e, key, mandatory, immediate, msg); + session.basicPublish(0, e, key, mandatory, immediate, msg); } void Channel::close() { - session->close(); + session.close(); { Mutex::ScopedLock l(lock); - if (connection); - { - sessionCore.reset(); - connection.reset(); - } + active = false; } stop(); } @@ -232,7 +227,7 @@ void Channel::join() { void Channel::run() { try { while (true) { - FrameSet::shared_ptr content = session->get(); + FrameSet::shared_ptr content = session.get(); //need to dispatch this to the relevant listener: if (content->isA<BasicDeliverBody>()) { ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag()); diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index c355fe007a..7ba4b0a246 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -79,18 +79,17 @@ class Channel : private sys::Runnable bool running; ConsumerMap consumers; - ConnectionImpl::shared_ptr connection; - std::auto_ptr<Session> session; - SessionCore::shared_ptr sessionCore; + Session session; framing::ChannelId channelId; BlockingQueue<framing::FrameSet::shared_ptr> gets; framing::Uuid uniqueId; uint32_t nameCounter; + bool active; void stop(); void setQos(); - void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr); + void open(const Session& session); void closeInternal(); void join(); diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index e1581503f9..8c5f83f9f5 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -25,6 +25,7 @@ #include "Connection.h" #include "ClientChannel.h" #include "ClientMessage.h" +#include "ScopedAssociation.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" @@ -66,18 +67,15 @@ void Connection::open( } void Connection::openChannel(Channel& channel) { - ChannelId id = ++channelIdCounter; - SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); - impl->allocated(session); - channel.open(impl, session); - session->open(); + channel.open(newSession()); } Session Connection::newSession() { ChannelId id = ++channelIdCounter; SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); - impl->allocated(session); - return Session(impl, session); + ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl)); + session->open(); + return Session(assoc); } void Connection::close() diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h new file mode 100644 index 0000000000..000bba2138 --- /dev/null +++ b/cpp/src/qpid/client/Completion.h @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _Completion_ +#define _Completion_ + +#include <boost/shared_ptr.hpp> +#include "Future.h" +#include "SessionCore.h" + +namespace qpid { +namespace client { + +class Completion +{ +protected: + Future future; + SessionCore::shared_ptr session; + +public: + Completion(Future f, SessionCore::shared_ptr s) : future(f), session(s) {} + + void sync() + { + future.sync(*session); + } + + bool isComplete() { + return future.isComplete(); + } +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp index 996971dbd2..46a7384ac2 100644 --- a/cpp/src/qpid/client/CompletionTracker.cpp +++ b/cpp/src/qpid/client/CompletionTracker.cpp @@ -20,45 +20,101 @@ */ #include "CompletionTracker.h" +#include <algorithm> using qpid::client::CompletionTracker; using namespace qpid::framing; using namespace boost; +namespace +{ +const std::string empty; +} + CompletionTracker::CompletionTracker() {} CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {} +void CompletionTracker::close() +{ + sys::Mutex::ScopedLock l(lock); + while (!listeners.empty()) { + Record r(listeners.front()); + { + sys::Mutex::ScopedUnlock u(lock); + r.completed(); + } + listeners.pop_front(); + } +} void CompletionTracker::completed(const SequenceNumber& _mark) { sys::Mutex::ScopedLock l(lock); mark = _mark; - while (!listeners.empty() && !(listeners.front().first > mark)) { - Listener f(listeners.front().second); + while (!listeners.empty() && !(listeners.front().id > mark)) { + Record r(listeners.front()); { sys::Mutex::ScopedUnlock u(lock); - f(); + r.completed(); } - listeners.pop(); + listeners.pop_front(); + } +} + +void CompletionTracker::received(const SequenceNumber& id, const std::string& result) +{ + sys::Mutex::ScopedLock l(lock); + Listeners::iterator i = seek(id); + if (i != listeners.end() && i->id == id) { + i->received(result); + listeners.erase(i); } } -void CompletionTracker::listen(const SequenceNumber& point, Listener listener) +void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener) { - if (!add(point, listener)) { + if (!add(Record(point, listener))) { listener(); } } -bool CompletionTracker::add(const SequenceNumber& point, Listener listener) +void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener) +{ + if (!add(Record(point, listener))) { + listener(empty); + } +} + +bool CompletionTracker::add(const Record& record) { sys::Mutex::ScopedLock l(lock); - if (point < mark) { + if (record.id < mark) { return false; } else { - listeners.push(make_pair(point, listener)); + //insert at the correct position + Listeners::iterator i = seek(record.id); + if (i == listeners.end()) i = listeners.begin(); + listeners.insert(i, record); + return true; } } +CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point) +{ + Listeners::iterator i = listeners.begin(); + while (i != listeners.end() && i->id < point) i++; + return i; +} + +void CompletionTracker::Record::completed() +{ + if (f) f(); + else if(g) g(empty);//won't get a result if command is now complete +} + +void CompletionTracker::Record::received(const std::string& result) +{ + if (g) g(result); +} diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h index 30999b4184..05cdc45c9f 100644 --- a/cpp/src/qpid/client/CompletionTracker.h +++ b/cpp/src/qpid/client/CompletionTracker.h @@ -19,7 +19,7 @@ * */ -#include <queue> +#include <list> #include <boost/function.hpp> #include "qpid/framing/amqp_framing.h" #include "qpid/framing/SequenceNumber.h" @@ -34,19 +34,40 @@ namespace client { class CompletionTracker { public: - typedef boost::function<void()> Listener; + //typedef boost::function<void()> CompletionListener; + typedef boost::function0<void> CompletionListener; + typedef boost::function<void(const std::string&)> ResultListener; CompletionTracker(); CompletionTracker(const framing::SequenceNumber& mark); void completed(const framing::SequenceNumber& mark); - void listen(const framing::SequenceNumber& point, Listener l); + void received(const framing::SequenceNumber& id, const std::string& result); + void listenForCompletion(const framing::SequenceNumber& point, CompletionListener l); + void listenForResult(const framing::SequenceNumber& point, ResultListener l); + void close(); private: + struct Record + { + framing::SequenceNumber id; + CompletionListener f; + ResultListener g; + + Record(const framing::SequenceNumber& _id, CompletionListener l) : id(_id), f(l) {} + Record(const framing::SequenceNumber& _id, ResultListener l) : id(_id), g(l) {} + void completed(); + void received(const std::string& result); + + }; + + typedef std::list<Record> Listeners; + sys::Mutex lock; framing::SequenceNumber mark; - std::queue< std::pair<framing::SequenceNumber, Listener> > listeners; + Listeners listeners; - bool add(const framing::SequenceNumber& point, Listener l); + bool add(const Record& r); + Listeners::iterator seek(const framing::SequenceNumber&); }; } diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index b4d2156c31..5ff34cde4e 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -60,11 +60,11 @@ void ConnectionImpl::handle(framing::AMQFrame& frame) void ConnectionImpl::incoming(framing::AMQFrame& frame) { uint16_t id = frame.getChannel(); - SessionCore::shared_ptr session = sessions[id]; - if (!session) { + SessionMap::iterator i = sessions.find(id); + if (i == sessions.end()) { throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); } - session->handle(frame); + i->second->handle(frame); } void ConnectionImpl::open(const std::string& host, int port, @@ -111,7 +111,8 @@ void ConnectionImpl::idleOut() connector->send(frame); } -void ConnectionImpl::shutdown() { +void ConnectionImpl::shutdown() +{ //this indicates that the socket to the server has closed for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { i->second->closed(0, "Unexpected socket closure."); diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h new file mode 100644 index 0000000000..1e8c48734d --- /dev/null +++ b/cpp/src/qpid/client/Execution.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Execution_ +#define _Execution_ + +#include "qpid/framing/SequenceNumber.h" + +namespace qpid { +namespace client { + +class Execution +{ +public: + virtual ~Execution() {} + virtual void sendSyncRequest() = 0; + virtual void sendFlushRequest() = 0; + virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0; +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index d10b3d3fe8..1520ba2272 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -97,8 +97,7 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra void ExecutionHandler::flush() { - //send completion - incoming.lwm = incoming.hwm; + sendCompletion(); } void ExecutionHandler::noop() @@ -106,48 +105,88 @@ void ExecutionHandler::noop() //do nothing } -void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +void ExecutionHandler::result(uint32_t command, const std::string& data) { - //TODO: need to signal the result to the appropriate listener + completion.received(command, data); } void ExecutionHandler::sync() { - //TODO: implement (the application is in charge of completion of - //some commands, so need to track completion for them). + //TODO: implement - need to note the mark requested and then + //remember to send a response when that point is reached +} - //This shouldn't ever need to be called by the server (in my - //opinion) as the server never needs to synchronise with the - //clients execution +void ExecutionHandler::flushTo(const framing::SequenceNumber& point) +{ + if (point > outgoing.lwm) { + sendFlushRequest(); + } } -void ExecutionHandler::sendFlush() +void ExecutionHandler::sendFlushRequest() { AMQFrame frame(0, ExecutionFlushBody()); - out(frame); + out(frame); } -void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g) +void ExecutionHandler::syncTo(const framing::SequenceNumber& point) { - //allocate id: - ++outgoing.hwm; - //register listeners if necessary: - if (f) { - completion.listen(outgoing.hwm, f); - } - if (g) { - correlation.listen(g); + if (point > outgoing.lwm) { + sendSyncRequest(); + } +} + + +void ExecutionHandler::sendSyncRequest() +{ + AMQFrame frame(0, ExecutionSyncBody()); + out(frame); +} + +void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) +{ + if (id > completionStatus.mark) { + if (cumulative) { + completionStatus.update(completionStatus.mark, id); + } else { + completionStatus.update(id, id); + } } + if (send) { + sendCompletion(); + } +} - AMQFrame frame(0/*id will be filled in be channel handler*/, command); + +void ExecutionHandler::sendCompletion() +{ + SequenceNumberSet range; + completionStatus.collectRanges(range); + AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range)); + out(frame); +} + +SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l) +{ + SequenceNumber id = ++outgoing.hwm; + if(l) { + completion.listenForResult(id, l); + } + AMQFrame frame(0/*channel will be filled in be channel handler*/, command); out(frame); + return id; } -void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data, - CompletionTracker::Listener f, Correlator::Listener g) +SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, + CompletionTracker::ResultListener l) { - send(command, f, g); + SequenceNumber id = send(command, l); + sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData()); + return id; +} +void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data) +{ AMQHeaderBody header; BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers); header.get<BasicHeaderProperties>(true)->setContentLength(data.size()); diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index f740e14185..a42697e26a 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -22,28 +22,34 @@ #define _ExecutionHandler_ #include <queue> +#include "qpid/framing/AccumulatedAck.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/FrameSet.h" +#include "qpid/framing/MethodContent.h" #include "qpid/framing/SequenceNumber.h" #include "BlockingQueue.h" #include "ChainableFrameHandler.h" #include "CompletionTracker.h" #include "Correlator.h" +#include "Execution.h" namespace qpid { namespace client { class ExecutionHandler : private framing::AMQP_ServerOperations::ExecutionHandler, - public ChainableFrameHandler + public ChainableFrameHandler, + public Execution { framing::Window incoming; framing::Window outgoing; framing::FrameSet::shared_ptr arriving; Correlator correlation; CompletionTracker completion; + BlockingQueue<framing::FrameSet::shared_ptr> received; framing::ProtocolVersion version; uint64_t maxFrameSize; + framing::AccumulatedAck completionStatus; void complete(uint32_t mark, const framing::SequenceNumberSet& range); void flush(); @@ -51,22 +57,29 @@ class ExecutionHandler : void result(uint32_t command, const std::string& data); void sync(); + void sendCompletion(); + + void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data); + public: - BlockingQueue<framing::FrameSet::shared_ptr> received; + typedef CompletionTracker::ResultListener ResultListener; ExecutionHandler(uint64_t maxFrameSize = 65536); - void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } - void handle(framing::AMQFrame& frame); - void send(const framing::AMQBody& command, - CompletionTracker::Listener f = CompletionTracker::Listener(), - Correlator::Listener g = Correlator::Listener()); - void sendContent(const framing::AMQBody& command, - const framing::BasicHeaderProperties& headers, const std::string& data, - CompletionTracker::Listener f = CompletionTracker::Listener(), - Correlator::Listener g = Correlator::Listener()); - void sendFlush(); + framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener()); + framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, + ResultListener=ResultListener()); + void sendSyncRequest(); + void sendFlushRequest(); + void completed(const framing::SequenceNumber& id, bool cumulative, bool send); + void syncTo(const framing::SequenceNumber& point); + void flushTo(const framing::SequenceNumber& point); + + void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } + Correlator& getCorrelator() { return correlation; } + CompletionTracker& getCompletionTracker() { return completion; } + BlockingQueue<framing::FrameSet::shared_ptr>& getReceived() { return received; } }; }} diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h new file mode 100644 index 0000000000..c2f3b426da --- /dev/null +++ b/cpp/src/qpid/client/Future.h @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _Future_ +#define _Future_ + +#include <boost/bind.hpp> +#include <boost/shared_ptr.hpp> +#include "qpid/Exception.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/StructHelper.h" +#include "FutureCompletion.h" +#include "FutureResponse.h" +#include "FutureResult.h" +#include "SessionCore.h" + +namespace qpid { +namespace client { + +class Future : private framing::StructHelper +{ + framing::SequenceNumber command; + boost::shared_ptr<FutureResponse> response; + boost::shared_ptr<FutureResult> result; + bool complete; + +public: + Future() : complete(false) {} + Future(const framing::SequenceNumber& id) : command(id), complete(false) {} + + void sync(SessionCore& session) + { + if (!complete) { + FutureCompletion callback; + session.getExecution().flushTo(command); + session.getExecution().getCompletionTracker().listenForCompletion( + command, + boost::bind(&FutureCompletion::completed, &callback) + ); + callback.waitForCompletion(); + session.checkClosed(); + complete = true; + } + } + + framing::AMQMethodBody* getResponse(SessionCore& session) + { + if (response) { + session.getExecution().getCompletionTracker().listenForCompletion( + command, + boost::bind(&FutureResponse::completed, response) + ); + return response->getResponse(session); + } else { + throw Exception("Response not expected"); + } + } + + template <class T> void decodeResult(T& value, SessionCore& session) + { + if (result) { + decode(value, result->getResult(session)); + } else { + throw Exception("Result not expected"); + } + } + + bool isComplete() { + return complete; + } + + void setCommandId(const framing::SequenceNumber& id) { command = id; } + void setFutureResponse(boost::shared_ptr<FutureResponse> r) { response = r; } + void setFutureResult(boost::shared_ptr<FutureResult> r) { result = r; } +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/FutureCompletion.cpp b/cpp/src/qpid/client/FutureCompletion.cpp index 6fc3d5f088..130c65d6aa 100644 --- a/cpp/src/qpid/client/FutureCompletion.cpp +++ b/cpp/src/qpid/client/FutureCompletion.cpp @@ -24,9 +24,9 @@ using namespace qpid::client; using namespace qpid::sys; -FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {} +FutureCompletion::FutureCompletion() : complete(false) {} -bool FutureCompletion::isComplete() +bool FutureCompletion::isComplete() const { Monitor::ScopedLock l(lock); return complete; @@ -39,23 +39,10 @@ void FutureCompletion::completed() lock.notifyAll(); } -void FutureCompletion::waitForCompletion() +void FutureCompletion::waitForCompletion() const { Monitor::ScopedLock l(lock); - while (!complete && !closed) { + while (!complete) { lock.wait(); } - if (closed) { - throw ChannelException(code, text); - } -} - -void FutureCompletion::close(uint16_t _code, const std::string& _text) -{ - Monitor::ScopedLock l(lock); - complete = true; - closed = true; - code = _code; - text = _text; - lock.notifyAll(); } diff --git a/cpp/src/qpid/client/FutureCompletion.h b/cpp/src/qpid/client/FutureCompletion.h index 3487a0910a..1897230230 100644 --- a/cpp/src/qpid/client/FutureCompletion.h +++ b/cpp/src/qpid/client/FutureCompletion.h @@ -31,19 +31,15 @@ namespace client { class FutureCompletion { protected: - sys::Monitor lock; + mutable sys::Monitor lock; bool complete; - bool closed; - uint16_t code; - std::string text; public: FutureCompletion(); virtual ~FutureCompletion(){} - bool isComplete(); - void waitForCompletion(); + bool isComplete() const; + void waitForCompletion() const; void completed(); - void close(uint16_t code, const std::string& text); }; }} diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp index e63dc9c192..afdd35c5eb 100644 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -21,14 +21,17 @@ #include "FutureResponse.h" +#include "SessionCore.h" + using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -AMQMethodBody* FutureResponse::getResponse() +AMQMethodBody* FutureResponse::getResponse(SessionCore& session) { waitForCompletion(); + session.checkClosed(); return response.get(); } diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h index 75b1f72c04..1e8a7eb456 100644 --- a/cpp/src/qpid/client/FutureResponse.h +++ b/cpp/src/qpid/client/FutureResponse.h @@ -29,11 +29,13 @@ namespace qpid { namespace client { +class SessionCore; + class FutureResponse : public FutureCompletion { framing::MethodHolder response; public: - framing::AMQMethodBody* getResponse(); + framing::AMQMethodBody* getResponse(SessionCore& session); void received(framing::AMQMethodBody* response); }; diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp new file mode 100644 index 0000000000..a523129206 --- /dev/null +++ b/cpp/src/qpid/client/FutureResult.cpp @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "FutureResult.h" + +#include "SessionCore.h" + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::sys; + +const std::string& FutureResult::getResult(SessionCore& session) const +{ + waitForCompletion(); + session.checkClosed(); + return result; +} + +void FutureResult::received(const std::string& r) +{ + Monitor::ScopedLock l(lock); + result = r; + complete = true; + lock.notifyAll(); +} diff --git a/cpp/src/qpid/client/FutureResult.h b/cpp/src/qpid/client/FutureResult.h new file mode 100644 index 0000000000..3117b63802 --- /dev/null +++ b/cpp/src/qpid/client/FutureResult.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _FutureResult_ +#define _FutureResult_ + +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "FutureCompletion.h" + +namespace qpid { +namespace client { + +class SessionCore; + +class FutureResult : public FutureCompletion +{ + std::string result; +public: + const std::string& getResult(SessionCore& session) const; + void received(const std::string& result); +}; + +}} + + + +#endif diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h index 7866df916c..f9a9f97b75 100644 --- a/cpp/src/qpid/client/Response.h +++ b/cpp/src/qpid/client/Response.h @@ -24,34 +24,27 @@ #include <boost/shared_ptr.hpp> #include "qpid/framing/amqp_framing.h" -#include "FutureResponse.h" +#include "Completion.h" namespace qpid { namespace client { -class Response +class Response : public Completion { - boost::shared_ptr<FutureResponse> future; - public: - Response(boost::shared_ptr<FutureResponse> f) : future(f) {} + Response(Future f, SessionCore::shared_ptr s) : Completion(f, s) {} template <class T> T& as() { - framing::AMQMethodBody* response(future->getResponse()); - assert(response); + framing::AMQMethodBody* response(future.getResponse(*session)); return *boost::polymorphic_downcast<T*>(response); } + template <class T> bool isA() { - framing::AMQMethodBody* response(future->getResponse()); + framing::AMQMethodBody* response(future.getResponse(*session)); return response && response->isA<T>(); } - - void sync() - { - return future->waitForCompletion(); - } }; }} diff --git a/cpp/src/qpid/client/ScopedAssociation.h b/cpp/src/qpid/client/ScopedAssociation.h new file mode 100644 index 0000000000..861a28c0f8 --- /dev/null +++ b/cpp/src/qpid/client/ScopedAssociation.h @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ScopedAssociation_ +#define _ScopedAssociation_ + +#include "ConnectionImpl.h" +#include "SessionCore.h" + +namespace qpid { +namespace client { + +struct ScopedAssociation +{ + typedef boost::shared_ptr<ScopedAssociation> shared_ptr; + + SessionCore::shared_ptr session; + ConnectionImpl::shared_ptr connection; + + ScopedAssociation() {} + + ScopedAssociation(SessionCore::shared_ptr s, ConnectionImpl::shared_ptr c) : session(s), connection(c) + { + connection->allocated(session); + } + + ~ScopedAssociation() + { + if (connection && session) connection->released(session); + } +}; + + +}} + +#endif diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 1b04e74af4..8dfe42989b 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -21,12 +21,15 @@ #include "SessionCore.h" #include <boost/bind.hpp> +#include "Future.h" +#include "FutureResponse.h" +#include "FutureResult.h" using namespace qpid::client; using namespace qpid::framing; SessionCore::SessionCore(uint16_t _id, boost::shared_ptr<framing::FrameHandler> out, - uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false) + uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false) { l2.out = boost::bind(&FrameHandler::handle, out, _1); l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1); @@ -39,47 +42,15 @@ void SessionCore::open() l2.open(id); } -void SessionCore::flush() +ExecutionHandler& SessionCore::getExecution() { - l3.sendFlush(); -} - -Response SessionCore::send(const AMQMethodBody& method, bool expectResponse) -{ - boost::shared_ptr<FutureResponse> f(futures.createResponse()); - if (expectResponse) { - l3.send(method, boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1)); - } else { - l3.send(method, boost::bind(&FutureResponse::completed, f)); - } - if (sync) { - flush(); - f->waitForCompletion(); - } - return Response(f); -} - -Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse) -{ - //TODO: lots of duplication between these two send methods; refactor - boost::shared_ptr<FutureResponse> f(futures.createResponse()); - if (expectResponse) { - l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(), - boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1)); - } else { - l3.sendContent(method, dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData(), - boost::bind(&FutureResponse::completed, f)); - } - if (sync) { - flush(); - f->waitForCompletion(); - } - return Response(f); + checkClosed(); + return l3; } FrameSet::shared_ptr SessionCore::get() { - return l3.received.pop(); + return l3.getReceived().pop(); } void SessionCore::setSync(bool s) @@ -95,12 +66,13 @@ bool SessionCore::isSync() void SessionCore::close() { l2.close(); - l3.received.close(); + stop(); } void SessionCore::stop() { - l3.received.close(); + l3.getReceived().close(); + l3.getCompletionTracker().close(); } void SessionCore::handle(AMQFrame& frame) @@ -110,6 +82,46 @@ void SessionCore::handle(AMQFrame& frame) void SessionCore::closed(uint16_t code, const std::string& text) { - l3.received.close(); - futures.close(code, text); + stop(); + + isClosed = true; + reason.code = code; + reason.text = text; +} + +void SessionCore::checkClosed() +{ + if (isClosed) { + throw ChannelException(reason.code, reason.text); + } +} + +Future SessionCore::send(const AMQBody& command) +{ + Future f; + //any result/response listeners must be set before the command is sent + if (command.getMethod()->resultExpected()) { + boost::shared_ptr<FutureResult> r(new FutureResult()); + f.setFutureResult(r); + //result listener is tied to command id, and is set when that + //is allocated by the execution handler, so pass it to send + f.setCommandId(l3.send(command, boost::bind(&FutureResult::received, r, _1))); + } else { + if (command.getMethod()->responseExpected()) { + boost::shared_ptr<FutureResponse> r(new FutureResponse()); + f.setFutureResponse(r); + l3.getCorrelator().listen(boost::bind(&FutureResponse::received, r, _1)); + } + + f.setCommandId(l3.send(command)); + } + return f; +} + +Future SessionCore::send(const AMQBody& command, const MethodContent& content) +{ + //content bearing methods don't currently have responses or + //results, if that changes should follow procedure for the other + //send method impl: + return Future(l3.send(command, content)); } diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 0febb956b9..80fe13715f 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -22,6 +22,7 @@ #ifndef _SessionCore_ #define _SessionCore_ +#include <boost/function.hpp> #include <boost/shared_ptr.hpp> #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/FrameHandler.h" @@ -29,35 +30,44 @@ #include "qpid/framing/MethodContent.h" #include "ChannelHandler.h" #include "ExecutionHandler.h" -#include "FutureFactory.h" -#include "Response.h" namespace qpid { namespace client { +class Future; + class SessionCore : public framing::FrameHandler { + struct Reason + { + uint16_t code; + std::string text; + }; + ExecutionHandler l3; ChannelHandler l2; - FutureFactory futures; const uint16_t id; bool sync; + bool isClosed; + Reason reason; public: typedef boost::shared_ptr<SessionCore> shared_ptr; SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize); - Response send(const framing::AMQMethodBody& method, bool expectResponse = false); - Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false); framing::FrameSet::shared_ptr get(); uint16_t getId() const { return id; } void setSync(bool); bool isSync(); - void flush(); void open(); void close(); void stop(); void closed(uint16_t code, const std::string& text); + void checkClosed(); + ExecutionHandler& getExecution(); + + Future send(const framing::AMQBody& command); + Future send(const framing::AMQBody& command, const framing::MethodContent& content); //for incoming frames: void handle(framing::AMQFrame& frame); diff --git a/cpp/src/qpid/client/TypedResult.h b/cpp/src/qpid/client/TypedResult.h new file mode 100644 index 0000000000..38892c42bd --- /dev/null +++ b/cpp/src/qpid/client/TypedResult.h @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _TypedResult_ +#define _TypedResult_ + +#include "Completion.h" + +namespace qpid { +namespace client { + +template <class T> class TypedResult : public Completion +{ + T result; + bool decoded; + +public: + TypedResult(Future f, SessionCore::shared_ptr s) : Completion(f, s), decoded(false) {} + + T& get() + { + if (!decoded) { + future.decodeResult(result, *session); + decoded = true; + } + + return result; + } +}; + +}} + +#endif diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h index a5c14a37e9..f0043d9d3b 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.h +++ b/cpp/src/qpid/framing/AMQMethodBody.h @@ -50,7 +50,9 @@ class AMQMethodBody : public AMQBody { virtual MethodId amqpMethodId() const = 0; virtual ClassId amqpClassId() const = 0; virtual bool isContentBearing() const = 0; - + virtual bool resultExpected() const = 0; + virtual bool responseExpected() const = 0; + void invoke(AMQP_ServerOperations&); bool invoke(Invocable*); diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/framing/AccumulatedAck.cpp index 5603f39410..9daae5494c 100644 --- a/cpp/src/qpid/broker/AccumulatedAck.cpp +++ b/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -26,9 +26,9 @@ using std::list; using std::max; using std::min; -using namespace qpid::broker; +using namespace qpid::framing; -void AccumulatedAck::update(DeliveryId first, DeliveryId last){ +void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){ assert(first <= last); if (last < mark) return; @@ -84,7 +84,7 @@ void AccumulatedAck::clear(){ ranges.clear(); } -bool AccumulatedAck::covers(DeliveryId tag) const{ +bool AccumulatedAck::covers(SequenceNumber tag) const{ if (tag <= mark) return true; for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) { if (i->contains(tag)) return true; @@ -92,7 +92,15 @@ bool AccumulatedAck::covers(DeliveryId tag) const{ return false; } -bool Range::contains(DeliveryId i) const +void AccumulatedAck::collectRanges(SequenceNumberSet& set) const +{ + for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + set.push_back(i->start); + set.push_back(i->end); + } +} + +bool Range::contains(SequenceNumber i) const { return i >= start && i <= end; } @@ -113,7 +121,7 @@ bool Range::merge(const Range& r) } } -bool Range::mergeable(const DeliveryId& s) const +bool Range::mergeable(const SequenceNumber& s) const { if (contains(s) || start - s == 1) { return true; @@ -122,11 +130,11 @@ bool Range::mergeable(const DeliveryId& s) const } } -Range::Range(DeliveryId s, DeliveryId e) : start(s), end(e) {} +Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {} namespace qpid{ -namespace broker{ +namespace framing{ std::ostream& operator<<(std::ostream& out, const Range& r) { out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]"; diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/framing/AccumulatedAck.h index 9c7cc3d887..f75842968f 100644 --- a/cpp/src/qpid/broker/AccumulatedAck.h +++ b/cpp/src/qpid/framing/AccumulatedAck.h @@ -25,21 +25,22 @@ #include <functional> #include <list> #include <ostream> -#include "DeliveryId.h" +#include "SequenceNumber.h" +#include "SequenceNumberSet.h" namespace qpid { - namespace broker { + namespace framing { struct Range { - DeliveryId start; - DeliveryId end; + SequenceNumber start; + SequenceNumber end; - Range(DeliveryId s, DeliveryId e); - bool contains(DeliveryId i) const; + Range(SequenceNumber s, SequenceNumber e); + bool contains(SequenceNumber i) const; bool intersect(const Range& r) const; bool merge(const Range& r); - bool mergeable(const DeliveryId& r) const; + bool mergeable(const SequenceNumber& r) const; }; /** * Keeps an accumulated record of acked messages (by delivery @@ -50,18 +51,19 @@ namespace qpid { /** * Everything up to this value has been acked. */ - DeliveryId mark; + SequenceNumber mark; /** * List of individually acked messages greater than the * 'mark'. */ std::list<Range> ranges; - explicit AccumulatedAck(DeliveryId r) : mark(r) {} - void update(DeliveryId firstTag, DeliveryId lastTag); + explicit AccumulatedAck(SequenceNumber r = SequenceNumber()) : mark(r) {} + void update(SequenceNumber firstTag, SequenceNumber lastTag); void consolidate(); void clear(); - bool covers(DeliveryId tag) const; + bool covers(SequenceNumber tag) const; + void collectRanges(SequenceNumberSet& set) const; }; std::ostream& operator<<(std::ostream&, const Range&); std::ostream& operator<<(std::ostream&, const AccumulatedAck&); diff --git a/cpp/src/qpid/framing/StructHelper.h b/cpp/src/qpid/framing/StructHelper.h index 753a593523..6b111e1f9e 100644 --- a/cpp/src/qpid/framing/StructHelper.h +++ b/cpp/src/qpid/framing/StructHelper.h @@ -44,7 +44,7 @@ public: rbuffer.getRawData(data, size); } - template <class T> void decode(T t, std::string& data) { + template <class T> void decode(T& t, const std::string& data) { char* bytes = static_cast<char*>(::alloca(data.length())); Buffer wbuffer(bytes, data.length()); wbuffer.putRawData(data); diff --git a/cpp/src/tests/AccumulatedAckTest.cpp b/cpp/src/tests/AccumulatedAckTest.cpp index 601af532fa..62245e463b 100644 --- a/cpp/src/tests/AccumulatedAckTest.cpp +++ b/cpp/src/tests/AccumulatedAckTest.cpp @@ -19,13 +19,13 @@ * under the License. * */ -#include "qpid/broker/AccumulatedAck.h" +#include "qpid/framing/AccumulatedAck.h" #include "qpid_test_plugin.h" #include <iostream> #include <list> using std::list; -using namespace qpid::broker; +using namespace qpid::framing; class AccumulatedAckTest : public CppUnit::TestCase { @@ -44,12 +44,12 @@ class AccumulatedAckTest : public CppUnit::TestCase public: bool covers(const AccumulatedAck& ack, int i) { - return ack.covers(DeliveryId(i)); + return ack.covers(SequenceNumber(i)); } void update(AccumulatedAck& ack, int start, int end) { - ack.update(DeliveryId(start), DeliveryId(end)); + ack.update(SequenceNumber(start), SequenceNumber(end)); } void testGeneral() diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp new file mode 100644 index 0000000000..1acac9c980 --- /dev/null +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <vector> +#include "qpid_test_plugin.h" +#include "InProcessBroker.h" +#include "qpid/client/Session.h" + +using namespace qpid::client; +using namespace qpid::framing; + +class ClientSessionTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ClientSessionTest); + CPPUNIT_TEST(testQueueQuery);; + CPPUNIT_TEST_SUITE_END(); + + boost::shared_ptr<Connector> broker; + Connection connection; + Session session; + + public: + + ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker) + { + connection.open(""); + session = connection.newSession(); + } + + void testQueueQuery() + { + std::string name("my-queue"); + std::string alternate("amq.fanout"); + session.queueDeclare(0, name, alternate, false, false, true, true, FieldTable()); + TypedResult<QueueQueryResult> result = session.queueQuery(name); + CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable()); + CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive()); + CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange()); + } + + void testCompletion() + { + std::string queue("my-queue"); + std::string dest("my-dest"); + session.queueDeclare(0, queue, "", false, false, true, true, FieldTable()); + //subcribe to the queue with confirm_mode = 1 + session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable()); + //publish some messages + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ClientSessionTest); diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index 0e5f3895f9..531ebd8fa7 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -101,6 +101,7 @@ class InProcessBroker : public client::Connector { ) : sender(sender_), conversation(conversation_), in(ih) {} void send(framing::AMQFrame& frame) { + //std::cout << (sender == CLIENT ? "C->S: " : "S->C: ") << frame << std::endl; conversation.push_back(TaggedFrame(sender, frame)); in->received(frame); } diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 874a9a448a..de7a12c027 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -94,10 +94,11 @@ broker_unit_tests = \ TxPublishTest \ ValueTest \ MessageHandlerTest \ - MessageBuilderTest + MessageBuilderTest \ + ClientSessionTest #client_unit_tests = \ - ClientChannelTest + ClientChannelTest framing_unit_tests = \ FieldTableTest \ |