From b33a63b36c659a894143382d0a61efe6a598fcc6 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 6 Sep 2007 20:27:33 +0000 Subject: Implementation of execution.result on the client side git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@573359 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/Makefile.am | 13 ++- cpp/src/qpid/broker/AccumulatedAck.cpp | 146 ---------------------------- cpp/src/qpid/broker/AccumulatedAck.h | 72 -------------- cpp/src/qpid/broker/BrokerAdapter.cpp | 1 - cpp/src/qpid/broker/DeliveryRecord.cpp | 2 +- cpp/src/qpid/broker/DeliveryRecord.h | 4 +- cpp/src/qpid/broker/DtxAck.cpp | 2 +- cpp/src/qpid/broker/DtxAck.h | 4 +- cpp/src/qpid/broker/Session.h | 4 +- cpp/src/qpid/broker/TxAck.cpp | 1 + cpp/src/qpid/broker/TxAck.h | 6 +- cpp/src/qpid/client/ClientChannel.cpp | 65 ++++++------- cpp/src/qpid/client/ClientChannel.h | 7 +- cpp/src/qpid/client/ClientConnection.cpp | 12 +-- cpp/src/qpid/client/Completion.h | 53 ++++++++++ cpp/src/qpid/client/CompletionTracker.cpp | 74 ++++++++++++-- cpp/src/qpid/client/CompletionTracker.h | 31 +++++- cpp/src/qpid/client/ConnectionImpl.cpp | 9 +- cpp/src/qpid/client/Execution.h | 40 ++++++++ cpp/src/qpid/client/ExecutionHandler.cpp | 87 ++++++++++++----- cpp/src/qpid/client/ExecutionHandler.h | 37 ++++--- cpp/src/qpid/client/Future.h | 97 +++++++++++++++++++ cpp/src/qpid/client/FutureCompletion.cpp | 21 +--- cpp/src/qpid/client/FutureCompletion.h | 10 +- cpp/src/qpid/client/FutureResponse.cpp | 5 +- cpp/src/qpid/client/FutureResponse.h | 4 +- cpp/src/qpid/client/FutureResult.cpp | 43 +++++++++ cpp/src/qpid/client/FutureResult.h | 46 +++++++++ cpp/src/qpid/client/Response.h | 19 ++-- cpp/src/qpid/client/ScopedAssociation.h | 53 ++++++++++ cpp/src/qpid/client/SessionCore.cpp | 94 ++++++++++-------- cpp/src/qpid/client/SessionCore.h | 22 +++-- cpp/src/qpid/client/TypedResult.h | 51 ++++++++++ cpp/src/qpid/framing/AMQMethodBody.h | 4 +- cpp/src/qpid/framing/AccumulatedAck.cpp | 154 ++++++++++++++++++++++++++++++ cpp/src/qpid/framing/AccumulatedAck.h | 74 ++++++++++++++ cpp/src/qpid/framing/StructHelper.h | 2 +- cpp/src/tests/AccumulatedAckTest.cpp | 8 +- cpp/src/tests/ClientSessionTest.cpp | 71 ++++++++++++++ cpp/src/tests/InProcessBroker.h | 1 + cpp/src/tests/Makefile.am | 5 +- 41 files changed, 1026 insertions(+), 428 deletions(-) delete mode 100644 cpp/src/qpid/broker/AccumulatedAck.cpp delete mode 100644 cpp/src/qpid/broker/AccumulatedAck.h create mode 100644 cpp/src/qpid/client/Completion.h create mode 100644 cpp/src/qpid/client/Execution.h create mode 100644 cpp/src/qpid/client/Future.h create mode 100644 cpp/src/qpid/client/FutureResult.cpp create mode 100644 cpp/src/qpid/client/FutureResult.h create mode 100644 cpp/src/qpid/client/ScopedAssociation.h create mode 100644 cpp/src/qpid/client/TypedResult.h create mode 100644 cpp/src/qpid/framing/AccumulatedAck.cpp create mode 100644 cpp/src/qpid/framing/AccumulatedAck.h create mode 100644 cpp/src/tests/ClientSessionTest.cpp (limited to 'cpp/src') diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index a3c10d53f4..d97265c1d6 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -98,6 +98,7 @@ libqpidcommon_la_LIBADD = \ libqpidcommon_la_SOURCES = \ $(rgen_common_cpp) \ $(platform_src) \ + qpid/framing/AccumulatedAck.cpp \ qpid/framing/AMQBody.cpp \ qpid/framing/AMQMethodBody.cpp \ qpid/framing/AMQContentBody.cpp \ @@ -155,7 +156,6 @@ libqpidcommon_la_SOURCES = \ libqpidbroker_la_LIBADD = libqpidcommon.la -lboost_iostreams libqpidbroker_la_SOURCES = \ - qpid/broker/AccumulatedAck.cpp \ qpid/broker/Broker.cpp \ qpid/broker/BrokerAdapter.cpp \ qpid/broker/BrokerSingleton.cpp \ @@ -218,14 +218,13 @@ libqpidclient_la_SOURCES = \ qpid/client/ExecutionHandler.cpp \ qpid/client/FutureCompletion.cpp \ qpid/client/FutureResponse.cpp \ - qpid/client/FutureFactory.cpp \ + qpid/client/FutureResult.cpp \ qpid/client/SessionCore.cpp \ qpid/client/StateManager.cpp nobase_include_HEADERS = \ $(platform_hdr) \ - qpid/broker/AccumulatedAck.h \ qpid/broker/BrokerExchange.h \ qpid/broker/BrokerQueue.h \ qpid/broker/Consumer.h \ @@ -295,6 +294,7 @@ nobase_include_HEADERS = \ qpid/client/Connection.h \ qpid/client/ConnectionImpl.h \ qpid/client/Connector.h \ + qpid/client/Completion.h \ qpid/client/MessageListener.h \ qpid/client/BlockingQueue.h \ qpid/client/Correlator.h \ @@ -302,13 +302,18 @@ nobase_include_HEADERS = \ qpid/client/ChannelHandler.h \ qpid/client/ChainableFrameHandler.h \ qpid/client/ConnectionHandler.h \ + qpid/client/Execution.h \ qpid/client/ExecutionHandler.h \ + qpid/client/Future.h \ qpid/client/FutureCompletion.h \ qpid/client/FutureResponse.h \ - qpid/client/FutureFactory.h \ + qpid/client/FutureResult.h \ qpid/client/Response.h \ + qpid/client/ScopedAssociation.h \ qpid/client/SessionCore.h \ qpid/client/StateManager.h \ + qpid/client/TypedResult.h \ + qpid/framing/AccumulatedAck.h \ qpid/framing/AMQBody.h \ qpid/framing/AMQContentBody.h \ qpid/framing/AMQDataBlock.h \ diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/broker/AccumulatedAck.cpp deleted file mode 100644 index 5603f39410..0000000000 --- a/cpp/src/qpid/broker/AccumulatedAck.cpp +++ /dev/null @@ -1,146 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "AccumulatedAck.h" - -#include -#include - -using std::list; -using std::max; -using std::min; -using namespace qpid::broker; - -void AccumulatedAck::update(DeliveryId first, DeliveryId last){ - assert(first <= last); - if (last < mark) return; - - - Range r(first, last); - bool handled = false; - bool markMerged = false; - list::iterator merged = ranges.end(); - if (r.mergeable(mark)) { - mark = r.end; - markMerged = true; - handled = true; - } else { - for (list::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) { - if (i->merge(r)) { - merged = i; - handled = true; - } else if (r.start < i->start) { - ranges.insert(i, r); - handled = true; - } - } - } - if (!handled) { - ranges.push_back(r); - } else { - while (!ranges.empty() && ranges.front().end <= mark) { - ranges.pop_front(); - } - if (markMerged) { - //new range is incorporated, but may be possible to consolidate - merged = ranges.begin(); - while (merged != ranges.end() && merged->mergeable(mark)) { - mark = merged->end; - merged = ranges.erase(merged); - } - } - if (merged != ranges.end()) { - //consolidate ranges - list::iterator i = merged; - list::iterator j = i++; - while (i != ranges.end() && j->merge(*i)) { - j = i++; - } - } - } -} - -void AccumulatedAck::consolidate(){} - -void AccumulatedAck::clear(){ - mark = 0;//not sure that this is valid when wraparound is a possibility - ranges.clear(); -} - -bool AccumulatedAck::covers(DeliveryId tag) const{ - if (tag <= mark) return true; - for (list::const_iterator i = ranges.begin(); i != ranges.end(); i++) { - if (i->contains(tag)) return true; - } - return false; -} - -bool Range::contains(DeliveryId i) const -{ - return i >= start && i <= end; -} - -bool Range::intersect(const Range& r) const -{ - return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end); -} - -bool Range::merge(const Range& r) -{ - if (intersect(r) || mergeable(r.end) || r.mergeable(end)) { - start = min(start, r.start); - end = max(end, r.end); - return true; - } else { - return false; - } -} - -bool Range::mergeable(const DeliveryId& s) const -{ - if (contains(s) || start - s == 1) { - return true; - } else { - return false; - } -} - -Range::Range(DeliveryId s, DeliveryId e) : start(s), end(e) {} - - -namespace qpid{ -namespace broker{ - std::ostream& operator<<(std::ostream& out, const Range& r) - { - out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]"; - return out; - } - - std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a) - { - out << "{mark: " << a.mark.getValue() << ", ranges: ("; - for (list::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) { - if (i != a.ranges.begin()) out << ", "; - out << *i; - } - out << ")]"; - return out; - } -}} diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h deleted file mode 100644 index 9c7cc3d887..0000000000 --- a/cpp/src/qpid/broker/AccumulatedAck.h +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _AccumulatedAck_ -#define _AccumulatedAck_ - -#include -#include -#include -#include -#include "DeliveryId.h" - -namespace qpid { - namespace broker { - - struct Range - { - DeliveryId start; - DeliveryId end; - - Range(DeliveryId s, DeliveryId e); - bool contains(DeliveryId i) const; - bool intersect(const Range& r) const; - bool merge(const Range& r); - bool mergeable(const DeliveryId& r) const; - }; - /** - * Keeps an accumulated record of acked messages (by delivery - * tag). - */ - class AccumulatedAck { - public: - /** - * Everything up to this value has been acked. - */ - DeliveryId mark; - /** - * List of individually acked messages greater than the - * 'mark'. - */ - std::list ranges; - - explicit AccumulatedAck(DeliveryId r) : mark(r) {} - void update(DeliveryId firstTag, DeliveryId lastTag); - void consolidate(); - void clear(); - bool covers(DeliveryId tag) const; - }; - std::ostream& operator<<(std::ostream&, const Range&); - std::ostream& operator<<(std::ostream&, const AccumulatedAck&); - } -} - - -#endif diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 1a44b09188..b3a8e135a3 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -171,7 +171,6 @@ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) queue->getSettings(), queue->getMessageCount(), queue->getConsumerCount()); - } void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange, diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 7649715ade..9b33fd5f10 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -65,7 +65,7 @@ bool DeliveryRecord::after(DeliveryId tag) const{ return id > tag; } -bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{ +bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{ return range->covers(id); } diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 583579ac10..3caac6bf40 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -25,7 +25,7 @@ #include #include #include -#include "AccumulatedAck.h" +#include "qpid/framing/AccumulatedAck.h" #include "BrokerQueue.h" #include "Consumer.h" #include "DeliveryId.h" @@ -56,7 +56,7 @@ class DeliveryRecord{ bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; - bool coveredBy(const AccumulatedAck* const range) const; + bool coveredBy(const framing::AccumulatedAck* const range) const; void requeue() const; void release(); void reject(); diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp index badf3564e7..25186b4102 100644 --- a/cpp/src/qpid/broker/DtxAck.cpp +++ b/cpp/src/qpid/broker/DtxAck.cpp @@ -26,7 +26,7 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -DtxAck::DtxAck(const AccumulatedAck& acked, std::list& unacked) +DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list& unacked) { remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h index 84afd00c9c..c61b279c42 100644 --- a/cpp/src/qpid/broker/DtxAck.h +++ b/cpp/src/qpid/broker/DtxAck.h @@ -24,7 +24,7 @@ #include #include #include -#include "AccumulatedAck.h" +#include "qpid/framing/AccumulatedAck.h" #include "DeliveryRecord.h" #include "TxOp.h" @@ -34,7 +34,7 @@ namespace qpid { std::list pending; public: - DtxAck(const AccumulatedAck& acked, std::list& unacked); + DtxAck(const framing::AccumulatedAck& acked, std::list& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/Session.h index 8458f4cabf..6b9d3e9557 100644 --- a/cpp/src/qpid/broker/Session.h +++ b/cpp/src/qpid/broker/Session.h @@ -22,7 +22,6 @@ * */ -#include "AccumulatedAck.h" #include "Consumer.h" #include "Deliverable.h" #include "DeliveryAdapter.h" @@ -35,6 +34,7 @@ #include "TxBuffer.h" #include "SemanticHandler.h" // FIXME aconway 2007-08-31: remove #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/AccumulatedAck.h" #include "qpid/shared_ptr.h" #include @@ -116,7 +116,7 @@ class Session : public framing::FrameHandler::Chains, TxBuffer::shared_ptr txBuffer; DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; - AccumulatedAck accumulatedAck; + framing::AccumulatedAck accumulatedAck; bool opened; bool flowActive; diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp index 958dbcbec0..05ea755d71 100644 --- a/cpp/src/qpid/broker/TxAck.cpp +++ b/cpp/src/qpid/broker/TxAck.cpp @@ -25,6 +25,7 @@ using std::bind1st; using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; +using qpid::framing::AccumulatedAck; TxAck::TxAck(AccumulatedAck& _acked, std::list& _unacked) : acked(_acked), unacked(_unacked){ diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h index 5e6d0a370c..c8383b6314 100644 --- a/cpp/src/qpid/broker/TxAck.h +++ b/cpp/src/qpid/broker/TxAck.h @@ -24,7 +24,7 @@ #include #include #include -#include "AccumulatedAck.h" +#include "qpid/framing/AccumulatedAck.h" #include "DeliveryRecord.h" #include "TxOp.h" @@ -35,7 +35,7 @@ namespace qpid { * transactional channel. */ class TxAck : public TxOp{ - AccumulatedAck& acked; + framing::AccumulatedAck& acked; std::list& unacked; public: @@ -44,7 +44,7 @@ namespace qpid { * acks received * @param unacked the record of delivered messages */ - TxAck(AccumulatedAck& acked, std::list& unacked); + TxAck(framing::AccumulatedAck& acked, std::list& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index cc2b7aedc8..1a0fd25bc3 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -56,7 +56,7 @@ class ScopedSync Channel::Channel(bool _transactional, u_int16_t _prefetch) : prefetch(_prefetch), transactional(_transactional), running(false), - uniqueId(true)/*could eventually be the session id*/, nameCounter(0) + uniqueId(true)/*could eventually be the session id*/, nameCounter(0), active(false) { } @@ -65,26 +65,25 @@ Channel::~Channel() join(); } -void Channel::open(ConnectionImpl::shared_ptr c, SessionCore::shared_ptr s) +void Channel::open(const Session& s) { + Mutex::ScopedLock l(lock); if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); - - connection = c; - sessionCore = s; - session = auto_ptr(new Session(c, s)); + active = true; + session = s; } bool Channel::isOpen() const { Mutex::ScopedLock l(lock); - return connection; + return active; } void Channel::setQos() { - session->basicQos(0, getPrefetch(), false); + session.basicQos(0, getPrefetch(), false); if(isTransactional()) { //I think this is wrong! should only send TxSelect once... - session->txSelect(); + session.txSelect(); } } @@ -95,13 +94,13 @@ void Channel::setPrefetch(uint16_t _prefetch){ void Channel::declareExchange(Exchange& exchange, bool synch){ FieldTable args; - ScopedSync s(*session, synch); - session->exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args); + ScopedSync s(session, synch); + session.exchangeDeclare(0, exchange.getName(), exchange.getType(), empty, false, false, false, args); } void Channel::deleteExchange(Exchange& exchange, bool synch){ - ScopedSync s(*session, synch); - session->exchangeDelete(0, exchange.getName(), false); + ScopedSync s(session, synch); + session.exchangeDelete(0, exchange.getName(), false); } void Channel::declareQueue(Queue& queue, bool synch){ @@ -112,30 +111,30 @@ void Channel::declareQueue(Queue& queue, bool synch){ } FieldTable args; - ScopedSync s(*session, synch); - session->queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), + ScopedSync s(session, synch); + session.queueDeclare(0, queue.getName(), empty, false/*passive*/, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), args); } void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch){ - ScopedSync s(*session, synch); - session->queueDelete(0, queue.getName(), ifunused, ifempty); + ScopedSync s(session, synch); + session.queueDelete(0, queue.getName(), ifunused, ifempty); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - ScopedSync s(*session, synch); - session->queueBind(0, q, e, key, args); + ScopedSync s(session, synch); + session.queueBind(0, q, e, key, args); } void Channel::commit(){ - session->txCommit(); + session.txCommit(); } void Channel::rollback(){ - session->txRollback(); + session.txRollback(); } void Channel::consume( @@ -155,8 +154,8 @@ void Channel::consume( c.ackMode = ackMode; c.lastDeliveryTag = 0; } - ScopedSync s(*session, synch); - session->basicConsume(0, queue.getName(), tag, noLocal, + ScopedSync s(session, synch); + session.basicConsume(0, queue.getName(), tag, noLocal, ackMode == NO_ACK, false, !synch, fields ? *fields : FieldTable()); } @@ -171,13 +170,13 @@ void Channel::cancel(const std::string& tag, bool synch) { c = i->second; consumers.erase(i); } - ScopedSync s(*session, synch); - session->basicCancel(tag); + ScopedSync s(session, synch); + session.basicCancel(tag); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - Response response = session->basicGet(0, queue.getName(), ackMode == NO_ACK); - sessionCore->flush();//TODO: need to expose the ability to request completion info through session + Response response = session.basicGet(0, queue.getName(), ackMode == NO_ACK); + session.execution().sendFlushRequest(); if (response.isA()) { return false; } else { @@ -194,19 +193,15 @@ void Channel::publish(const Message& msg, const Exchange& exchange, const string e = exchange.getName(); string key = routingKey; - session->basicPublish(0, e, key, mandatory, immediate, msg); + session.basicPublish(0, e, key, mandatory, immediate, msg); } void Channel::close() { - session->close(); + session.close(); { Mutex::ScopedLock l(lock); - if (connection); - { - sessionCore.reset(); - connection.reset(); - } + active = false; } stop(); } @@ -232,7 +227,7 @@ void Channel::join() { void Channel::run() { try { while (true) { - FrameSet::shared_ptr content = session->get(); + FrameSet::shared_ptr content = session.get(); //need to dispatch this to the relevant listener: if (content->isA()) { ConsumerMap::iterator i = consumers.find(content->as()->getConsumerTag()); diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index c355fe007a..7ba4b0a246 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -79,18 +79,17 @@ class Channel : private sys::Runnable bool running; ConsumerMap consumers; - ConnectionImpl::shared_ptr connection; - std::auto_ptr session; - SessionCore::shared_ptr sessionCore; + Session session; framing::ChannelId channelId; BlockingQueue gets; framing::Uuid uniqueId; uint32_t nameCounter; + bool active; void stop(); void setQos(); - void open(ConnectionImpl::shared_ptr, SessionCore::shared_ptr); + void open(const Session& session); void closeInternal(); void join(); diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index e1581503f9..8c5f83f9f5 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -25,6 +25,7 @@ #include "Connection.h" #include "ClientChannel.h" #include "ClientMessage.h" +#include "ScopedAssociation.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" @@ -66,18 +67,15 @@ void Connection::open( } void Connection::openChannel(Channel& channel) { - ChannelId id = ++channelIdCounter; - SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); - impl->allocated(session); - channel.open(impl, session); - session->open(); + channel.open(newSession()); } Session Connection::newSession() { ChannelId id = ++channelIdCounter; SessionCore::shared_ptr session(new SessionCore(id, impl, max_frame_size)); - impl->allocated(session); - return Session(impl, session); + ScopedAssociation::shared_ptr assoc(new ScopedAssociation(session, impl)); + session->open(); + return Session(assoc); } void Connection::close() diff --git a/cpp/src/qpid/client/Completion.h b/cpp/src/qpid/client/Completion.h new file mode 100644 index 0000000000..000bba2138 --- /dev/null +++ b/cpp/src/qpid/client/Completion.h @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _Completion_ +#define _Completion_ + +#include +#include "Future.h" +#include "SessionCore.h" + +namespace qpid { +namespace client { + +class Completion +{ +protected: + Future future; + SessionCore::shared_ptr session; + +public: + Completion(Future f, SessionCore::shared_ptr s) : future(f), session(s) {} + + void sync() + { + future.sync(*session); + } + + bool isComplete() { + return future.isComplete(); + } +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp index 996971dbd2..46a7384ac2 100644 --- a/cpp/src/qpid/client/CompletionTracker.cpp +++ b/cpp/src/qpid/client/CompletionTracker.cpp @@ -20,45 +20,101 @@ */ #include "CompletionTracker.h" +#include using qpid::client::CompletionTracker; using namespace qpid::framing; using namespace boost; +namespace +{ +const std::string empty; +} + CompletionTracker::CompletionTracker() {} CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {} +void CompletionTracker::close() +{ + sys::Mutex::ScopedLock l(lock); + while (!listeners.empty()) { + Record r(listeners.front()); + { + sys::Mutex::ScopedUnlock u(lock); + r.completed(); + } + listeners.pop_front(); + } +} void CompletionTracker::completed(const SequenceNumber& _mark) { sys::Mutex::ScopedLock l(lock); mark = _mark; - while (!listeners.empty() && !(listeners.front().first > mark)) { - Listener f(listeners.front().second); + while (!listeners.empty() && !(listeners.front().id > mark)) { + Record r(listeners.front()); { sys::Mutex::ScopedUnlock u(lock); - f(); + r.completed(); } - listeners.pop(); + listeners.pop_front(); + } +} + +void CompletionTracker::received(const SequenceNumber& id, const std::string& result) +{ + sys::Mutex::ScopedLock l(lock); + Listeners::iterator i = seek(id); + if (i != listeners.end() && i->id == id) { + i->received(result); + listeners.erase(i); } } -void CompletionTracker::listen(const SequenceNumber& point, Listener listener) +void CompletionTracker::listenForCompletion(const SequenceNumber& point, CompletionListener listener) { - if (!add(point, listener)) { + if (!add(Record(point, listener))) { listener(); } } -bool CompletionTracker::add(const SequenceNumber& point, Listener listener) +void CompletionTracker::listenForResult(const SequenceNumber& point, ResultListener listener) +{ + if (!add(Record(point, listener))) { + listener(empty); + } +} + +bool CompletionTracker::add(const Record& record) { sys::Mutex::ScopedLock l(lock); - if (point < mark) { + if (record.id < mark) { return false; } else { - listeners.push(make_pair(point, listener)); + //insert at the correct position + Listeners::iterator i = seek(record.id); + if (i == listeners.end()) i = listeners.begin(); + listeners.insert(i, record); + return true; } } +CompletionTracker::Listeners::iterator CompletionTracker::seek(const framing::SequenceNumber& point) +{ + Listeners::iterator i = listeners.begin(); + while (i != listeners.end() && i->id < point) i++; + return i; +} + +void CompletionTracker::Record::completed() +{ + if (f) f(); + else if(g) g(empty);//won't get a result if command is now complete +} + +void CompletionTracker::Record::received(const std::string& result) +{ + if (g) g(result); +} diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h index 30999b4184..05cdc45c9f 100644 --- a/cpp/src/qpid/client/CompletionTracker.h +++ b/cpp/src/qpid/client/CompletionTracker.h @@ -19,7 +19,7 @@ * */ -#include +#include #include #include "qpid/framing/amqp_framing.h" #include "qpid/framing/SequenceNumber.h" @@ -34,19 +34,40 @@ namespace client { class CompletionTracker { public: - typedef boost::function Listener; + //typedef boost::function CompletionListener; + typedef boost::function0 CompletionListener; + typedef boost::function ResultListener; CompletionTracker(); CompletionTracker(const framing::SequenceNumber& mark); void completed(const framing::SequenceNumber& mark); - void listen(const framing::SequenceNumber& point, Listener l); + void received(const framing::SequenceNumber& id, const std::string& result); + void listenForCompletion(const framing::SequenceNumber& point, CompletionListener l); + void listenForResult(const framing::SequenceNumber& point, ResultListener l); + void close(); private: + struct Record + { + framing::SequenceNumber id; + CompletionListener f; + ResultListener g; + + Record(const framing::SequenceNumber& _id, CompletionListener l) : id(_id), f(l) {} + Record(const framing::SequenceNumber& _id, ResultListener l) : id(_id), g(l) {} + void completed(); + void received(const std::string& result); + + }; + + typedef std::list Listeners; + sys::Mutex lock; framing::SequenceNumber mark; - std::queue< std::pair > listeners; + Listeners listeners; - bool add(const framing::SequenceNumber& point, Listener l); + bool add(const Record& r); + Listeners::iterator seek(const framing::SequenceNumber&); }; } diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index b4d2156c31..5ff34cde4e 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -60,11 +60,11 @@ void ConnectionImpl::handle(framing::AMQFrame& frame) void ConnectionImpl::incoming(framing::AMQFrame& frame) { uint16_t id = frame.getChannel(); - SessionCore::shared_ptr session = sessions[id]; - if (!session) { + SessionMap::iterator i = sessions.find(id); + if (i == sessions.end()) { throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); } - session->handle(frame); + i->second->handle(frame); } void ConnectionImpl::open(const std::string& host, int port, @@ -111,7 +111,8 @@ void ConnectionImpl::idleOut() connector->send(frame); } -void ConnectionImpl::shutdown() { +void ConnectionImpl::shutdown() +{ //this indicates that the socket to the server has closed for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { i->second->closed(0, "Unexpected socket closure."); diff --git a/cpp/src/qpid/client/Execution.h b/cpp/src/qpid/client/Execution.h new file mode 100644 index 0000000000..1e8c48734d --- /dev/null +++ b/cpp/src/qpid/client/Execution.h @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _Execution_ +#define _Execution_ + +#include "qpid/framing/SequenceNumber.h" + +namespace qpid { +namespace client { + +class Execution +{ +public: + virtual ~Execution() {} + virtual void sendSyncRequest() = 0; + virtual void sendFlushRequest() = 0; + virtual void completed(const framing::SequenceNumber& id, bool cumulative, bool send) = 0; +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index d10b3d3fe8..1520ba2272 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -97,8 +97,7 @@ void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& ra void ExecutionHandler::flush() { - //send completion - incoming.lwm = incoming.hwm; + sendCompletion(); } void ExecutionHandler::noop() @@ -106,48 +105,88 @@ void ExecutionHandler::noop() //do nothing } -void ExecutionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +void ExecutionHandler::result(uint32_t command, const std::string& data) { - //TODO: need to signal the result to the appropriate listener + completion.received(command, data); } void ExecutionHandler::sync() { - //TODO: implement (the application is in charge of completion of - //some commands, so need to track completion for them). + //TODO: implement - need to note the mark requested and then + //remember to send a response when that point is reached +} - //This shouldn't ever need to be called by the server (in my - //opinion) as the server never needs to synchronise with the - //clients execution +void ExecutionHandler::flushTo(const framing::SequenceNumber& point) +{ + if (point > outgoing.lwm) { + sendFlushRequest(); + } } -void ExecutionHandler::sendFlush() +void ExecutionHandler::sendFlushRequest() { AMQFrame frame(0, ExecutionFlushBody()); - out(frame); + out(frame); } -void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g) +void ExecutionHandler::syncTo(const framing::SequenceNumber& point) { - //allocate id: - ++outgoing.hwm; - //register listeners if necessary: - if (f) { - completion.listen(outgoing.hwm, f); - } - if (g) { - correlation.listen(g); + if (point > outgoing.lwm) { + sendSyncRequest(); + } +} + + +void ExecutionHandler::sendSyncRequest() +{ + AMQFrame frame(0, ExecutionSyncBody()); + out(frame); +} + +void ExecutionHandler::completed(const SequenceNumber& id, bool cumulative, bool send) +{ + if (id > completionStatus.mark) { + if (cumulative) { + completionStatus.update(completionStatus.mark, id); + } else { + completionStatus.update(id, id); + } } + if (send) { + sendCompletion(); + } +} - AMQFrame frame(0/*id will be filled in be channel handler*/, command); + +void ExecutionHandler::sendCompletion() +{ + SequenceNumberSet range; + completionStatus.collectRanges(range); + AMQFrame frame(0, ExecutionCompleteBody(version, completionStatus.mark.getValue(), range)); + out(frame); +} + +SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l) +{ + SequenceNumber id = ++outgoing.hwm; + if(l) { + completion.listenForResult(id, l); + } + AMQFrame frame(0/*channel will be filled in be channel handler*/, command); out(frame); + return id; } -void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data, - CompletionTracker::Listener f, Correlator::Listener g) +SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, + CompletionTracker::ResultListener l) { - send(command, f, g); + SequenceNumber id = send(command, l); + sendContent(dynamic_cast(content.getMethodHeaders()), content.getData()); + return id; +} +void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data) +{ AMQHeaderBody header; BasicHeaderProperties::copy(*header.get(true), headers); header.get(true)->setContentLength(data.size()); diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index f740e14185..a42697e26a 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -22,28 +22,34 @@ #define _ExecutionHandler_ #include +#include "qpid/framing/AccumulatedAck.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/FrameSet.h" +#include "qpid/framing/MethodContent.h" #include "qpid/framing/SequenceNumber.h" #include "BlockingQueue.h" #include "ChainableFrameHandler.h" #include "CompletionTracker.h" #include "Correlator.h" +#include "Execution.h" namespace qpid { namespace client { class ExecutionHandler : private framing::AMQP_ServerOperations::ExecutionHandler, - public ChainableFrameHandler + public ChainableFrameHandler, + public Execution { framing::Window incoming; framing::Window outgoing; framing::FrameSet::shared_ptr arriving; Correlator correlation; CompletionTracker completion; + BlockingQueue received; framing::ProtocolVersion version; uint64_t maxFrameSize; + framing::AccumulatedAck completionStatus; void complete(uint32_t mark, const framing::SequenceNumberSet& range); void flush(); @@ -51,22 +57,29 @@ class ExecutionHandler : void result(uint32_t command, const std::string& data); void sync(); + void sendCompletion(); + + void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data); + public: - BlockingQueue received; + typedef CompletionTracker::ResultListener ResultListener; ExecutionHandler(uint64_t maxFrameSize = 65536); - void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } - void handle(framing::AMQFrame& frame); - void send(const framing::AMQBody& command, - CompletionTracker::Listener f = CompletionTracker::Listener(), - Correlator::Listener g = Correlator::Listener()); - void sendContent(const framing::AMQBody& command, - const framing::BasicHeaderProperties& headers, const std::string& data, - CompletionTracker::Listener f = CompletionTracker::Listener(), - Correlator::Listener g = Correlator::Listener()); - void sendFlush(); + framing::SequenceNumber send(const framing::AMQBody& command, ResultListener=ResultListener()); + framing::SequenceNumber send(const framing::AMQBody& command, const framing::MethodContent& content, + ResultListener=ResultListener()); + void sendSyncRequest(); + void sendFlushRequest(); + void completed(const framing::SequenceNumber& id, bool cumulative, bool send); + void syncTo(const framing::SequenceNumber& point); + void flushTo(const framing::SequenceNumber& point); + + void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } + Correlator& getCorrelator() { return correlation; } + CompletionTracker& getCompletionTracker() { return completion; } + BlockingQueue& getReceived() { return received; } }; }} diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h new file mode 100644 index 0000000000..c2f3b426da --- /dev/null +++ b/cpp/src/qpid/client/Future.h @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _Future_ +#define _Future_ + +#include +#include +#include "qpid/Exception.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/StructHelper.h" +#include "FutureCompletion.h" +#include "FutureResponse.h" +#include "FutureResult.h" +#include "SessionCore.h" + +namespace qpid { +namespace client { + +class Future : private framing::StructHelper +{ + framing::SequenceNumber command; + boost::shared_ptr response; + boost::shared_ptr result; + bool complete; + +public: + Future() : complete(false) {} + Future(const framing::SequenceNumber& id) : command(id), complete(false) {} + + void sync(SessionCore& session) + { + if (!complete) { + FutureCompletion callback; + session.getExecution().flushTo(command); + session.getExecution().getCompletionTracker().listenForCompletion( + command, + boost::bind(&FutureCompletion::completed, &callback) + ); + callback.waitForCompletion(); + session.checkClosed(); + complete = true; + } + } + + framing::AMQMethodBody* getResponse(SessionCore& session) + { + if (response) { + session.getExecution().getCompletionTracker().listenForCompletion( + command, + boost::bind(&FutureResponse::completed, response) + ); + return response->getResponse(session); + } else { + throw Exception("Response not expected"); + } + } + + template void decodeResult(T& value, SessionCore& session) + { + if (result) { + decode(value, result->getResult(session)); + } else { + throw Exception("Result not expected"); + } + } + + bool isComplete() { + return complete; + } + + void setCommandId(const framing::SequenceNumber& id) { command = id; } + void setFutureResponse(boost::shared_ptr r) { response = r; } + void setFutureResult(boost::shared_ptr r) { result = r; } +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/FutureCompletion.cpp b/cpp/src/qpid/client/FutureCompletion.cpp index 6fc3d5f088..130c65d6aa 100644 --- a/cpp/src/qpid/client/FutureCompletion.cpp +++ b/cpp/src/qpid/client/FutureCompletion.cpp @@ -24,9 +24,9 @@ using namespace qpid::client; using namespace qpid::sys; -FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {} +FutureCompletion::FutureCompletion() : complete(false) {} -bool FutureCompletion::isComplete() +bool FutureCompletion::isComplete() const { Monitor::ScopedLock l(lock); return complete; @@ -39,23 +39,10 @@ void FutureCompletion::completed() lock.notifyAll(); } -void FutureCompletion::waitForCompletion() +void FutureCompletion::waitForCompletion() const { Monitor::ScopedLock l(lock); - while (!complete && !closed) { + while (!complete) { lock.wait(); } - if (closed) { - throw ChannelException(code, text); - } -} - -void FutureCompletion::close(uint16_t _code, const std::string& _text) -{ - Monitor::ScopedLock l(lock); - complete = true; - closed = true; - code = _code; - text = _text; - lock.notifyAll(); } diff --git a/cpp/src/qpid/client/FutureCompletion.h b/cpp/src/qpid/client/FutureCompletion.h index 3487a0910a..1897230230 100644 --- a/cpp/src/qpid/client/FutureCompletion.h +++ b/cpp/src/qpid/client/FutureCompletion.h @@ -31,19 +31,15 @@ namespace client { class FutureCompletion { protected: - sys::Monitor lock; + mutable sys::Monitor lock; bool complete; - bool closed; - uint16_t code; - std::string text; public: FutureCompletion(); virtual ~FutureCompletion(){} - bool isComplete(); - void waitForCompletion(); + bool isComplete() const; + void waitForCompletion() const; void completed(); - void close(uint16_t code, const std::string& text); }; }} diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp index e63dc9c192..afdd35c5eb 100644 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -21,14 +21,17 @@ #include "FutureResponse.h" +#include "SessionCore.h" + using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; -AMQMethodBody* FutureResponse::getResponse() +AMQMethodBody* FutureResponse::getResponse(SessionCore& session) { waitForCompletion(); + session.checkClosed(); return response.get(); } diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h index 75b1f72c04..1e8a7eb456 100644 --- a/cpp/src/qpid/client/FutureResponse.h +++ b/cpp/src/qpid/client/FutureResponse.h @@ -29,11 +29,13 @@ namespace qpid { namespace client { +class SessionCore; + class FutureResponse : public FutureCompletion { framing::MethodHolder response; public: - framing::AMQMethodBody* getResponse(); + framing::AMQMethodBody* getResponse(SessionCore& session); void received(framing::AMQMethodBody* response); }; diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp new file mode 100644 index 0000000000..a523129206 --- /dev/null +++ b/cpp/src/qpid/client/FutureResult.cpp @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "FutureResult.h" + +#include "SessionCore.h" + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::sys; + +const std::string& FutureResult::getResult(SessionCore& session) const +{ + waitForCompletion(); + session.checkClosed(); + return result; +} + +void FutureResult::received(const std::string& r) +{ + Monitor::ScopedLock l(lock); + result = r; + complete = true; + lock.notifyAll(); +} diff --git a/cpp/src/qpid/client/FutureResult.h b/cpp/src/qpid/client/FutureResult.h new file mode 100644 index 0000000000..3117b63802 --- /dev/null +++ b/cpp/src/qpid/client/FutureResult.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _FutureResult_ +#define _FutureResult_ + +#include +#include "qpid/framing/amqp_framing.h" +#include "FutureCompletion.h" + +namespace qpid { +namespace client { + +class SessionCore; + +class FutureResult : public FutureCompletion +{ + std::string result; +public: + const std::string& getResult(SessionCore& session) const; + void received(const std::string& result); +}; + +}} + + + +#endif diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h index 7866df916c..f9a9f97b75 100644 --- a/cpp/src/qpid/client/Response.h +++ b/cpp/src/qpid/client/Response.h @@ -24,34 +24,27 @@ #include #include "qpid/framing/amqp_framing.h" -#include "FutureResponse.h" +#include "Completion.h" namespace qpid { namespace client { -class Response +class Response : public Completion { - boost::shared_ptr future; - public: - Response(boost::shared_ptr f) : future(f) {} + Response(Future f, SessionCore::shared_ptr s) : Completion(f, s) {} template T& as() { - framing::AMQMethodBody* response(future->getResponse()); - assert(response); + framing::AMQMethodBody* response(future.getResponse(*session)); return *boost::polymorphic_downcast(response); } + template bool isA() { - framing::AMQMethodBody* response(future->getResponse()); + framing::AMQMethodBody* response(future.getResponse(*session)); return response && response->isA(); } - - void sync() - { - return future->waitForCompletion(); - } }; }} diff --git a/cpp/src/qpid/client/ScopedAssociation.h b/cpp/src/qpid/client/ScopedAssociation.h new file mode 100644 index 0000000000..861a28c0f8 --- /dev/null +++ b/cpp/src/qpid/client/ScopedAssociation.h @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ScopedAssociation_ +#define _ScopedAssociation_ + +#include "ConnectionImpl.h" +#include "SessionCore.h" + +namespace qpid { +namespace client { + +struct ScopedAssociation +{ + typedef boost::shared_ptr shared_ptr; + + SessionCore::shared_ptr session; + ConnectionImpl::shared_ptr connection; + + ScopedAssociation() {} + + ScopedAssociation(SessionCore::shared_ptr s, ConnectionImpl::shared_ptr c) : session(s), connection(c) + { + connection->allocated(session); + } + + ~ScopedAssociation() + { + if (connection && session) connection->released(session); + } +}; + + +}} + +#endif diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 1b04e74af4..8dfe42989b 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -21,12 +21,15 @@ #include "SessionCore.h" #include +#include "Future.h" +#include "FutureResponse.h" +#include "FutureResult.h" using namespace qpid::client; using namespace qpid::framing; SessionCore::SessionCore(uint16_t _id, boost::shared_ptr out, - uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false) + uint64_t maxFrameSize) : l3(maxFrameSize), id(_id), sync(false), isClosed(false) { l2.out = boost::bind(&FrameHandler::handle, out, _1); l2.in = boost::bind(&ExecutionHandler::handle, &l3, _1); @@ -39,47 +42,15 @@ void SessionCore::open() l2.open(id); } -void SessionCore::flush() +ExecutionHandler& SessionCore::getExecution() { - l3.sendFlush(); -} - -Response SessionCore::send(const AMQMethodBody& method, bool expectResponse) -{ - boost::shared_ptr f(futures.createResponse()); - if (expectResponse) { - l3.send(method, boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1)); - } else { - l3.send(method, boost::bind(&FutureResponse::completed, f)); - } - if (sync) { - flush(); - f->waitForCompletion(); - } - return Response(f); -} - -Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse) -{ - //TODO: lots of duplication between these two send methods; refactor - boost::shared_ptr f(futures.createResponse()); - if (expectResponse) { - l3.sendContent(method, dynamic_cast(content.getMethodHeaders()), content.getData(), - boost::bind(&FutureResponse::completed, f), boost::bind(&FutureResponse::received, f, _1)); - } else { - l3.sendContent(method, dynamic_cast(content.getMethodHeaders()), content.getData(), - boost::bind(&FutureResponse::completed, f)); - } - if (sync) { - flush(); - f->waitForCompletion(); - } - return Response(f); + checkClosed(); + return l3; } FrameSet::shared_ptr SessionCore::get() { - return l3.received.pop(); + return l3.getReceived().pop(); } void SessionCore::setSync(bool s) @@ -95,12 +66,13 @@ bool SessionCore::isSync() void SessionCore::close() { l2.close(); - l3.received.close(); + stop(); } void SessionCore::stop() { - l3.received.close(); + l3.getReceived().close(); + l3.getCompletionTracker().close(); } void SessionCore::handle(AMQFrame& frame) @@ -110,6 +82,46 @@ void SessionCore::handle(AMQFrame& frame) void SessionCore::closed(uint16_t code, const std::string& text) { - l3.received.close(); - futures.close(code, text); + stop(); + + isClosed = true; + reason.code = code; + reason.text = text; +} + +void SessionCore::checkClosed() +{ + if (isClosed) { + throw ChannelException(reason.code, reason.text); + } +} + +Future SessionCore::send(const AMQBody& command) +{ + Future f; + //any result/response listeners must be set before the command is sent + if (command.getMethod()->resultExpected()) { + boost::shared_ptr r(new FutureResult()); + f.setFutureResult(r); + //result listener is tied to command id, and is set when that + //is allocated by the execution handler, so pass it to send + f.setCommandId(l3.send(command, boost::bind(&FutureResult::received, r, _1))); + } else { + if (command.getMethod()->responseExpected()) { + boost::shared_ptr r(new FutureResponse()); + f.setFutureResponse(r); + l3.getCorrelator().listen(boost::bind(&FutureResponse::received, r, _1)); + } + + f.setCommandId(l3.send(command)); + } + return f; +} + +Future SessionCore::send(const AMQBody& command, const MethodContent& content) +{ + //content bearing methods don't currently have responses or + //results, if that changes should follow procedure for the other + //send method impl: + return Future(l3.send(command, content)); } diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 0febb956b9..80fe13715f 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -22,6 +22,7 @@ #ifndef _SessionCore_ #define _SessionCore_ +#include #include #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/FrameHandler.h" @@ -29,35 +30,44 @@ #include "qpid/framing/MethodContent.h" #include "ChannelHandler.h" #include "ExecutionHandler.h" -#include "FutureFactory.h" -#include "Response.h" namespace qpid { namespace client { +class Future; + class SessionCore : public framing::FrameHandler { + struct Reason + { + uint16_t code; + std::string text; + }; + ExecutionHandler l3; ChannelHandler l2; - FutureFactory futures; const uint16_t id; bool sync; + bool isClosed; + Reason reason; public: typedef boost::shared_ptr shared_ptr; SessionCore(uint16_t id, boost::shared_ptr out, uint64_t maxFrameSize); - Response send(const framing::AMQMethodBody& method, bool expectResponse = false); - Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false); framing::FrameSet::shared_ptr get(); uint16_t getId() const { return id; } void setSync(bool); bool isSync(); - void flush(); void open(); void close(); void stop(); void closed(uint16_t code, const std::string& text); + void checkClosed(); + ExecutionHandler& getExecution(); + + Future send(const framing::AMQBody& command); + Future send(const framing::AMQBody& command, const framing::MethodContent& content); //for incoming frames: void handle(framing::AMQFrame& frame); diff --git a/cpp/src/qpid/client/TypedResult.h b/cpp/src/qpid/client/TypedResult.h new file mode 100644 index 0000000000..38892c42bd --- /dev/null +++ b/cpp/src/qpid/client/TypedResult.h @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _TypedResult_ +#define _TypedResult_ + +#include "Completion.h" + +namespace qpid { +namespace client { + +template class TypedResult : public Completion +{ + T result; + bool decoded; + +public: + TypedResult(Future f, SessionCore::shared_ptr s) : Completion(f, s), decoded(false) {} + + T& get() + { + if (!decoded) { + future.decodeResult(result, *session); + decoded = true; + } + + return result; + } +}; + +}} + +#endif diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h index a5c14a37e9..f0043d9d3b 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.h +++ b/cpp/src/qpid/framing/AMQMethodBody.h @@ -50,7 +50,9 @@ class AMQMethodBody : public AMQBody { virtual MethodId amqpMethodId() const = 0; virtual ClassId amqpClassId() const = 0; virtual bool isContentBearing() const = 0; - + virtual bool resultExpected() const = 0; + virtual bool responseExpected() const = 0; + void invoke(AMQP_ServerOperations&); bool invoke(Invocable*); diff --git a/cpp/src/qpid/framing/AccumulatedAck.cpp b/cpp/src/qpid/framing/AccumulatedAck.cpp new file mode 100644 index 0000000000..9daae5494c --- /dev/null +++ b/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -0,0 +1,154 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "AccumulatedAck.h" + +#include +#include + +using std::list; +using std::max; +using std::min; +using namespace qpid::framing; + +void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){ + assert(first <= last); + if (last < mark) return; + + + Range r(first, last); + bool handled = false; + bool markMerged = false; + list::iterator merged = ranges.end(); + if (r.mergeable(mark)) { + mark = r.end; + markMerged = true; + handled = true; + } else { + for (list::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) { + if (i->merge(r)) { + merged = i; + handled = true; + } else if (r.start < i->start) { + ranges.insert(i, r); + handled = true; + } + } + } + if (!handled) { + ranges.push_back(r); + } else { + while (!ranges.empty() && ranges.front().end <= mark) { + ranges.pop_front(); + } + if (markMerged) { + //new range is incorporated, but may be possible to consolidate + merged = ranges.begin(); + while (merged != ranges.end() && merged->mergeable(mark)) { + mark = merged->end; + merged = ranges.erase(merged); + } + } + if (merged != ranges.end()) { + //consolidate ranges + list::iterator i = merged; + list::iterator j = i++; + while (i != ranges.end() && j->merge(*i)) { + j = i++; + } + } + } +} + +void AccumulatedAck::consolidate(){} + +void AccumulatedAck::clear(){ + mark = 0;//not sure that this is valid when wraparound is a possibility + ranges.clear(); +} + +bool AccumulatedAck::covers(SequenceNumber tag) const{ + if (tag <= mark) return true; + for (list::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + if (i->contains(tag)) return true; + } + return false; +} + +void AccumulatedAck::collectRanges(SequenceNumberSet& set) const +{ + for (list::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + set.push_back(i->start); + set.push_back(i->end); + } +} + +bool Range::contains(SequenceNumber i) const +{ + return i >= start && i <= end; +} + +bool Range::intersect(const Range& r) const +{ + return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end); +} + +bool Range::merge(const Range& r) +{ + if (intersect(r) || mergeable(r.end) || r.mergeable(end)) { + start = min(start, r.start); + end = max(end, r.end); + return true; + } else { + return false; + } +} + +bool Range::mergeable(const SequenceNumber& s) const +{ + if (contains(s) || start - s == 1) { + return true; + } else { + return false; + } +} + +Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {} + + +namespace qpid{ +namespace framing{ + std::ostream& operator<<(std::ostream& out, const Range& r) + { + out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]"; + return out; + } + + std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a) + { + out << "{mark: " << a.mark.getValue() << ", ranges: ("; + for (list::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) { + if (i != a.ranges.begin()) out << ", "; + out << *i; + } + out << ")]"; + return out; + } +}} diff --git a/cpp/src/qpid/framing/AccumulatedAck.h b/cpp/src/qpid/framing/AccumulatedAck.h new file mode 100644 index 0000000000..f75842968f --- /dev/null +++ b/cpp/src/qpid/framing/AccumulatedAck.h @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _AccumulatedAck_ +#define _AccumulatedAck_ + +#include +#include +#include +#include +#include "SequenceNumber.h" +#include "SequenceNumberSet.h" + +namespace qpid { + namespace framing { + + struct Range + { + SequenceNumber start; + SequenceNumber end; + + Range(SequenceNumber s, SequenceNumber e); + bool contains(SequenceNumber i) const; + bool intersect(const Range& r) const; + bool merge(const Range& r); + bool mergeable(const SequenceNumber& r) const; + }; + /** + * Keeps an accumulated record of acked messages (by delivery + * tag). + */ + class AccumulatedAck { + public: + /** + * Everything up to this value has been acked. + */ + SequenceNumber mark; + /** + * List of individually acked messages greater than the + * 'mark'. + */ + std::list ranges; + + explicit AccumulatedAck(SequenceNumber r = SequenceNumber()) : mark(r) {} + void update(SequenceNumber firstTag, SequenceNumber lastTag); + void consolidate(); + void clear(); + bool covers(SequenceNumber tag) const; + void collectRanges(SequenceNumberSet& set) const; + }; + std::ostream& operator<<(std::ostream&, const Range&); + std::ostream& operator<<(std::ostream&, const AccumulatedAck&); + } +} + + +#endif diff --git a/cpp/src/qpid/framing/StructHelper.h b/cpp/src/qpid/framing/StructHelper.h index 753a593523..6b111e1f9e 100644 --- a/cpp/src/qpid/framing/StructHelper.h +++ b/cpp/src/qpid/framing/StructHelper.h @@ -44,7 +44,7 @@ public: rbuffer.getRawData(data, size); } - template void decode(T t, std::string& data) { + template void decode(T& t, const std::string& data) { char* bytes = static_cast(::alloca(data.length())); Buffer wbuffer(bytes, data.length()); wbuffer.putRawData(data); diff --git a/cpp/src/tests/AccumulatedAckTest.cpp b/cpp/src/tests/AccumulatedAckTest.cpp index 601af532fa..62245e463b 100644 --- a/cpp/src/tests/AccumulatedAckTest.cpp +++ b/cpp/src/tests/AccumulatedAckTest.cpp @@ -19,13 +19,13 @@ * under the License. * */ -#include "qpid/broker/AccumulatedAck.h" +#include "qpid/framing/AccumulatedAck.h" #include "qpid_test_plugin.h" #include #include using std::list; -using namespace qpid::broker; +using namespace qpid::framing; class AccumulatedAckTest : public CppUnit::TestCase { @@ -44,12 +44,12 @@ class AccumulatedAckTest : public CppUnit::TestCase public: bool covers(const AccumulatedAck& ack, int i) { - return ack.covers(DeliveryId(i)); + return ack.covers(SequenceNumber(i)); } void update(AccumulatedAck& ack, int start, int end) { - ack.update(DeliveryId(start), DeliveryId(end)); + ack.update(SequenceNumber(start), SequenceNumber(end)); } void testGeneral() diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp new file mode 100644 index 0000000000..1acac9c980 --- /dev/null +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include +#include "qpid_test_plugin.h" +#include "InProcessBroker.h" +#include "qpid/client/Session.h" + +using namespace qpid::client; +using namespace qpid::framing; + +class ClientSessionTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(ClientSessionTest); + CPPUNIT_TEST(testQueueQuery);; + CPPUNIT_TEST_SUITE_END(); + + boost::shared_ptr broker; + Connection connection; + Session session; + + public: + + ClientSessionTest() : broker(new qpid::broker::InProcessBroker()), connection(broker) + { + connection.open(""); + session = connection.newSession(); + } + + void testQueueQuery() + { + std::string name("my-queue"); + std::string alternate("amq.fanout"); + session.queueDeclare(0, name, alternate, false, false, true, true, FieldTable()); + TypedResult result = session.queueQuery(name); + CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable()); + CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive()); + CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange()); + } + + void testCompletion() + { + std::string queue("my-queue"); + std::string dest("my-dest"); + session.queueDeclare(0, queue, "", false, false, true, true, FieldTable()); + //subcribe to the queue with confirm_mode = 1 + session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable()); + //publish some messages + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(ClientSessionTest); diff --git a/cpp/src/tests/InProcessBroker.h b/cpp/src/tests/InProcessBroker.h index 0e5f3895f9..531ebd8fa7 100644 --- a/cpp/src/tests/InProcessBroker.h +++ b/cpp/src/tests/InProcessBroker.h @@ -101,6 +101,7 @@ class InProcessBroker : public client::Connector { ) : sender(sender_), conversation(conversation_), in(ih) {} void send(framing::AMQFrame& frame) { + //std::cout << (sender == CLIENT ? "C->S: " : "S->C: ") << frame << std::endl; conversation.push_back(TaggedFrame(sender, frame)); in->received(frame); } diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 874a9a448a..de7a12c027 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -94,10 +94,11 @@ broker_unit_tests = \ TxPublishTest \ ValueTest \ MessageHandlerTest \ - MessageBuilderTest + MessageBuilderTest \ + ClientSessionTest #client_unit_tests = \ - ClientChannelTest + ClientChannelTest framing_unit_tests = \ FieldTableTest \ -- cgit v1.2.1