diff options
author | Alan Conway <aconway@apache.org> | 2007-09-21 18:26:37 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-09-21 18:26:37 +0000 |
commit | 2f6d6ad7efd788b71204af67dff51b6233881e2e (patch) | |
tree | a3d123bc112d12dfcef341a312f418624c98e342 /cpp | |
parent | 3b80f903b6174b4346d7d7b537d783f628fe28d6 (diff) | |
download | qpid-python-2f6d6ad7efd788b71204af67dff51b6233881e2e.tar.gz |
Split broker::Session into:
broker::SessionState: session info (uuid etc.) + handler chains.
broker::SemanticState: session state for the SemanticHandler.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@578219 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
22 files changed, 331 insertions, 281 deletions
diff --git a/cpp/rubygen/generate b/cpp/rubygen/generate index dba3b81273..dba39dbf80 100755 --- a/cpp/rubygen/generate +++ b/cpp/rubygen/generate @@ -49,9 +49,9 @@ if makefile rgen_generator=#{make_continue rgen_generator} -rgen_client_cpp=#{make_continue(rgen_srcs.grep %r|/qpid/client/.+\.cpp$|)} +rgen_client_cpp=#{make_continue(rgen_srcs.grep(%r|/qpid/client/.+\.cpp$|))} -rgen_common_cpp=#{make_continue(rgen_srcs.grep %r|qpid/framing/.+\.cpp$|)} +rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r|qpid/framing/.+\.cpp$|))} rgen_srcs=#{make_continue rgen_srcs} diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index cf7029dabc..bdac539d92 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -187,8 +187,10 @@ libqpidbroker_la_SOURCES = \ qpid/broker/RecoveryManagerImpl.cpp \ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ - qpid/broker/Session.h \ - qpid/broker/Session.cpp \ + qpid/broker/SemanticState.h \ + qpid/broker/SemanticState.cpp \ + qpid/broker/SessionState.h \ + qpid/broker/SessionState.cpp \ qpid/broker/SessionHandler.h \ qpid/broker/SessionHandler.cpp \ qpid/broker/SemanticHandler.cpp \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index c266b36dfb..0fb521d626 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -16,8 +16,6 @@ * */ #include "BrokerAdapter.h" -#include "Session.h" -#include "SessionHandler.h" #include "Connection.h" #include "DeliveryToken.h" #include "MessageDelivery.h" @@ -38,7 +36,7 @@ typedef std::vector<Queue::shared_ptr> QueueVector; // by the handlers responsible for those classes. // -BrokerAdapter::BrokerAdapter(Session& s) : +BrokerAdapter::BrokerAdapter(SemanticState& s) : HandlerImpl(s), basicHandler(s), exchangeHandler(s), @@ -153,7 +151,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) { - Queue::shared_ptr queue = getSession().getQueue(name); + Queue::shared_ptr queue = state.getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); return QueueQueryResult(queue->getName(), @@ -176,7 +174,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& } Queue::shared_ptr queue; if (passive && !name.empty()) { - queue = getSession().getQueue(name); + queue = state.getQueue(name); //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = @@ -187,7 +185,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - getSession().setDefaultQueue(queue); + state.setDefaultQueue(queue); if (alternate) { queue->setAlternateExchange(alternate); alternate->incAlternateUsers(); @@ -216,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu const string& exchangeName, const string& routingKey, const FieldTable& arguments){ - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; @@ -239,7 +237,7 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, const string& routingKey, const qpid::framing::FieldTable& arguments ) { - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); @@ -252,12 +250,12 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){ - getSession().getQueue(queue)->purge(); + state.getQueue(queue)->purge(); } void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){ ChannelException error(0, ""); - Queue::shared_ptr q = getSession().getQueue(queue); + Queue::shared_ptr q = state.getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); }else if(ifUnused && q->getConsumerCount() > 0){ @@ -279,8 +277,8 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ //TODO: handle global - getSession().setPrefetchSize(prefetchSize); - getSession().setPrefetchCount(prefetchCount); + state.setPrefetchSize(prefetchSize); + state.setPrefetchCount(prefetchCount); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -289,8 +287,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, bool nowait, const FieldTable& fields) { - Queue::shared_ptr queue = getSession().getQueue(queueName); - if(!consumerTag.empty() && getSession().exists(consumerTag)){ + Queue::shared_ptr queue = state.getQueue(queueName); + if(!consumerTag.empty() && state.exists(consumerTag)){ throw ConnectionException(530, "Consumer tags must be unique"); } string newTag = consumerTag; @@ -298,7 +296,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); - getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); + state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); if(!nowait) getProxy().getBasic().consumeOk(newTag); @@ -308,13 +306,13 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, } void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ - getSession().cancel(consumerTag); + state.cancel(consumerTag); } void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue)); - if(!getSession().get(token, queue, !noAck)){ + if(!state.get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack getProxy().getBasic().getEmpty(clusterId); @@ -323,9 +321,9 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ if (multiple) { - getSession().ackCumulative(deliveryTag); + state.ackCumulative(deliveryTag); } else { - getSession().ackRange(deliveryTag, deliveryTag); + state.ackRange(deliveryTag, deliveryTag); } } @@ -333,23 +331,23 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) { - getSession().recover(requeue); + state.recover(requeue); } void BrokerAdapter::TxHandlerImpl::select() { - getSession().startTx(); + state.startTx(); } void BrokerAdapter::TxHandlerImpl::commit() { - getSession().commit(&getBroker().getStore()); + state.commit(&getBroker().getStore()); } void BrokerAdapter::TxHandlerImpl::rollback() { - getSession().rollback(); - getSession().recover(false); + state.rollback(); + state.recover(false); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index ec6b4aa0fc..5537dc67f5 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -20,7 +20,7 @@ */ #include "DtxHandlerImpl.h" #include "MessageHandlerImpl.h" -#include "NameGenerator.h" + #include "qpid/Exception.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/reply_exceptions.h" @@ -44,6 +44,7 @@ class StreamHandler; class DtxHandler; class TunnelHandler; class MessageHandlerImpl; +class Exchange; /** * Per-channel protocol adapter. @@ -54,16 +55,10 @@ class MessageHandlerImpl; * peer. * */ - -// TODO aconway 2007-09-18: BrokerAdapter is no longer an appropriate way -// to group methods as seen by the BADHANDLERs below. -// Handlers should be grouped by layer, the BrokerAdapter stuff -// belongs on the SemanticHandler. -// class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations { public: - BrokerAdapter(Session& session); + BrokerAdapter(SemanticState& session); BasicHandler* getBasicHandler() { return &basicHandler; } ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } @@ -73,7 +68,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations MessageHandler* getMessageHandler() { return &messageHandler; } DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } - framing::ProtocolVersion getVersion() const { return getConnection().getVersion(); } + + framing::ProtocolVersion getVersion() const { return session.getVersion();} AccessHandler* getAccessHandler() { @@ -99,7 +95,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { public: - ExchangeHandlerImpl(Session& session) : HandlerImpl(session) {} + ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {} void declare(uint16_t ticket, const std::string& exchange, const std::string& type, @@ -108,10 +104,13 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations const qpid::framing::FieldTable& arguments); void delete_(uint16_t ticket, const std::string& exchange, bool ifUnused); - framing::ExchangeQueryResult query(u_int16_t ticket, const string& name); + framing::ExchangeQueryResult query(u_int16_t ticket, + const std::string& name); private: - void checkType(Exchange::shared_ptr exchange, const std::string& type); - void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate); + void checkType(shared_ptr<Exchange> exchange, const std::string& type); + + void checkAlternate(shared_ptr<Exchange> exchange, + shared_ptr<Exchange> alternate); }; class BindingHandlerImpl : @@ -119,7 +118,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { public: - BindingHandlerImpl(Session& session) : HandlerImpl(session) {} + BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {} framing::BindingQueryResult query(u_int16_t ticket, const std::string& exchange, @@ -133,7 +132,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { public: - QueueHandlerImpl(Session& session) : HandlerImpl(session) {} + QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {} void declare(uint16_t ticket, const std::string& queue, const std::string& alternateExchange, @@ -148,7 +147,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); - framing::QueueQueryResult query(const string& queue); + framing::QueueQueryResult query(const std::string& queue); void purge(uint16_t ticket, const std::string& queue); void delete_(uint16_t ticket, const std::string& queue, bool ifUnused, bool ifEmpty); @@ -159,9 +158,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { NameGenerator tagGenerator; - public: - BasicHandlerImpl(Session& session) : HandlerImpl(session), tagGenerator("sgen") {} + BasicHandlerImpl(SemanticState& session) : HandlerImpl(session), tagGenerator("sgen") {} void qos(uint32_t prefetchSize, uint16_t prefetchCount, bool global); @@ -181,7 +179,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public HandlerImpl { public: - TxHandlerImpl(Session& session) : HandlerImpl(session) {} + TxHandlerImpl(SemanticState& session) : HandlerImpl(session) {} void select(); void commit(); diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ec6fd6ece7..b1b8abe4fd 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -23,7 +23,7 @@ #include <assert.h> #include "Connection.h" -#include "Session.h" +#include "SessionState.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "BrokerAdapter.h" #include "SemanticHandler.h" @@ -52,8 +52,7 @@ void Connection::received(framing::AMQFrame& frame){ if (frame.getChannel() == 0) { adapter.handle(frame); } else { - SessionHandler sa = getChannel(frame.getChannel()); - sa.in(frame); + getChannel(frame.getChannel()).in(frame); } } @@ -94,7 +93,7 @@ void Connection::closeChannel(uint16_t id) { if (i != channels.end()) channels.erase(i); } -SessionHandler Connection::getChannel(ChannelId id) { +SessionHandler& Connection::getChannel(ChannelId id) { boost::optional<SessionHandler>& ch = channels[id]; if (!ch) { ch = boost::in_place(boost::ref(*this), id); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 2723ac9acc..4f64873dc3 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -35,7 +35,6 @@ #include "qpid/framing/ProtocolVersion.h" #include "Broker.h" #include "qpid/Exception.h" -#include "Session.h" #include "ConnectionHandler.h" #include "SessionHandler.h" @@ -51,7 +50,7 @@ class Connection : public sys::ConnectionInputHandler, Connection(sys::ConnectionOutputHandler* out, Broker& broker); /** Get the SessionHandler for channel. Create if it does not already exist */ - SessionHandler getChannel(framing::ChannelId channel); + SessionHandler& getChannel(framing::ChannelId channel); /** Close the connection */ void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 9196fa71a0..36e6c22f88 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -20,7 +20,7 @@ */ #include "DeliveryRecord.h" #include "DeliverableMessage.h" -#include "Session.h" +#include "SemanticState.h" #include "BrokerExchange.h" #include "qpid/log/Statement.h" @@ -74,7 +74,7 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const return range->covers(id); } -void DeliveryRecord::redeliver(Session* const session) const{ +void DeliveryRecord::redeliver(SemanticState* const session) const{ if (!confirmed) { if(pull){ //if message was originally sent as response to get, we must requeue it diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 4d98b0c5da..3c833fcaa8 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -34,7 +34,7 @@ namespace qpid { namespace broker { -class Session; +class SemanticState; /** * Record of a delivery for which an ack is outstanding. @@ -61,7 +61,7 @@ class DeliveryRecord{ void requeue() const; void release(); void reject(); - void redeliver(Session* const) const; + void redeliver(SemanticState* const) const; void updateByteCredit(uint32_t& credit) const; void addTo(Prefetch&) const; void subtractFrom(Prefetch&) const; diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 7ed42d285b..5887d13f85 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -19,21 +19,20 @@ #include <boost/format.hpp> #include "Broker.h" -#include "Session.h" #include "qpid/framing/constants.h" using namespace qpid::broker; using namespace qpid::framing; using std::string; -DtxHandlerImpl::DtxHandlerImpl(Session& s) : HandlerImpl(s) {} +DtxHandlerImpl::DtxHandlerImpl(SemanticState& s) : HandlerImpl(s) {} // DtxDemarcationHandler: void DtxHandlerImpl::select() { - getSession().selectDtx(); + state.selectDtx(); } DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, @@ -43,7 +42,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, { try { if (fail) { - getSession().endDtx(xid, true); + state.endDtx(xid, true); if (suspend) { throw ConnectionException(503, "End and suspend cannot both be set."); } else { @@ -51,9 +50,9 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, } } else { if (suspend) { - getSession().suspendDtx(xid); + state.suspendDtx(xid); } else { - getSession().endDtx(xid, false); + state.endDtx(xid, false); } return DtxDemarcationEndResult(XA_OK); } @@ -72,9 +71,9 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, } try { if (resume) { - getSession().resumeDtx(xid); + state.resumeDtx(xid); } else { - getSession().startDtx(xid, getBroker().getDtxManager(), join); + state.startDtx(xid, getBroker().getDtxManager(), join); } return DtxDemarcationStartResult(XA_OK); } catch (const DtxTimeoutException& e) { diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h index 7f8eaac335..5bc9d5142a 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -32,7 +32,7 @@ class DtxHandlerImpl public framing::AMQP_ServerOperations::DtxDemarcationHandler { public: - DtxHandlerImpl(Session&); + DtxHandlerImpl(SemanticState&); // DtxCoordinationHandler: @@ -57,8 +57,6 @@ public: void select(); framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume); - - }; diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h index c06188d3c0..0250805f52 100644 --- a/cpp/src/qpid/broker/HandlerImpl.h +++ b/cpp/src/qpid/broker/HandlerImpl.h @@ -19,9 +19,8 @@ * */ -#include "Session.h" -#include "SessionHandler.h" -#include "Connection.h" +#include "SemanticState.h" +#include "SessionState.h" namespace qpid { namespace broker { @@ -34,26 +33,14 @@ class Broker; */ class HandlerImpl { protected: - HandlerImpl(Session& s) : session(s) {} + SemanticState& state; + SessionState& session; - Session& getSession() { return session; } - const Session& getSession() const { return session; } - - SessionHandler* getSessionHandler() { return session.getHandler(); } - const SessionHandler* getSessionHandler() const { return session.getHandler(); } + HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {} - // Remaining functions may only be called if getSessionHandler() != 0 - framing::AMQP_ClientProxy& getProxy() { return getSessionHandler()->getProxy(); } - const framing::AMQP_ClientProxy& getProxy() const { return getSessionHandler()->getProxy(); } - - Connection& getConnection() { return getSessionHandler()->getConnection(); } - const Connection& getConnection() const { return getSessionHandler()->getConnection(); } - - Broker& getBroker() { return getConnection().broker; } - const Broker& getBroker() const { return getConnection().broker; } - - private: - Session& session; + framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } + Connection& getConnection() { return session.getConnection(); } + Broker& getBroker() { return session.getBroker(); } }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index a31ac78aa4..3d197e185d 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -18,7 +18,6 @@ #include "qpid/QpidError.h" #include "MessageHandlerImpl.h" -#include "Session.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" #include "Broker.h" @@ -36,8 +35,7 @@ namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(Session& session) - : HandlerImpl(session) {} +MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {} // // Message class method handlers @@ -46,7 +44,7 @@ MessageHandlerImpl::MessageHandlerImpl(Session& session) void MessageHandlerImpl::cancel(const string& destination ) { - getSession().cancel(destination); + state.cancel(destination); } void @@ -97,14 +95,14 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, bool exclusive, const framing::FieldTable& filter ) { - Queue::shared_ptr queue = getSession().getQueue(queueName); - if(!destination.empty() && getSession().exists(destination)) + Queue::shared_ptr queue = state.getQueue(queueName); + if(!destination.empty() && state.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; //NB: am assuming pre-acquired = 0 as discussed on SIG list as is //the previously expected behaviour - getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), + state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); @@ -117,9 +115,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, const string& destination, bool noAck ) { - Queue::shared_ptr queue = getSession().getQueue(queueName); + Queue::shared_ptr queue = state.getQueue(queueName); - if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ + if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){ //don't send any response... rely on execution completion } else { //temporarily disabled: @@ -148,14 +146,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, bool /*global*/ ) { //TODO: handle global - getSession().setPrefetchSize(prefetchSize); - getSession().setPrefetchCount(prefetchCount); + state.setPrefetchSize(prefetchSize); + state.setPrefetchCount(prefetchCount); } void MessageHandlerImpl::recover(bool requeue) { - getSession().recover(requeue); + state.recover(requeue); } void @@ -166,7 +164,7 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/ } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - getSession().reject(i->getValue(), (++i)->getValue()); + state.reject(i->getValue(), (++i)->getValue()); } } @@ -175,10 +173,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i if (unit == 0) { //message - getSession().addMessageCredit(destination, value); + state.addMessageCredit(destination, value); } else if (unit == 1) { //bytes - getSession().addByteCredit(destination, value); + state.addByteCredit(destination, value); } else { //unknown throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); @@ -190,10 +188,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) { if (mode == 0) { //credit - getSession().setCreditMode(destination); + state.setCreditMode(destination); } else if (mode == 1) { //window - getSession().setWindowMode(destination); + state.setWindowMode(destination); } else{ throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); } @@ -201,12 +199,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) void MessageHandlerImpl::flush(const std::string& destination) { - getSession().flush(destination); + state.flush(destination); } void MessageHandlerImpl::stop(const std::string& destination) { - getSession().stop(destination); + state.stop(destination); } void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) @@ -218,7 +216,7 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /* } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - getSession().acquire(i->getValue(), (++i)->getValue(), results); + state.acquire(i->getValue(), (++i)->getValue(), results); } results = results.condense(); @@ -232,7 +230,7 @@ void MessageHandlerImpl::release(const SequenceNumberSet& transfers) } for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) { - getSession().release(i->getValue(), (++i)->getValue()); + state.release(i->getValue(), (++i)->getValue()); } } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h index e4d66428d1..d90159d4f7 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.h +++ b/cpp/src/qpid/broker/MessageHandlerImpl.h @@ -37,7 +37,7 @@ class MessageHandlerImpl : public HandlerImpl { public: - MessageHandlerImpl(Session&); + MessageHandlerImpl(SemanticState&); void append(const std::string& reference, const std::string& bytes); @@ -87,8 +87,8 @@ class MessageHandlerImpl : void release(const framing::SequenceNumberSet& transfers); void subscribe(u_int16_t ticket, - const string& queue, - const string& destination, + const std::string& queue, + const std::string& destination, bool noLocal, u_int8_t confirmMode, u_int8_t acquireMode, diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index f8d76c3b5f..0bb813ebfd 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -20,12 +20,12 @@ */ #include "SemanticHandler.h" -#include "Session.h" +#include "SemanticState.h" #include "SessionHandler.h" +#include "SessionState.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" #include "Connection.h" -#include "Session.h" #include "qpid/framing/ExecutionCompleteBody.h" #include "qpid/framing/ExecutionResultBody.h" #include "qpid/framing/InvocationVisitor.h" @@ -36,7 +36,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {} +SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s) {} void SemanticHandler::handle(framing::AMQFrame& frame) { @@ -79,13 +79,13 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran if (outgoing.lwm < mark) { outgoing.lwm = mark; //ack messages: - getSession().ackCumulative(mark.getValue()); + state.ackCumulative(mark.getValue()); } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) { - getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } } @@ -95,9 +95,9 @@ void SemanticHandler::sendCompletion() SequenceNumber mark = incoming.getMark(); SequenceNumberSet range = incoming.getRange(); Mutex::ScopedLock l(outLock); - assert(getSessionHandler()); - getProxy().getExecution().complete(mark.getValue(), range); + session.getProxy().getExecution().complete(mark.getValue(), range); } + void SemanticHandler::flush() { incoming.flush(); @@ -122,7 +122,7 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) void SemanticHandler::handleCommand(framing::AMQMethodBody* method) { SequenceNumber id = incoming.next(); - BrokerAdapter adapter(getSession()); + BrokerAdapter adapter(state); InvocationVisitor v(&adapter); method->accept(v); incoming.complete(id); @@ -130,7 +130,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) if (!v.wasHandled()) { throw ConnectionException(540, "Not implemented"); } else if (v.hasResult()) { - getProxy().getExecution().result(id.getValue(), v.getResult()); + session.getProxy().getExecution().result(id.getValue(), v.getResult()); } //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -152,8 +152,8 @@ void SemanticHandler::handleContent(AMQFrame& frame) } msgBuilder.handle(frame); if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags - msg->setPublisher(&getConnection()); - getSession().handle(msg); + msg->setPublisher(&session.getConnection()); + state.handle(msg); msgBuilder.end(); incoming.track(msg); //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); } @@ -163,13 +163,17 @@ void SemanticHandler::handleContent(AMQFrame& frame) DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); - MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax()); + MessageDelivery::deliver( + msg, session.getHandler().out, + ++outgoing.hwm, token, + session.getConnection().getFrameMax()); return outgoing.hwm; } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) { - MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax()); + MessageDelivery::deliver(msg, session.getHandler().out, tag, token, + session.getConnection().getFrameMax()); } SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 4b3a05ba19..d6dbf878c9 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -44,13 +44,17 @@ class AMQHeaderBody; namespace broker { -class Session; +class SessionState; class SemanticHandler : public DeliveryAdapter, - public framing::FrameHandler, - public framing::AMQP_ServerOperations::ExecutionHandler, - private HandlerImpl + public framing::FrameHandler, + public framing::AMQP_ServerOperations::ExecutionHandler + { + SemanticState state; + SessionState& session; + // FIXME aconway 2007-09-20: Why are these on the handler rather than the + // state? IncomingExecutionContext incoming; framing::Window outgoing; sys::Mutex outLock; @@ -69,8 +73,12 @@ class SemanticHandler : public DeliveryAdapter, DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token); void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag); + framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } + Connection& getConnection() { return session.getConnection(); } + Broker& getBroker() { return session.getBroker(); } + public: - SemanticHandler(Session& session); + SemanticHandler(SessionState& session); //frame handler: void handle(framing::AMQFrame& frame); diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/SemanticState.cpp index d379b40d3f..059f99077c 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -19,8 +19,7 @@ * */ -#include "Session.h" - +#include "SessionState.h" #include "BrokerAdapter.h" #include "BrokerQueue.h" #include "Connection.h" @@ -56,11 +55,9 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -Session::Session(SessionHandler& a, uint32_t t) - : adapter(&a), - broker(adapter->getConnection().broker), - timeout(t), - id(true), +SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss) + : session(ss), + deliveryAdapter(da), prefetchSize(0), prefetchCount(0), tagGenerator("sgen"), @@ -69,28 +66,21 @@ Session::Session(SessionHandler& a, uint32_t t) flowActive(true) { outstanding.reset(); - std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this)); - // FIXME aconway 2007-08-29: move deliveryAdapter to SemanticHandlerState. - deliveryAdapter=semantic.get(); - handlers.push_back(semantic.release()); - in = &handlers[0]; - out = &adapter->out; - // FIXME aconway 2007-08-31: handlerupdater->sessionupdater, - // create a SessionManager in the broker for all session related - // stuff: suspended sessions, handler updaters etc. - // FIXME aconway 2007-08-31: Shouldn't be passing channel ID - broker.update(a.getChannel(), *this); } -Session::~Session() { - close(); +SemanticState::~SemanticState() { + consumers.clear(); + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } + recover(true); } -bool Session::exists(const string& consumerTag){ +bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut, +void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire, bool exclusive, const FieldTable*) { @@ -101,7 +91,7 @@ void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut, consumers.insert(tagInOut, c.release()); } -void Session::cancel(const string& tag){ +void SemanticState::cancel(const string& tag){ // consumers is a ptr_map so erase will delete the consumer // which will call cancel. ConsumerImplMap::iterator i = consumers.find(tag); @@ -109,22 +99,13 @@ void Session::cancel(const string& tag){ consumers.erase(i); } -void Session::close() -{ - opened = false; - consumers.clear(); - if (dtxBuffer.get()) { - dtxBuffer->fail(); - } - recover(true); -} -void Session::startTx() +void SemanticState::startTx() { txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void Session::commit(MessageStore* const store) +void SemanticState::commit(MessageStore* const store) { if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); @@ -135,7 +116,7 @@ void Session::commit(MessageStore* const store) } } -void Session::rollback() +void SemanticState::rollback() { if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); @@ -143,12 +124,12 @@ void Session::rollback() accumulatedAck.clear(); } -void Session::selectDtx() +void SemanticState::selectDtx() { dtxSelected = true; } -void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join) +void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) { if (!dtxSelected) { throw ConnectionException(503, "Session has not been selected for use with dtx"); @@ -162,7 +143,7 @@ void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join) } } -void Session::endDtx(const std::string& xid, bool fail) +void SemanticState::endDtx(const std::string& xid, bool fail) { if (!dtxBuffer) { throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid); @@ -183,7 +164,7 @@ void Session::endDtx(const std::string& xid, bool fail) dtxBuffer.reset(); } -void Session::suspendDtx(const std::string& xid) +void SemanticState::suspendDtx(const std::string& xid) { if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") @@ -195,7 +176,7 @@ void Session::suspendDtx(const std::string& xid) dtxBuffer->setSuspended(true); } -void Session::resumeDtx(const std::string& xid) +void SemanticState::resumeDtx(const std::string& xid) { if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") @@ -210,7 +191,7 @@ void Session::resumeDtx(const std::string& xid) txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); } -void Session::checkDtxTimeout() +void SemanticState::checkDtxTimeout() { if (dtxBuffer->isExpired()) { dtxBuffer.reset(); @@ -218,13 +199,13 @@ void Session::checkDtxTimeout() } } -void Session::record(const DeliveryRecord& delivery) +void SemanticState::record(const DeliveryRecord& delivery) { unacked.push_back(delivery); delivery.addTo(outstanding); } -bool Session::checkPrefetch(Message::shared_ptr& msg) +bool SemanticState::checkPrefetch(Message::shared_ptr& msg) { Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); @@ -232,7 +213,7 @@ bool Session::checkPrefetch(Message::shared_ptr& msg) return countOk && sizeOk; } -Session::ConsumerImpl::ConsumerImpl(Session* _parent, +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, DeliveryToken::shared_ptr _token, const string& _name, Queue::shared_ptr _queue, @@ -253,9 +234,10 @@ Session::ConsumerImpl::ConsumerImpl(Session* _parent, msgCredit(0), byteCredit(0) {} -bool Session::ConsumerImpl::deliver(QueuedMessage& msg) +bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { - if (nolocal && &parent->getHandler()->getConnection() == msg.payload->getPublisher()) { + if (nolocal && + &parent->getSession().getConnection() == msg.payload->getPublisher()) { return false; } else { if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) { @@ -266,7 +248,7 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg) Mutex::ScopedLock locker(parent->deliveryLock); DeliveryId deliveryTag = - parent->deliveryAdapter->deliver(msg.payload, token); + parent->deliveryAdapter.deliver(msg.payload, token); if (windowing || ackExpected) { parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected)); } @@ -275,7 +257,7 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg) } } -bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg) +bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg) { Mutex::ScopedLock l(lock); if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { @@ -291,33 +273,34 @@ bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg) } } -void Session::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) { +void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) { Mutex::ScopedLock locker(parent->deliveryLock); - parent->deliveryAdapter->redeliver(msg, token, deliveryTag); + parent->deliveryAdapter.redeliver(msg, token, deliveryTag); } -Session::ConsumerImpl::~ConsumerImpl() { +SemanticState::ConsumerImpl::~ConsumerImpl() { cancel(); } -void Session::ConsumerImpl::cancel() +void SemanticState::ConsumerImpl::cancel() { if(queue) { queue->cancel(this); if (queue->canAutoDelete()) { - parent->getHandler()->getConnection().broker.getQueues().destroyIf(queue->getName(), - boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); + parent->getSession().getBroker().getQueues().destroyIf( + queue->getName(), + boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue)); } } } -void Session::ConsumerImpl::requestDispatch() +void SemanticState::ConsumerImpl::requestDispatch() { if(blocked) queue->requestDispatch(this); } -void Session::handle(Message::shared_ptr msg) { +void SemanticState::handle(Message::shared_ptr msg) { if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); @@ -329,10 +312,10 @@ void Session::handle(Message::shared_ptr msg) { } } -void Session::route(Message::shared_ptr msg, Deliverable& strategy) { +void SemanticState::route(Message::shared_ptr msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName){ - cacheExchange = getHandler()->getConnection().broker.getExchanges().get(exchangeName); + cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName); } cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -347,17 +330,17 @@ void Session::route(Message::shared_ptr msg, Deliverable& strategy) { } -void Session::ackCumulative(DeliveryId id) +void SemanticState::ackCumulative(DeliveryId id) { ack(id, id, true); } -void Session::ackRange(DeliveryId first, DeliveryId last) +void SemanticState::ackRange(DeliveryId first, DeliveryId last) { ack(first, last, false); } -void Session::ack(DeliveryId first, DeliveryId last, bool cumulative) +void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) { Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery @@ -373,7 +356,7 @@ void Session::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&Session::acknowledged, this, _1)); + for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -398,7 +381,7 @@ void Session::ack(DeliveryId first, DeliveryId last, bool cumulative) for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); } -void Session::acknowledged(const DeliveryRecord& delivery) +void SemanticState::acknowledged(const DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag()); @@ -407,7 +390,7 @@ void Session::acknowledged(const DeliveryRecord& delivery) } } -void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) { if (windowing) { if (msgCredit != 0xFFFFFFFF) msgCredit++; @@ -415,7 +398,7 @@ void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) } } -void Session::recover(bool requeue) +void SemanticState::recover(bool requeue) { Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery @@ -431,12 +414,12 @@ void Session::recover(bool requeue) } } -bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) +bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) { QueuedMessage msg = queue->dequeue(); if(msg.payload){ Mutex::ScopedLock locker(deliveryLock); - DeliveryId myDeliveryTag = deliveryAdapter->deliver(msg.payload, token); + DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg.payload, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -446,7 +429,7 @@ bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool } } -void Session::deliver(Message::shared_ptr& msg, const string& consumerTag, +void SemanticState::deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag) { ConsumerImplMap::iterator i = consumers.find(consumerTag); @@ -455,7 +438,7 @@ void Session::deliver(Message::shared_ptr& msg, const string& consumerTag, } } -void Session::flow(bool active) +void SemanticState::flow(bool active) { Mutex::ScopedLock locker(deliveryLock); bool requestDelivery(!flowActive && active); @@ -467,7 +450,7 @@ void Session::flow(bool active) } -Session::ConsumerImpl& Session::find(const std::string& destination) +SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) { ConsumerImplMap::iterator i = consumers.find(destination); if (i == consumers.end()) { @@ -477,62 +460,62 @@ Session::ConsumerImpl& Session::find(const std::string& destination) } } -void Session::setWindowMode(const std::string& destination) +void SemanticState::setWindowMode(const std::string& destination) { find(destination).setWindowMode(); } -void Session::setCreditMode(const std::string& destination) +void SemanticState::setCreditMode(const std::string& destination) { find(destination).setCreditMode(); } -void Session::addByteCredit(const std::string& destination, uint32_t value) +void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { find(destination).addByteCredit(value); } -void Session::addMessageCredit(const std::string& destination, uint32_t value) +void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { find(destination).addMessageCredit(value); } -void Session::flush(const std::string& destination) +void SemanticState::flush(const std::string& destination) { ConsumerImpl& c = find(destination); c.flush(); } -void Session::stop(const std::string& destination) +void SemanticState::stop(const std::string& destination) { find(destination).stop(); } -void Session::ConsumerImpl::setWindowMode() +void SemanticState::ConsumerImpl::setWindowMode() { windowing = true; } -void Session::ConsumerImpl::setCreditMode() +void SemanticState::ConsumerImpl::setCreditMode() { windowing = false; } -void Session::ConsumerImpl::addByteCredit(uint32_t value) +void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { byteCredit += value; requestDispatch(); } -void Session::ConsumerImpl::addMessageCredit(uint32_t value) +void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { msgCredit += value; requestDispatch(); } -void Session::ConsumerImpl::flush() +void SemanticState::ConsumerImpl::flush() { //need to prevent delivery after requestDispatch returns but //before credit is reduced to zero; TODO: come up with better @@ -543,13 +526,13 @@ void Session::ConsumerImpl::flush() msgCredit = 0; } -void Session::ConsumerImpl::stop() +void SemanticState::ConsumerImpl::stop() { msgCredit = 0; byteCredit = 0; } -Queue::shared_ptr Session::getQueue(const string& name) const { +Queue::shared_ptr SemanticState::getQueue(const string& name) const { //Note: this can be removed soon as the default queue for sessions is scrapped in 0-10 Queue::shared_ptr queue; if (name.empty()) { @@ -558,14 +541,14 @@ Queue::shared_ptr Session::getQueue(const string& name) const { throw NotAllowedException(QPID_MSG("No queue name specified.")); } else { - queue = getBroker().getQueues().find(name); + queue = session.getBroker().getQueues().find(name); if (!queue) throw NotFoundException(QPID_MSG("Queue not found: "<<name)); } return queue; } -AckRange Session::findRange(DeliveryId first, DeliveryId last) +AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) { ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); ack_iterator end = start; @@ -582,21 +565,21 @@ AckRange Session::findRange(DeliveryId first, DeliveryId last) return AckRange(start, end); } -void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired) +void SemanticState::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired) { Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); for_each(range.start, range.end, AcquireFunctor(acquired)); } -void Session::release(DeliveryId first, DeliveryId last) +void SemanticState::release(DeliveryId first, DeliveryId last) { Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release)); } -void Session::reject(DeliveryId first, DeliveryId last) +void SemanticState::reject(DeliveryId first, DeliveryId last) { Mutex::ScopedLock locker(deliveryLock); AckRange range = findRange(first, last); diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/SemanticState.h index 80f1159f04..6147380714 100644 --- a/cpp/src/qpid/broker/Session.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -1,5 +1,5 @@ -#ifndef QPID_BROKER_SESSION_H -#define QPID_BROKER_SESSION_H +#ifndef QPID_BROKER_SEMANTICSTATE_H +#define QPID_BROKER_SEMANTICSTATE_H /* * @@ -37,7 +37,7 @@ #include "qpid/framing/Uuid.h" #include "qpid/shared_ptr.h" -#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/ptr_container/ptr_map.hpp> #include <list> #include <vector> @@ -45,21 +45,19 @@ namespace qpid { namespace broker { -class SessionHandler; -class Broker; +class SessionState; /** - * Session holds the state of an open session, whether attached to a - * channel or suspended. It also holds the handler chains associated - * with the session. + * SemanticState holds the L3 and L4 state of an open session, whether + * attached to a channel or suspended. */ -class Session : public framing::FrameHandler::Chains, - private boost::noncopyable +class SemanticState : public framing::FrameHandler::Chains, + private boost::noncopyable { class ConsumerImpl : public Consumer { sys::Mutex lock; - Session* const parent; + SemanticState* const parent; const DeliveryToken::shared_ptr token; const string name; const Queue::shared_ptr queue; @@ -74,7 +72,7 @@ class Session : public framing::FrameHandler::Chains, bool checkCredit(Message::shared_ptr& msg); public: - ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token, + ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, const string& name, Queue::shared_ptr queue, bool ack, bool nolocal, bool acquire); ~ConsumerImpl(); @@ -94,13 +92,8 @@ class Session : public framing::FrameHandler::Chains, typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; - SessionHandler* adapter; - Broker& broker; - uint32_t timeout; - framing::Uuid id; - boost::ptr_vector<framing::FrameHandler> handlers; - - DeliveryAdapter* deliveryAdapter; + SessionState& session; + DeliveryAdapter& deliveryAdapter; Queue::shared_ptr defaultQueue; ConsumerImplMap consumers; uint32_t prefetchSize; @@ -113,7 +106,6 @@ class Session : public framing::FrameHandler::Chains, DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; framing::AccumulatedAck accumulatedAck; - bool opened; bool flowActive; boost::shared_ptr<Exchange> cacheExchange; @@ -128,19 +120,10 @@ class Session : public framing::FrameHandler::Chains, AckRange findRange(DeliveryId first, DeliveryId last); public: - Session(SessionHandler&, uint32_t timeout); - ~Session(); - - /** Returns 0 if this session is not currently attached */ - SessionHandler* getHandler() { return adapter; } - const SessionHandler* getHandler() const { return adapter; } + SemanticState(DeliveryAdapter&, SessionState&); + ~SemanticState(); - Broker& getBroker() const { return broker; } - - /** Session timeout, aka detached-lifetime. */ - uint32_t getTimeout() const { return timeout; } - /** Session ID */ - const framing::Uuid& getId() const { return id; } + SessionState& getSession() { return session; } /** * Get named queue, never returns 0. @@ -174,7 +157,6 @@ class Session : public framing::FrameHandler::Chains, void stop(const std::string& destination); bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); - void close(); void startTx(); void commit(MessageStore* const store); void rollback(); @@ -198,4 +180,5 @@ class Session : public framing::FrameHandler::Chains, -#endif /*!QPID_BROKER_SESSION_H*/ + +#endif /*!QPID_BROKER_SEMANTICSTATE_H*/ diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 01ce88059a..13e5c247be 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -19,7 +19,7 @@ */ #include "SessionHandler.h" -#include "Session.h" +#include "SessionState.h" #include "Connection.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/constants.h" @@ -94,7 +94,7 @@ void SessionHandler::assertClosed(const char* method) { void SessionHandler::open(uint32_t detachedLifetime) { assertClosed("open"); - session.reset(new Session(*this, detachedLifetime)); + session.reset(new SessionState(*this, detachedLifetime)); getProxy().getSession().attached(session->getId(), session->getTimeout()); } diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index a9c0f69985..5ae5b5cfee 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -31,7 +31,7 @@ namespace qpid { namespace broker { class Connection; -class Session; +class SessionState; /** * A SessionHandler is associated with each active channel. It @@ -48,8 +48,8 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, ~SessionHandler(); /** Returns 0 if not attached to a session */ - Session* getSession() { return session.get(); } - const Session* getSession() const { return session.get(); } + SessionState* getSession() { return session.get(); } + const SessionState* getSession() const { return session.get(); } framing::ChannelId getChannel() const { return channel; } @@ -84,7 +84,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, Connection& connection; const framing::ChannelId channel; framing::AMQP_ClientProxy proxy; - shared_ptr<Session> session; + shared_ptr<SessionState> session; bool ignoring; }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp new file mode 100644 index 0000000000..acfb3bfea8 --- /dev/null +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -0,0 +1,63 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionState.h" +#include "SessionHandler.h" +#include "Connection.h" +#include "Broker.h" +#include "SemanticHandler.h" + +namespace qpid { +namespace broker { + +using namespace framing; + +SessionState::SessionState(SessionHandler& h, uint32_t timeout_) + : handler(&h), id(true), timeout(timeout_), + broker(h.getConnection().broker), + version(h.getConnection().getVersion()) +{ + // FIXME aconway 2007-09-21: Break dependnecy - broker updates session. + chain.push_back(new SemanticHandler(*this)); + in = &chain[0]; // Incoming frame to handler chain. + out = &handler->out; // Outgoing frames to SessionHandler + + // FIXME aconway 2007-09-20: use broker to add plugin + // handlers to the chain. + // FIXME aconway 2007-08-31: Shouldn't be passing channel ID. + broker.update(handler->getChannel(), *this); +} + +SessionHandler& SessionState::getHandler() { + assert(isAttached()); + return *handler; +} + +AMQP_ClientProxy& SessionState::getProxy() { + return getHandler().getProxy(); +} + /** Convenience for: getHandler()->getConnection() + *@pre getHandler() != 0 + */ +Connection& SessionState::getConnection() { + return getHandler().getConnection(); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 7558ea7866..1334cc7005 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -23,44 +23,73 @@ */ #include "qpid/framing/Uuid.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/ProtocolVersion.h" + +#include <boost/ptr_container/ptr_vector.hpp> +#include <boost/noncopyable.hpp> + namespace qpid { + +namespace framing { +class AMQP_ClientProxy; +} + namespace broker { +class SessionHandler; +class Broker; +class Connection; + /** * State of a session. + * + * An attached session has a SessionHandler which is attached to a + * connection. A suspended session has no handler. + * + * A SessionState is always associated with an open session (attached or + * suspended) it is destroyed when the session is closed. + * + * The SessionState includes the sessions handler chains, which may + * themselves have state. The handlers will be preserved as long as + * the session is alive. */ -class SessionState +class SessionState : public framing::FrameHandler::Chains, + private boost::noncopyable { public: - enum State { CLOSED, ACTIVE, SUSPENDED }; + /** SessionState for a newly opened connection. */ + SessionState(SessionHandler& h, uint32_t timeout_); - /** Initially in CLOSED state */ - SessionState() : id(false), state(CLOSED), timeout(0) {} + bool isAttached() { return handler; } - /** Make CLOSED session ACTIVE, assigns a new UUID. - * #@param timeout in seconds - */ - void open(u_int32_t timeout_) { - state=ACTIVE; id.generate(); timeout=timeout_; - } + /** @pre isAttached() */ + SessionHandler& getHandler(); - /** Close a session. */ - void close() { state=CLOSED; id.clear(); timeout=0; } + /** @pre isAttached() */ + framing::AMQP_ClientProxy& getProxy(); + + /** @pre isAttached() */ + Connection& getConnection(); - State getState() const { return state; } const framing::Uuid& getId() const { return id; } uint32_t getTimeout() const { return timeout; } - - bool isOpen() { return state == ACTIVE; } - bool isClosed() { return state == CLOSED; } - bool isSuspended() { return state == SUSPENDED; } + Broker& getBroker() { return broker; } + framing::ProtocolVersion getVersion() const { return version; } + private: - friend class SuspendedSessions; + friend class SessionHandler; // Only SessionHandler can attach/detach + void detach() { handler=0; } + void attach(SessionHandler& h) { handler = &h; } + + SessionHandler* handler; framing::Uuid id; - State state; uint32_t timeout; + Broker& broker; + boost::ptr_vector<framing::FrameHandler> chain; + framing::ProtocolVersion version; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SuspendedSessions.h b/cpp/src/qpid/broker/SuspendedSessions.h index 03c5df27ed..d3a0c17050 100644 --- a/cpp/src/qpid/broker/SuspendedSessions.h +++ b/cpp/src/qpid/broker/SuspendedSessions.h @@ -31,8 +31,10 @@ namespace qpid { namespace broker { -/** Collection of suspended sessions. - * Thread safe. +/** + * Thread safe collection of suspended sessions. + * Every session is owned either by a connection's SessionHandler + * or by the SuspendedSessions. */ class SuspendedSessions { typedef std::multimap<sys::AbsTime,SessionState> Map; |