diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/broker/inc/SessionHandlerImpl.h | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/inc/SessionHandlerImpl.h')
-rw-r--r-- | cpp/broker/inc/SessionHandlerImpl.h | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h new file mode 100644 index 0000000000..14a6404c78 --- /dev/null +++ b/cpp/broker/inc/SessionHandlerImpl.h @@ -0,0 +1,230 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionHandlerImpl_ +#define _SessionHandlerImpl_ + +#include <map> +#include <sstream> +#include <vector> +#include <exception> +#include "AMQFrame.h" +#include "AMQP_ServerOperations.h" +#include "AutoDelete.h" +#include "ExchangeRegistry.h" +#include "Channel.h" +#include "ConnectionToken.h" +#include "DirectExchange.h" +#include "OutputHandler.h" +#include "ProtocolInitiation.h" +#include "QueueRegistry.h" +#include "SessionContext.h" +#include "SessionHandler.h" +#include "TimeoutHandler.h" +#include "TopicExchange.h" + +namespace qpid { +namespace broker { + +struct ChannelException : public std::exception { + u_int16_t code; + string text; + ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} + ~ChannelException() throw() {} + const char* what() const throw() { return text.c_str(); } +}; + +struct ConnectionException : public std::exception { + u_int16_t code; + string text; + ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} + ~ConnectionException() throw() {} + const char* what() const throw() { return text.c_str(); } +}; + +class SessionHandlerImpl : public virtual qpid::io::SessionHandler, + public virtual qpid::framing::AMQP_ServerOperations, + public virtual ConnectionToken +{ + typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; + typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + + qpid::io::SessionContext* context; + QueueRegistry* queues; + ExchangeRegistry* const exchanges; + AutoDelete* const cleaner; + const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + + ConnectionHandler* connectionHandler; + ChannelHandler* channelHandler; + BasicHandler* basicHandler; + ExchangeHandler* exchangeHandler; + QueueHandler* queueHandler; + + std::map<u_int16_t, Channel*> channels; + std::vector<Queue::shared_ptr> exclusiveQueues; + + u_int32_t framemax; + u_int16_t heartbeat; + + void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); + void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); + void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + + /** + * Get named queue, never returns 0. + * @return: named queue or default queue for channel if name="" + * @exception: ChannelException if no queue of that name is found. + * @exception: ConnectionException if no queue specified and channel has not declared one. + */ + Queue::shared_ptr getQueue(const string& name, u_int16_t channel); + + Exchange* findExchange(const string& name); + + public: + SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues, + ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout); + virtual void received(qpid::framing::AMQFrame* frame); + virtual void initiated(qpid::framing::ProtocolInitiation* header); + virtual void idleOut(); + virtual void idleIn(); + virtual void closed(); + virtual ~SessionHandlerImpl(); + + class ConnectionHandlerImpl : public virtual ConnectionHandler{ + SessionHandlerImpl* parent; + public: + inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism, + string& response, string& locale); + + virtual void secureOk(u_int16_t channel, string& response); + + virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); + + virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist); + + virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, + u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ConnectionHandlerImpl(){} + }; + + class ChannelHandlerImpl : public virtual ChannelHandler{ + SessionHandlerImpl* parent; + public: + inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void open(u_int16_t channel, string& outOfBand); + + virtual void flow(u_int16_t channel, bool active); + + virtual void flowOk(u_int16_t channel, bool active); + + virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, + u_int16_t classId, u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ChannelHandlerImpl(){} + }; + + class ExchangeHandlerImpl : public virtual ExchangeHandler{ + SessionHandlerImpl* parent; + public: + inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type, + bool passive, bool durable, bool autoDelete, bool internal, bool nowait, + qpid::framing::FieldTable& arguments); + + virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait); + + virtual ~ExchangeHandlerImpl(){} + }; + + + class QueueHandlerImpl : public virtual QueueHandler{ + SessionHandlerImpl* parent; + public: + inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments); + + virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue, + string& exchange, string& routingKey, bool nowait, + qpid::framing::FieldTable& arguments); + + virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue, + bool nowait); + + virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty, + bool nowait); + + virtual ~QueueHandlerImpl(){} + }; + + class BasicHandlerImpl : public virtual BasicHandler{ + SessionHandlerImpl* parent; + public: + inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); + + virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag, + bool noLocal, bool noAck, bool exclusive, bool nowait); + + virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait); + + virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey, + bool mandatory, bool immediate); + + virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck); + + virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); + + virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); + + virtual void recover(u_int16_t channel, bool requeue); + + virtual ~BasicHandlerImpl(){} + }; + + inline virtual ChannelHandler* getChannelHandler(){ return channelHandler; } + inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler; } + inline virtual BasicHandler* getBasicHandler(){ return basicHandler; } + inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler; } + inline virtual QueueHandler* getQueueHandler(){ return queueHandler; } + + inline virtual AccessHandler* getAccessHandler(){ return 0; } + inline virtual FileHandler* getFileHandler(){ return 0; } + inline virtual StreamHandler* getStreamHandler(){ return 0; } + inline virtual TxHandler* getTxHandler(){ return 0; } + inline virtual DtxHandler* getDtxHandler(){ return 0; } + inline virtual TunnelHandler* getTunnelHandler(){ return 0; } +}; + +} +} + + +#endif |