summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-09-21 18:26:37 +0000
committerAlan Conway <aconway@apache.org>2007-09-21 18:26:37 +0000
commit2f6d6ad7efd788b71204af67dff51b6233881e2e (patch)
treea3d123bc112d12dfcef341a312f418624c98e342 /cpp
parent3b80f903b6174b4346d7d7b537d783f628fe28d6 (diff)
downloadqpid-python-2f6d6ad7efd788b71204af67dff51b6233881e2e.tar.gz
Split broker::Session into:
broker::SessionState: session info (uuid etc.) + handler chains. broker::SemanticState: session state for the SemanticHandler. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@578219 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rwxr-xr-xcpp/rubygen/generate4
-rw-r--r--cpp/src/Makefile.am6
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp48
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h36
-rw-r--r--cpp/src/qpid/broker/Connection.cpp7
-rw-r--r--cpp/src/qpid/broker/Connection.h3
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp4
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp15
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h4
-rw-r--r--cpp/src/qpid/broker/HandlerImpl.h29
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp40
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h6
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp30
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h18
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp (renamed from cpp/src/qpid/broker/Session.cpp)159
-rw-r--r--cpp/src/qpid/broker/SemanticState.h (renamed from cpp/src/qpid/broker/Session.h)51
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h8
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp63
-rw-r--r--cpp/src/qpid/broker/SessionState.h67
-rw-r--r--cpp/src/qpid/broker/SuspendedSessions.h6
22 files changed, 331 insertions, 281 deletions
diff --git a/cpp/rubygen/generate b/cpp/rubygen/generate
index dba3b81273..dba39dbf80 100755
--- a/cpp/rubygen/generate
+++ b/cpp/rubygen/generate
@@ -49,9 +49,9 @@ if makefile
rgen_generator=#{make_continue rgen_generator}
-rgen_client_cpp=#{make_continue(rgen_srcs.grep %r|/qpid/client/.+\.cpp$|)}
+rgen_client_cpp=#{make_continue(rgen_srcs.grep(%r|/qpid/client/.+\.cpp$|))}
-rgen_common_cpp=#{make_continue(rgen_srcs.grep %r|qpid/framing/.+\.cpp$|)}
+rgen_common_cpp=#{make_continue(rgen_srcs.grep(%r|qpid/framing/.+\.cpp$|))}
rgen_srcs=#{make_continue rgen_srcs}
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index cf7029dabc..bdac539d92 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -187,8 +187,10 @@ libqpidbroker_la_SOURCES = \
qpid/broker/RecoveryManagerImpl.cpp \
qpid/broker/RecoveredEnqueue.cpp \
qpid/broker/RecoveredDequeue.cpp \
- qpid/broker/Session.h \
- qpid/broker/Session.cpp \
+ qpid/broker/SemanticState.h \
+ qpid/broker/SemanticState.cpp \
+ qpid/broker/SessionState.h \
+ qpid/broker/SessionState.cpp \
qpid/broker/SessionHandler.h \
qpid/broker/SessionHandler.cpp \
qpid/broker/SemanticHandler.cpp \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index c266b36dfb..0fb521d626 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -16,8 +16,6 @@
*
*/
#include "BrokerAdapter.h"
-#include "Session.h"
-#include "SessionHandler.h"
#include "Connection.h"
#include "DeliveryToken.h"
#include "MessageDelivery.h"
@@ -38,7 +36,7 @@ typedef std::vector<Queue::shared_ptr> QueueVector;
// by the handlers responsible for those classes.
//
-BrokerAdapter::BrokerAdapter(Session& s) :
+BrokerAdapter::BrokerAdapter(SemanticState& s) :
HandlerImpl(s),
basicHandler(s),
exchangeHandler(s),
@@ -153,7 +151,7 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/
QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
{
- Queue::shared_ptr queue = getSession().getQueue(name);
+ Queue::shared_ptr queue = state.getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
return QueueQueryResult(queue->getName(),
@@ -176,7 +174,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
}
Queue::shared_ptr queue;
if (passive && !name.empty()) {
- queue = getSession().getQueue(name);
+ queue = state.getQueue(name);
//TODO: check alternate-exchange is as expected
} else {
std::pair<Queue::shared_ptr, bool> queue_created =
@@ -187,7 +185,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- getSession().setDefaultQueue(queue);
+ state.setDefaultQueue(queue);
if (alternate) {
queue->setAlternateExchange(alternate);
alternate->incAlternateUsers();
@@ -216,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu
const string& exchangeName, const string& routingKey,
const FieldTable& arguments){
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
if(exchange){
string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
@@ -239,7 +237,7 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
const string& routingKey,
const qpid::framing::FieldTable& arguments )
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
@@ -252,12 +250,12 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
- getSession().getQueue(queue)->purge();
+ state.getQueue(queue)->purge();
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
ChannelException error(0, "");
- Queue::shared_ptr q = getSession().getQueue(queue);
+ Queue::shared_ptr q = state.getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
}else if(ifUnused && q->getConsumerCount() > 0){
@@ -279,8 +277,8 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string&
void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
//TODO: handle global
- getSession().setPrefetchSize(prefetchSize);
- getSession().setPrefetchCount(prefetchCount);
+ state.setPrefetchSize(prefetchSize);
+ state.setPrefetchCount(prefetchCount);
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -289,8 +287,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
bool nowait, const FieldTable& fields)
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
- if(!consumerTag.empty() && getSession().exists(consumerTag)){
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if(!consumerTag.empty() && state.exists(consumerTag)){
throw ConnectionException(530, "Consumer tags must be unique");
}
string newTag = consumerTag;
@@ -298,7 +296,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
//also version specific behaviour now)
if (newTag.empty()) newTag = tagGenerator.generate();
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
+ state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
if(!nowait)
getProxy().getBasic().consumeOk(newTag);
@@ -308,13 +306,13 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
}
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
- getSession().cancel(consumerTag);
+ state.cancel(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
- if(!getSession().get(token, queue, !noAck)){
+ if(!state.get(token, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
getProxy().getBasic().getEmpty(clusterId);
@@ -323,9 +321,9 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que
void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
if (multiple) {
- getSession().ackCumulative(deliveryTag);
+ state.ackCumulative(deliveryTag);
} else {
- getSession().ackRange(deliveryTag, deliveryTag);
+ state.ackRange(deliveryTag, deliveryTag);
}
}
@@ -333,23 +331,23 @@ void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*re
void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
{
- getSession().recover(requeue);
+ state.recover(requeue);
}
void BrokerAdapter::TxHandlerImpl::select()
{
- getSession().startTx();
+ state.startTx();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
- getSession().commit(&getBroker().getStore());
+ state.commit(&getBroker().getStore());
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
- getSession().rollback();
- getSession().recover(false);
+ state.rollback();
+ state.recover(false);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index ec6b4aa0fc..5537dc67f5 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -20,7 +20,7 @@
*/
#include "DtxHandlerImpl.h"
#include "MessageHandlerImpl.h"
-#include "NameGenerator.h"
+
#include "qpid/Exception.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/reply_exceptions.h"
@@ -44,6 +44,7 @@ class StreamHandler;
class DtxHandler;
class TunnelHandler;
class MessageHandlerImpl;
+class Exchange;
/**
* Per-channel protocol adapter.
@@ -54,16 +55,10 @@ class MessageHandlerImpl;
* peer.
*
*/
-
-// TODO aconway 2007-09-18: BrokerAdapter is no longer an appropriate way
-// to group methods as seen by the BADHANDLERs below.
-// Handlers should be grouped by layer, the BrokerAdapter stuff
-// belongs on the SemanticHandler.
-//
class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
{
public:
- BrokerAdapter(Session& session);
+ BrokerAdapter(SemanticState& session);
BasicHandler* getBasicHandler() { return &basicHandler; }
ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
@@ -73,7 +68,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
MessageHandler* getMessageHandler() { return &messageHandler; }
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
- framing::ProtocolVersion getVersion() const { return getConnection().getVersion(); }
+
+ framing::ProtocolVersion getVersion() const { return session.getVersion();}
AccessHandler* getAccessHandler() {
@@ -99,7 +95,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
public HandlerImpl
{
public:
- ExchangeHandlerImpl(Session& session) : HandlerImpl(session) {}
+ ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
void declare(uint16_t ticket,
const std::string& exchange, const std::string& type,
@@ -108,10 +104,13 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
const qpid::framing::FieldTable& arguments);
void delete_(uint16_t ticket,
const std::string& exchange, bool ifUnused);
- framing::ExchangeQueryResult query(u_int16_t ticket, const string& name);
+ framing::ExchangeQueryResult query(u_int16_t ticket,
+ const std::string& name);
private:
- void checkType(Exchange::shared_ptr exchange, const std::string& type);
- void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate);
+ void checkType(shared_ptr<Exchange> exchange, const std::string& type);
+
+ void checkAlternate(shared_ptr<Exchange> exchange,
+ shared_ptr<Exchange> alternate);
};
class BindingHandlerImpl :
@@ -119,7 +118,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
public HandlerImpl
{
public:
- BindingHandlerImpl(Session& session) : HandlerImpl(session) {}
+ BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
framing::BindingQueryResult query(u_int16_t ticket,
const std::string& exchange,
@@ -133,7 +132,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
public HandlerImpl
{
public:
- QueueHandlerImpl(Session& session) : HandlerImpl(session) {}
+ QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
void declare(uint16_t ticket, const std::string& queue,
const std::string& alternateExchange,
@@ -148,7 +147,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- framing::QueueQueryResult query(const string& queue);
+ framing::QueueQueryResult query(const std::string& queue);
void purge(uint16_t ticket, const std::string& queue);
void delete_(uint16_t ticket, const std::string& queue,
bool ifUnused, bool ifEmpty);
@@ -159,9 +158,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
public HandlerImpl
{
NameGenerator tagGenerator;
-
public:
- BasicHandlerImpl(Session& session) : HandlerImpl(session), tagGenerator("sgen") {}
+ BasicHandlerImpl(SemanticState& session) : HandlerImpl(session), tagGenerator("sgen") {}
void qos(uint32_t prefetchSize,
uint16_t prefetchCount, bool global);
@@ -181,7 +179,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
public HandlerImpl
{
public:
- TxHandlerImpl(Session& session) : HandlerImpl(session) {}
+ TxHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
void select();
void commit();
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ec6fd6ece7..b1b8abe4fd 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -23,7 +23,7 @@
#include <assert.h>
#include "Connection.h"
-#include "Session.h"
+#include "SessionState.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "BrokerAdapter.h"
#include "SemanticHandler.h"
@@ -52,8 +52,7 @@ void Connection::received(framing::AMQFrame& frame){
if (frame.getChannel() == 0) {
adapter.handle(frame);
} else {
- SessionHandler sa = getChannel(frame.getChannel());
- sa.in(frame);
+ getChannel(frame.getChannel()).in(frame);
}
}
@@ -94,7 +93,7 @@ void Connection::closeChannel(uint16_t id) {
if (i != channels.end()) channels.erase(i);
}
-SessionHandler Connection::getChannel(ChannelId id) {
+SessionHandler& Connection::getChannel(ChannelId id) {
boost::optional<SessionHandler>& ch = channels[id];
if (!ch) {
ch = boost::in_place(boost::ref(*this), id);
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 2723ac9acc..4f64873dc3 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -35,7 +35,6 @@
#include "qpid/framing/ProtocolVersion.h"
#include "Broker.h"
#include "qpid/Exception.h"
-#include "Session.h"
#include "ConnectionHandler.h"
#include "SessionHandler.h"
@@ -51,7 +50,7 @@ class Connection : public sys::ConnectionInputHandler,
Connection(sys::ConnectionOutputHandler* out, Broker& broker);
/** Get the SessionHandler for channel. Create if it does not already exist */
- SessionHandler getChannel(framing::ChannelId channel);
+ SessionHandler& getChannel(framing::ChannelId channel);
/** Close the connection */
void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 9196fa71a0..36e6c22f88 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -20,7 +20,7 @@
*/
#include "DeliveryRecord.h"
#include "DeliverableMessage.h"
-#include "Session.h"
+#include "SemanticState.h"
#include "BrokerExchange.h"
#include "qpid/log/Statement.h"
@@ -74,7 +74,7 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const
return range->covers(id);
}
-void DeliveryRecord::redeliver(Session* const session) const{
+void DeliveryRecord::redeliver(SemanticState* const session) const{
if (!confirmed) {
if(pull){
//if message was originally sent as response to get, we must requeue it
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 4d98b0c5da..3c833fcaa8 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -34,7 +34,7 @@
namespace qpid {
namespace broker {
-class Session;
+class SemanticState;
/**
* Record of a delivery for which an ack is outstanding.
@@ -61,7 +61,7 @@ class DeliveryRecord{
void requeue() const;
void release();
void reject();
- void redeliver(Session* const) const;
+ void redeliver(SemanticState* const) const;
void updateByteCredit(uint32_t& credit) const;
void addTo(Prefetch&) const;
void subtractFrom(Prefetch&) const;
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 7ed42d285b..5887d13f85 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -19,21 +19,20 @@
#include <boost/format.hpp>
#include "Broker.h"
-#include "Session.h"
#include "qpid/framing/constants.h"
using namespace qpid::broker;
using namespace qpid::framing;
using std::string;
-DtxHandlerImpl::DtxHandlerImpl(Session& s) : HandlerImpl(s) {}
+DtxHandlerImpl::DtxHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
// DtxDemarcationHandler:
void DtxHandlerImpl::select()
{
- getSession().selectDtx();
+ state.selectDtx();
}
DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -43,7 +42,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
{
try {
if (fail) {
- getSession().endDtx(xid, true);
+ state.endDtx(xid, true);
if (suspend) {
throw ConnectionException(503, "End and suspend cannot both be set.");
} else {
@@ -51,9 +50,9 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
}
} else {
if (suspend) {
- getSession().suspendDtx(xid);
+ state.suspendDtx(xid);
} else {
- getSession().endDtx(xid, false);
+ state.endDtx(xid, false);
}
return DtxDemarcationEndResult(XA_OK);
}
@@ -72,9 +71,9 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
}
try {
if (resume) {
- getSession().resumeDtx(xid);
+ state.resumeDtx(xid);
} else {
- getSession().startDtx(xid, getBroker().getDtxManager(), join);
+ state.startDtx(xid, getBroker().getDtxManager(), join);
}
return DtxDemarcationStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
index 7f8eaac335..5bc9d5142a 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.h
@@ -32,7 +32,7 @@ class DtxHandlerImpl
public framing::AMQP_ServerOperations::DtxDemarcationHandler
{
public:
- DtxHandlerImpl(Session&);
+ DtxHandlerImpl(SemanticState&);
// DtxCoordinationHandler:
@@ -57,8 +57,6 @@ public:
void select();
framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
-
-
};
diff --git a/cpp/src/qpid/broker/HandlerImpl.h b/cpp/src/qpid/broker/HandlerImpl.h
index c06188d3c0..0250805f52 100644
--- a/cpp/src/qpid/broker/HandlerImpl.h
+++ b/cpp/src/qpid/broker/HandlerImpl.h
@@ -19,9 +19,8 @@
*
*/
-#include "Session.h"
-#include "SessionHandler.h"
-#include "Connection.h"
+#include "SemanticState.h"
+#include "SessionState.h"
namespace qpid {
namespace broker {
@@ -34,26 +33,14 @@ class Broker;
*/
class HandlerImpl {
protected:
- HandlerImpl(Session& s) : session(s) {}
+ SemanticState& state;
+ SessionState& session;
- Session& getSession() { return session; }
- const Session& getSession() const { return session; }
-
- SessionHandler* getSessionHandler() { return session.getHandler(); }
- const SessionHandler* getSessionHandler() const { return session.getHandler(); }
+ HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
- // Remaining functions may only be called if getSessionHandler() != 0
- framing::AMQP_ClientProxy& getProxy() { return getSessionHandler()->getProxy(); }
- const framing::AMQP_ClientProxy& getProxy() const { return getSessionHandler()->getProxy(); }
-
- Connection& getConnection() { return getSessionHandler()->getConnection(); }
- const Connection& getConnection() const { return getSessionHandler()->getConnection(); }
-
- Broker& getBroker() { return getConnection().broker; }
- const Broker& getBroker() const { return getConnection().broker; }
-
- private:
- Session& session;
+ framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
+ Connection& getConnection() { return session.getConnection(); }
+ Broker& getBroker() { return session.getBroker(); }
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index a31ac78aa4..3d197e185d 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -18,7 +18,6 @@
#include "qpid/QpidError.h"
#include "MessageHandlerImpl.h"
-#include "Session.h"
#include "qpid/framing/FramingContent.h"
#include "Connection.h"
#include "Broker.h"
@@ -36,8 +35,7 @@ namespace broker {
using namespace framing;
-MessageHandlerImpl::MessageHandlerImpl(Session& session)
- : HandlerImpl(session) {}
+MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
//
// Message class method handlers
@@ -46,7 +44,7 @@ MessageHandlerImpl::MessageHandlerImpl(Session& session)
void
MessageHandlerImpl::cancel(const string& destination )
{
- getSession().cancel(destination);
+ state.cancel(destination);
}
void
@@ -97,14 +95,14 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
bool exclusive,
const framing::FieldTable& filter )
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
- if(!destination.empty() && getSession().exists(destination))
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if(!destination.empty() && state.exists(destination))
throw ConnectionException(530, "Consumer tags must be unique");
string tag = destination;
//NB: am assuming pre-acquired = 0 as discussed on SIG list as is
//the previously expected behaviour
- getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
+ state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
@@ -117,9 +115,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& destination,
bool noAck )
{
- Queue::shared_ptr queue = getSession().getQueue(queueName);
+ Queue::shared_ptr queue = state.getQueue(queueName);
- if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+ if (state.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
//don't send any response... rely on execution completion
} else {
//temporarily disabled:
@@ -148,14 +146,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
bool /*global*/ )
{
//TODO: handle global
- getSession().setPrefetchSize(prefetchSize);
- getSession().setPrefetchCount(prefetchCount);
+ state.setPrefetchSize(prefetchSize);
+ state.setPrefetchCount(prefetchCount);
}
void
MessageHandlerImpl::recover(bool requeue)
{
- getSession().recover(requeue);
+ state.recover(requeue);
}
void
@@ -166,7 +164,7 @@ MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- getSession().reject(i->getValue(), (++i)->getValue());
+ state.reject(i->getValue(), (++i)->getValue());
}
}
@@ -175,10 +173,10 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i
if (unit == 0) {
//message
- getSession().addMessageCredit(destination, value);
+ state.addMessageCredit(destination, value);
} else if (unit == 1) {
//bytes
- getSession().addByteCredit(destination, value);
+ state.addByteCredit(destination, value);
} else {
//unknown
throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -190,10 +188,10 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
{
if (mode == 0) {
//credit
- getSession().setCreditMode(destination);
+ state.setCreditMode(destination);
} else if (mode == 1) {
//window
- getSession().setWindowMode(destination);
+ state.setWindowMode(destination);
} else{
throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
}
@@ -201,12 +199,12 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
void MessageHandlerImpl::flush(const std::string& destination)
{
- getSession().flush(destination);
+ state.flush(destination);
}
void MessageHandlerImpl::stop(const std::string& destination)
{
- getSession().stop(destination);
+ state.stop(destination);
}
void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
@@ -218,7 +216,7 @@ void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- getSession().acquire(i->getValue(), (++i)->getValue(), results);
+ state.acquire(i->getValue(), (++i)->getValue(), results);
}
results = results.condense();
@@ -232,7 +230,7 @@ void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
}
for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
- getSession().release(i->getValue(), (++i)->getValue());
+ state.release(i->getValue(), (++i)->getValue());
}
}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
index e4d66428d1..d90159d4f7 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.h
@@ -37,7 +37,7 @@ class MessageHandlerImpl :
public HandlerImpl
{
public:
- MessageHandlerImpl(Session&);
+ MessageHandlerImpl(SemanticState&);
void append(const std::string& reference, const std::string& bytes);
@@ -87,8 +87,8 @@ class MessageHandlerImpl :
void release(const framing::SequenceNumberSet& transfers);
void subscribe(u_int16_t ticket,
- const string& queue,
- const string& destination,
+ const std::string& queue,
+ const std::string& destination,
bool noLocal,
u_int8_t confirmMode,
u_int8_t acquireMode,
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index f8d76c3b5f..0bb813ebfd 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -20,12 +20,12 @@
*/
#include "SemanticHandler.h"
-#include "Session.h"
+#include "SemanticState.h"
#include "SessionHandler.h"
+#include "SessionState.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
#include "Connection.h"
-#include "Session.h"
#include "qpid/framing/ExecutionCompleteBody.h"
#include "qpid/framing/ExecutionResultBody.h"
#include "qpid/framing/InvocationVisitor.h"
@@ -36,7 +36,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {}
+SemanticHandler::SemanticHandler(SessionState& s) : state(*this,s), session(s) {}
void SemanticHandler::handle(framing::AMQFrame& frame)
{
@@ -79,13 +79,13 @@ void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& ran
if (outgoing.lwm < mark) {
outgoing.lwm = mark;
//ack messages:
- getSession().ackCumulative(mark.getValue());
+ state.ackCumulative(mark.getValue());
}
if (range.size() % 2) { //must be even number
throw ConnectionException(530, "Received odd number of elements in ranged mark");
} else {
for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
- getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+ state.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
}
}
}
@@ -95,9 +95,9 @@ void SemanticHandler::sendCompletion()
SequenceNumber mark = incoming.getMark();
SequenceNumberSet range = incoming.getRange();
Mutex::ScopedLock l(outLock);
- assert(getSessionHandler());
- getProxy().getExecution().complete(mark.getValue(), range);
+ session.getProxy().getExecution().complete(mark.getValue(), range);
}
+
void SemanticHandler::flush()
{
incoming.flush();
@@ -122,7 +122,7 @@ void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
SequenceNumber id = incoming.next();
- BrokerAdapter adapter(getSession());
+ BrokerAdapter adapter(state);
InvocationVisitor v(&adapter);
method->accept(v);
incoming.complete(id);
@@ -130,7 +130,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
if (!v.wasHandled()) {
throw ConnectionException(540, "Not implemented");
} else if (v.hasResult()) {
- getProxy().getExecution().result(id.getValue(), v.getResult());
+ session.getProxy().getExecution().result(id.getValue(), v.getResult());
}
//TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); }
//TODO: if window gets too large send unsolicited completion
@@ -152,8 +152,8 @@ void SemanticHandler::handleContent(AMQFrame& frame)
}
msgBuilder.handle(frame);
if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
- msg->setPublisher(&getConnection());
- getSession().handle(msg);
+ msg->setPublisher(&session.getConnection());
+ state.handle(msg);
msgBuilder.end();
incoming.track(msg);
//TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); }
@@ -163,13 +163,17 @@ void SemanticHandler::handleContent(AMQFrame& frame)
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
- MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax());
+ MessageDelivery::deliver(
+ msg, session.getHandler().out,
+ ++outgoing.hwm, token,
+ session.getConnection().getFrameMax());
return outgoing.hwm;
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
{
- MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax());
+ MessageDelivery::deliver(msg, session.getHandler().out, tag, token,
+ session.getConnection().getFrameMax());
}
SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index 4b3a05ba19..d6dbf878c9 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -44,13 +44,17 @@ class AMQHeaderBody;
namespace broker {
-class Session;
+class SessionState;
class SemanticHandler : public DeliveryAdapter,
- public framing::FrameHandler,
- public framing::AMQP_ServerOperations::ExecutionHandler,
- private HandlerImpl
+ public framing::FrameHandler,
+ public framing::AMQP_ServerOperations::ExecutionHandler
+
{
+ SemanticState state;
+ SessionState& session;
+ // FIXME aconway 2007-09-20: Why are these on the handler rather than the
+ // state?
IncomingExecutionContext incoming;
framing::Window outgoing;
sys::Mutex outLock;
@@ -69,8 +73,12 @@ class SemanticHandler : public DeliveryAdapter,
DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
+ framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
+ Connection& getConnection() { return session.getConnection(); }
+ Broker& getBroker() { return session.getBroker(); }
+
public:
- SemanticHandler(Session& session);
+ SemanticHandler(SessionState& session);
//frame handler:
void handle(framing::AMQFrame& frame);
diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index d379b40d3f..059f99077c 100644
--- a/cpp/src/qpid/broker/Session.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -19,8 +19,7 @@
*
*/
-#include "Session.h"
-
+#include "SessionState.h"
#include "BrokerAdapter.h"
#include "BrokerQueue.h"
#include "Connection.h"
@@ -56,11 +55,9 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-Session::Session(SessionHandler& a, uint32_t t)
- : adapter(&a),
- broker(adapter->getConnection().broker),
- timeout(t),
- id(true),
+SemanticState::SemanticState(DeliveryAdapter& da, SessionState& ss)
+ : session(ss),
+ deliveryAdapter(da),
prefetchSize(0),
prefetchCount(0),
tagGenerator("sgen"),
@@ -69,28 +66,21 @@ Session::Session(SessionHandler& a, uint32_t t)
flowActive(true)
{
outstanding.reset();
- std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this));
- // FIXME aconway 2007-08-29: move deliveryAdapter to SemanticHandlerState.
- deliveryAdapter=semantic.get();
- handlers.push_back(semantic.release());
- in = &handlers[0];
- out = &adapter->out;
- // FIXME aconway 2007-08-31: handlerupdater->sessionupdater,
- // create a SessionManager in the broker for all session related
- // stuff: suspended sessions, handler updaters etc.
- // FIXME aconway 2007-08-31: Shouldn't be passing channel ID
- broker.update(a.getChannel(), *this);
}
-Session::~Session() {
- close();
+SemanticState::~SemanticState() {
+ consumers.clear();
+ if (dtxBuffer.get()) {
+ dtxBuffer->fail();
+ }
+ recover(true);
}
-bool Session::exists(const string& consumerTag){
+bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut,
+void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
Queue::shared_ptr queue, bool nolocal, bool acks, bool acquire,
bool exclusive, const FieldTable*)
{
@@ -101,7 +91,7 @@ void Session::consume(DeliveryToken::shared_ptr token, string& tagInOut,
consumers.insert(tagInOut, c.release());
}
-void Session::cancel(const string& tag){
+void SemanticState::cancel(const string& tag){
// consumers is a ptr_map so erase will delete the consumer
// which will call cancel.
ConsumerImplMap::iterator i = consumers.find(tag);
@@ -109,22 +99,13 @@ void Session::cancel(const string& tag){
consumers.erase(i);
}
-void Session::close()
-{
- opened = false;
- consumers.clear();
- if (dtxBuffer.get()) {
- dtxBuffer->fail();
- }
- recover(true);
-}
-void Session::startTx()
+void SemanticState::startTx()
{
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void Session::commit(MessageStore* const store)
+void SemanticState::commit(MessageStore* const store)
{
if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
@@ -135,7 +116,7 @@ void Session::commit(MessageStore* const store)
}
}
-void Session::rollback()
+void SemanticState::rollback()
{
if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
@@ -143,12 +124,12 @@ void Session::rollback()
accumulatedAck.clear();
}
-void Session::selectDtx()
+void SemanticState::selectDtx()
{
dtxSelected = true;
}
-void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
{
if (!dtxSelected) {
throw ConnectionException(503, "Session has not been selected for use with dtx");
@@ -162,7 +143,7 @@ void Session::startDtx(const std::string& xid, DtxManager& mgr, bool join)
}
}
-void Session::endDtx(const std::string& xid, bool fail)
+void SemanticState::endDtx(const std::string& xid, bool fail)
{
if (!dtxBuffer) {
throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
@@ -183,7 +164,7 @@ void Session::endDtx(const std::string& xid, bool fail)
dtxBuffer.reset();
}
-void Session::suspendDtx(const std::string& xid)
+void SemanticState::suspendDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
@@ -195,7 +176,7 @@ void Session::suspendDtx(const std::string& xid)
dtxBuffer->setSuspended(true);
}
-void Session::resumeDtx(const std::string& xid)
+void SemanticState::resumeDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
@@ -210,7 +191,7 @@ void Session::resumeDtx(const std::string& xid)
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
}
-void Session::checkDtxTimeout()
+void SemanticState::checkDtxTimeout()
{
if (dtxBuffer->isExpired()) {
dtxBuffer.reset();
@@ -218,13 +199,13 @@ void Session::checkDtxTimeout()
}
}
-void Session::record(const DeliveryRecord& delivery)
+void SemanticState::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
delivery.addTo(outstanding);
}
-bool Session::checkPrefetch(Message::shared_ptr& msg)
+bool SemanticState::checkPrefetch(Message::shared_ptr& msg)
{
Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
@@ -232,7 +213,7 @@ bool Session::checkPrefetch(Message::shared_ptr& msg)
return countOk && sizeOk;
}
-Session::ConsumerImpl::ConsumerImpl(Session* _parent,
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
DeliveryToken::shared_ptr _token,
const string& _name,
Queue::shared_ptr _queue,
@@ -253,9 +234,10 @@ Session::ConsumerImpl::ConsumerImpl(Session* _parent,
msgCredit(0),
byteCredit(0) {}
-bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
+bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
- if (nolocal && &parent->getHandler()->getConnection() == msg.payload->getPublisher()) {
+ if (nolocal &&
+ &parent->getSession().getConnection() == msg.payload->getPublisher()) {
return false;
} else {
if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
@@ -266,7 +248,7 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
Mutex::ScopedLock locker(parent->deliveryLock);
DeliveryId deliveryTag =
- parent->deliveryAdapter->deliver(msg.payload, token);
+ parent->deliveryAdapter.deliver(msg.payload, token);
if (windowing || ackExpected) {
parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
}
@@ -275,7 +257,7 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
}
}
-bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
{
Mutex::ScopedLock l(lock);
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
@@ -291,33 +273,34 @@ bool Session::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
}
}
-void Session::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
+void SemanticState::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
Mutex::ScopedLock locker(parent->deliveryLock);
- parent->deliveryAdapter->redeliver(msg, token, deliveryTag);
+ parent->deliveryAdapter.redeliver(msg, token, deliveryTag);
}
-Session::ConsumerImpl::~ConsumerImpl() {
+SemanticState::ConsumerImpl::~ConsumerImpl() {
cancel();
}
-void Session::ConsumerImpl::cancel()
+void SemanticState::ConsumerImpl::cancel()
{
if(queue) {
queue->cancel(this);
if (queue->canAutoDelete()) {
- parent->getHandler()->getConnection().broker.getQueues().destroyIf(queue->getName(),
- boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+ parent->getSession().getBroker().getQueues().destroyIf(
+ queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
}
}
}
-void Session::ConsumerImpl::requestDispatch()
+void SemanticState::ConsumerImpl::requestDispatch()
{
if(blocked)
queue->requestDispatch(this);
}
-void Session::handle(Message::shared_ptr msg) {
+void SemanticState::handle(Message::shared_ptr msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
@@ -329,10 +312,10 @@ void Session::handle(Message::shared_ptr msg) {
}
}
-void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
+void SemanticState::route(Message::shared_ptr msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName){
- cacheExchange = getHandler()->getConnection().broker.getExchanges().get(exchangeName);
+ cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName);
}
cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -347,17 +330,17 @@ void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
}
-void Session::ackCumulative(DeliveryId id)
+void SemanticState::ackCumulative(DeliveryId id)
{
ack(id, id, true);
}
-void Session::ackRange(DeliveryId first, DeliveryId last)
+void SemanticState::ackRange(DeliveryId first, DeliveryId last)
{
ack(first, last, false);
}
-void Session::ack(DeliveryId first, DeliveryId last, bool cumulative)
+void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
@@ -373,7 +356,7 @@ void Session::ack(DeliveryId first, DeliveryId last, bool cumulative)
++end;
}
- for_each(start, end, boost::bind(&Session::acknowledged, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -398,7 +381,7 @@ void Session::ack(DeliveryId first, DeliveryId last, bool cumulative)
for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
}
-void Session::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::acknowledged(const DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
@@ -407,7 +390,7 @@ void Session::acknowledged(const DeliveryRecord& delivery)
}
}
-void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
{
if (windowing) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
@@ -415,7 +398,7 @@ void Session::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
}
}
-void Session::recover(bool requeue)
+void SemanticState::recover(bool requeue)
{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
@@ -431,12 +414,12 @@ void Session::recover(bool requeue)
}
}
-bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
{
QueuedMessage msg = queue->dequeue();
if(msg.payload){
Mutex::ScopedLock locker(deliveryLock);
- DeliveryId myDeliveryTag = deliveryAdapter->deliver(msg.payload, token);
+ DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg.payload, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -446,7 +429,7 @@ bool Session::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool
}
}
-void Session::deliver(Message::shared_ptr& msg, const string& consumerTag,
+void SemanticState::deliver(Message::shared_ptr& msg, const string& consumerTag,
DeliveryId deliveryTag)
{
ConsumerImplMap::iterator i = consumers.find(consumerTag);
@@ -455,7 +438,7 @@ void Session::deliver(Message::shared_ptr& msg, const string& consumerTag,
}
}
-void Session::flow(bool active)
+void SemanticState::flow(bool active)
{
Mutex::ScopedLock locker(deliveryLock);
bool requestDelivery(!flowActive && active);
@@ -467,7 +450,7 @@ void Session::flow(bool active)
}
-Session::ConsumerImpl& Session::find(const std::string& destination)
+SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
{
ConsumerImplMap::iterator i = consumers.find(destination);
if (i == consumers.end()) {
@@ -477,62 +460,62 @@ Session::ConsumerImpl& Session::find(const std::string& destination)
}
}
-void Session::setWindowMode(const std::string& destination)
+void SemanticState::setWindowMode(const std::string& destination)
{
find(destination).setWindowMode();
}
-void Session::setCreditMode(const std::string& destination)
+void SemanticState::setCreditMode(const std::string& destination)
{
find(destination).setCreditMode();
}
-void Session::addByteCredit(const std::string& destination, uint32_t value)
+void SemanticState::addByteCredit(const std::string& destination, uint32_t value)
{
find(destination).addByteCredit(value);
}
-void Session::addMessageCredit(const std::string& destination, uint32_t value)
+void SemanticState::addMessageCredit(const std::string& destination, uint32_t value)
{
find(destination).addMessageCredit(value);
}
-void Session::flush(const std::string& destination)
+void SemanticState::flush(const std::string& destination)
{
ConsumerImpl& c = find(destination);
c.flush();
}
-void Session::stop(const std::string& destination)
+void SemanticState::stop(const std::string& destination)
{
find(destination).stop();
}
-void Session::ConsumerImpl::setWindowMode()
+void SemanticState::ConsumerImpl::setWindowMode()
{
windowing = true;
}
-void Session::ConsumerImpl::setCreditMode()
+void SemanticState::ConsumerImpl::setCreditMode()
{
windowing = false;
}
-void Session::ConsumerImpl::addByteCredit(uint32_t value)
+void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
byteCredit += value;
requestDispatch();
}
-void Session::ConsumerImpl::addMessageCredit(uint32_t value)
+void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
msgCredit += value;
requestDispatch();
}
-void Session::ConsumerImpl::flush()
+void SemanticState::ConsumerImpl::flush()
{
//need to prevent delivery after requestDispatch returns but
//before credit is reduced to zero; TODO: come up with better
@@ -543,13 +526,13 @@ void Session::ConsumerImpl::flush()
msgCredit = 0;
}
-void Session::ConsumerImpl::stop()
+void SemanticState::ConsumerImpl::stop()
{
msgCredit = 0;
byteCredit = 0;
}
-Queue::shared_ptr Session::getQueue(const string& name) const {
+Queue::shared_ptr SemanticState::getQueue(const string& name) const {
//Note: this can be removed soon as the default queue for sessions is scrapped in 0-10
Queue::shared_ptr queue;
if (name.empty()) {
@@ -558,14 +541,14 @@ Queue::shared_ptr Session::getQueue(const string& name) const {
throw NotAllowedException(QPID_MSG("No queue name specified."));
}
else {
- queue = getBroker().getQueues().find(name);
+ queue = session.getBroker().getQueues().find(name);
if (!queue)
throw NotFoundException(QPID_MSG("Queue not found: "<<name));
}
return queue;
}
-AckRange Session::findRange(DeliveryId first, DeliveryId last)
+AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
{
ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
ack_iterator end = start;
@@ -582,21 +565,21 @@ AckRange Session::findRange(DeliveryId first, DeliveryId last)
return AckRange(start, end);
}
-void Session::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
+void SemanticState::acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired)
{
Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
for_each(range.start, range.end, AcquireFunctor(acquired));
}
-void Session::release(DeliveryId first, DeliveryId last)
+void SemanticState::release(DeliveryId first, DeliveryId last)
{
Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::release));
}
-void Session::reject(DeliveryId first, DeliveryId last)
+void SemanticState::reject(DeliveryId first, DeliveryId last)
{
Mutex::ScopedLock locker(deliveryLock);
AckRange range = findRange(first, last);
diff --git a/cpp/src/qpid/broker/Session.h b/cpp/src/qpid/broker/SemanticState.h
index 80f1159f04..6147380714 100644
--- a/cpp/src/qpid/broker/Session.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_SESSION_H
-#define QPID_BROKER_SESSION_H
+#ifndef QPID_BROKER_SEMANTICSTATE_H
+#define QPID_BROKER_SEMANTICSTATE_H
/*
*
@@ -37,7 +37,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/shared_ptr.h"
-#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/ptr_container/ptr_map.hpp>
#include <list>
#include <vector>
@@ -45,21 +45,19 @@
namespace qpid {
namespace broker {
-class SessionHandler;
-class Broker;
+class SessionState;
/**
- * Session holds the state of an open session, whether attached to a
- * channel or suspended. It also holds the handler chains associated
- * with the session.
+ * SemanticState holds the L3 and L4 state of an open session, whether
+ * attached to a channel or suspended.
*/
-class Session : public framing::FrameHandler::Chains,
- private boost::noncopyable
+class SemanticState : public framing::FrameHandler::Chains,
+ private boost::noncopyable
{
class ConsumerImpl : public Consumer
{
sys::Mutex lock;
- Session* const parent;
+ SemanticState* const parent;
const DeliveryToken::shared_ptr token;
const string name;
const Queue::shared_ptr queue;
@@ -74,7 +72,7 @@ class Session : public framing::FrameHandler::Chains,
bool checkCredit(Message::shared_ptr& msg);
public:
- ConsumerImpl(Session* parent, DeliveryToken::shared_ptr token,
+ ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
@@ -94,13 +92,8 @@ class Session : public framing::FrameHandler::Chains,
typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
- SessionHandler* adapter;
- Broker& broker;
- uint32_t timeout;
- framing::Uuid id;
- boost::ptr_vector<framing::FrameHandler> handlers;
-
- DeliveryAdapter* deliveryAdapter;
+ SessionState& session;
+ DeliveryAdapter& deliveryAdapter;
Queue::shared_ptr defaultQueue;
ConsumerImplMap consumers;
uint32_t prefetchSize;
@@ -113,7 +106,6 @@ class Session : public framing::FrameHandler::Chains,
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
framing::AccumulatedAck accumulatedAck;
- bool opened;
bool flowActive;
boost::shared_ptr<Exchange> cacheExchange;
@@ -128,19 +120,10 @@ class Session : public framing::FrameHandler::Chains,
AckRange findRange(DeliveryId first, DeliveryId last);
public:
- Session(SessionHandler&, uint32_t timeout);
- ~Session();
-
- /** Returns 0 if this session is not currently attached */
- SessionHandler* getHandler() { return adapter; }
- const SessionHandler* getHandler() const { return adapter; }
+ SemanticState(DeliveryAdapter&, SessionState&);
+ ~SemanticState();
- Broker& getBroker() const { return broker; }
-
- /** Session timeout, aka detached-lifetime. */
- uint32_t getTimeout() const { return timeout; }
- /** Session ID */
- const framing::Uuid& getId() const { return id; }
+ SessionState& getSession() { return session; }
/**
* Get named queue, never returns 0.
@@ -174,7 +157,6 @@ class Session : public framing::FrameHandler::Chains,
void stop(const std::string& destination);
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
- void close();
void startTx();
void commit(MessageStore* const store);
void rollback();
@@ -198,4 +180,5 @@ class Session : public framing::FrameHandler::Chains,
-#endif /*!QPID_BROKER_SESSION_H*/
+
+#endif /*!QPID_BROKER_SEMANTICSTATE_H*/
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 01ce88059a..13e5c247be 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -19,7 +19,7 @@
*/
#include "SessionHandler.h"
-#include "Session.h"
+#include "SessionState.h"
#include "Connection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
@@ -94,7 +94,7 @@ void SessionHandler::assertClosed(const char* method) {
void SessionHandler::open(uint32_t detachedLifetime) {
assertClosed("open");
- session.reset(new Session(*this, detachedLifetime));
+ session.reset(new SessionState(*this, detachedLifetime));
getProxy().getSession().attached(session->getId(), session->getTimeout());
}
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index a9c0f69985..5ae5b5cfee 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -31,7 +31,7 @@ namespace qpid {
namespace broker {
class Connection;
-class Session;
+class SessionState;
/**
* A SessionHandler is associated with each active channel. It
@@ -48,8 +48,8 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
~SessionHandler();
/** Returns 0 if not attached to a session */
- Session* getSession() { return session.get(); }
- const Session* getSession() const { return session.get(); }
+ SessionState* getSession() { return session.get(); }
+ const SessionState* getSession() const { return session.get(); }
framing::ChannelId getChannel() const { return channel; }
@@ -84,7 +84,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
Connection& connection;
const framing::ChannelId channel;
framing::AMQP_ClientProxy proxy;
- shared_ptr<Session> session;
+ shared_ptr<SessionState> session;
bool ignoring;
};
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
new file mode 100644
index 0000000000..acfb3bfea8
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionState.h"
+#include "SessionHandler.h"
+#include "Connection.h"
+#include "Broker.h"
+#include "SemanticHandler.h"
+
+namespace qpid {
+namespace broker {
+
+using namespace framing;
+
+SessionState::SessionState(SessionHandler& h, uint32_t timeout_)
+ : handler(&h), id(true), timeout(timeout_),
+ broker(h.getConnection().broker),
+ version(h.getConnection().getVersion())
+{
+ // FIXME aconway 2007-09-21: Break dependnecy - broker updates session.
+ chain.push_back(new SemanticHandler(*this));
+ in = &chain[0]; // Incoming frame to handler chain.
+ out = &handler->out; // Outgoing frames to SessionHandler
+
+ // FIXME aconway 2007-09-20: use broker to add plugin
+ // handlers to the chain.
+ // FIXME aconway 2007-08-31: Shouldn't be passing channel ID.
+ broker.update(handler->getChannel(), *this);
+}
+
+SessionHandler& SessionState::getHandler() {
+ assert(isAttached());
+ return *handler;
+}
+
+AMQP_ClientProxy& SessionState::getProxy() {
+ return getHandler().getProxy();
+}
+ /** Convenience for: getHandler()->getConnection()
+ *@pre getHandler() != 0
+ */
+Connection& SessionState::getConnection() {
+ return getHandler().getConnection();
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 7558ea7866..1334cc7005 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -23,44 +23,73 @@
*/
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/noncopyable.hpp>
+
namespace qpid {
+
+namespace framing {
+class AMQP_ClientProxy;
+}
+
namespace broker {
+class SessionHandler;
+class Broker;
+class Connection;
+
/**
* State of a session.
+ *
+ * An attached session has a SessionHandler which is attached to a
+ * connection. A suspended session has no handler.
+ *
+ * A SessionState is always associated with an open session (attached or
+ * suspended) it is destroyed when the session is closed.
+ *
+ * The SessionState includes the sessions handler chains, which may
+ * themselves have state. The handlers will be preserved as long as
+ * the session is alive.
*/
-class SessionState
+class SessionState : public framing::FrameHandler::Chains,
+ private boost::noncopyable
{
public:
- enum State { CLOSED, ACTIVE, SUSPENDED };
+ /** SessionState for a newly opened connection. */
+ SessionState(SessionHandler& h, uint32_t timeout_);
- /** Initially in CLOSED state */
- SessionState() : id(false), state(CLOSED), timeout(0) {}
+ bool isAttached() { return handler; }
- /** Make CLOSED session ACTIVE, assigns a new UUID.
- * #@param timeout in seconds
- */
- void open(u_int32_t timeout_) {
- state=ACTIVE; id.generate(); timeout=timeout_;
- }
+ /** @pre isAttached() */
+ SessionHandler& getHandler();
- /** Close a session. */
- void close() { state=CLOSED; id.clear(); timeout=0; }
+ /** @pre isAttached() */
+ framing::AMQP_ClientProxy& getProxy();
+
+ /** @pre isAttached() */
+ Connection& getConnection();
- State getState() const { return state; }
const framing::Uuid& getId() const { return id; }
uint32_t getTimeout() const { return timeout; }
-
- bool isOpen() { return state == ACTIVE; }
- bool isClosed() { return state == CLOSED; }
- bool isSuspended() { return state == SUSPENDED; }
+ Broker& getBroker() { return broker; }
+ framing::ProtocolVersion getVersion() const { return version; }
+
private:
- friend class SuspendedSessions;
+ friend class SessionHandler; // Only SessionHandler can attach/detach
+ void detach() { handler=0; }
+ void attach(SessionHandler& h) { handler = &h; }
+
+ SessionHandler* handler;
framing::Uuid id;
- State state;
uint32_t timeout;
+ Broker& broker;
+ boost::ptr_vector<framing::FrameHandler> chain;
+ framing::ProtocolVersion version;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SuspendedSessions.h b/cpp/src/qpid/broker/SuspendedSessions.h
index 03c5df27ed..d3a0c17050 100644
--- a/cpp/src/qpid/broker/SuspendedSessions.h
+++ b/cpp/src/qpid/broker/SuspendedSessions.h
@@ -31,8 +31,10 @@
namespace qpid {
namespace broker {
-/** Collection of suspended sessions.
- * Thread safe.
+/**
+ * Thread safe collection of suspended sessions.
+ * Every session is owned either by a connection's SessionHandler
+ * or by the SuspendedSessions.
*/
class SuspendedSessions {
typedef std::multimap<sys::AbsTime,SessionState> Map;