diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionAdapter.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.h | 272 |
1 files changed, 272 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.h b/qpid/cpp/src/qpid/broker/SessionAdapter.h new file mode 100644 index 0000000000..6c09d68254 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.h @@ -0,0 +1,272 @@ +#ifndef _broker_SessionAdapter_h +#define _broker_SessionAdapter_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/broker/HandlerImpl.h" + +#include "qpid/Exception.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/StructHelper.h" + +#include <algorithm> +#include <vector> +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +class Channel; +class Connection; +class Broker; +class Queue; + +/** + * SessionAdapter translates protocol-specific AMQP commands for one + * specific version of AMQP into calls on the core broker objects. It + * is a container for a collection of adapters. + * + * Each adapter class provides a client proxy to send methods to the + * peer broker or client. + * + */ + + class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations +{ + public: + SessionAdapter(SemanticState& session); + + framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();} + + MessageHandler* getMessageHandler(){ return &messageImpl; } + ExchangeHandler* getExchangeHandler(){ return &exchangeImpl; } + QueueHandler* getQueueHandler(){ return &queueImpl; } + ExecutionHandler* getExecutionHandler(){ return &executionImpl; } + TxHandler* getTxHandler(){ return &txImpl; } + DtxHandler* getDtxHandler(){ return &dtxImpl; } + + ConnectionHandler* getConnectionHandler() { throw framing::NotImplementedException("Class not implemented"); } + SessionHandler* getSessionHandler() { throw framing::NotImplementedException("Class not implemented"); } + FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); } + StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); } + + template <class F> void eachExclusiveQueue(F f) + { + queueImpl.eachExclusiveQueue(f); + } + + + private: + //common base for utility methods etc that are specific to this adapter + struct HandlerHelper : public HandlerImpl + { + HandlerHelper(SemanticState& s) : HandlerImpl(s) {} + + boost::shared_ptr<Queue> getQueue(const std::string& name) const; + }; + + + class ExchangeHandlerImpl : + public ExchangeHandler, + public HandlerHelper + { + public: + ExchangeHandlerImpl(SemanticState& session) : HandlerHelper(session) {} + + void declare(const std::string& exchange, const std::string& type, + const std::string& alternateExchange, + bool passive, bool durable, bool autoDelete, + const qpid::framing::FieldTable& arguments); + void delete_(const std::string& exchange, bool ifUnused); + framing::ExchangeQueryResult query(const std::string& name); + void bind(const std::string& queue, + const std::string& exchange, const std::string& routingKey, + const qpid::framing::FieldTable& arguments); + void unbind(const std::string& queue, + const std::string& exchange, + const std::string& routingKey); + framing::ExchangeBoundResult bound(const std::string& exchange, + const std::string& queue, + const std::string& routingKey, + const framing::FieldTable& arguments); + private: + void checkType(boost::shared_ptr<Exchange> exchange, const std::string& type); + + void checkAlternate(boost::shared_ptr<Exchange> exchange, + boost::shared_ptr<Exchange> alternate); + }; + + class QueueHandlerImpl : public QueueHandler, + public HandlerHelper + { + Broker& broker; + std::vector< boost::shared_ptr<Queue> > exclusiveQueues; + //connectionId and userId are needed for queue-delete events for auto deleted, exclusive queues + std::string connectionId; + std::string userId; + + public: + QueueHandlerImpl(SemanticState& session); + ~QueueHandlerImpl(); + + void declare(const std::string& queue, + const std::string& alternateExchange, + bool passive, bool durable, bool exclusive, + bool autoDelete, + const qpid::framing::FieldTable& arguments); + void delete_(const std::string& queue, + bool ifUnused, bool ifEmpty); + void purge(const std::string& queue); + framing::QueueQueryResult query(const std::string& queue); + bool isLocal(const OwnershipToken* t) const; + + void destroyExclusiveQueues(); + void checkDelete(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty); + template <class F> void eachExclusiveQueue(F f) + { + std::for_each(exclusiveQueues.begin(), exclusiveQueues.end(), f); + } + }; + + class MessageHandlerImpl : + public MessageHandler, + public HandlerHelper + { + typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; + RangedOperation releaseRedeliveredOp; + RangedOperation releaseOp; + RangedOperation rejectOp; + RangedOperation acceptOp; + + public: + MessageHandlerImpl(SemanticState& session); + void transfer(const std::string& destination, + uint8_t acceptMode, + uint8_t acquireMode); + + void accept(const framing::SequenceSet& commands); + + void reject(const framing::SequenceSet& commands, + uint16_t code, + const std::string& text); + + void release(const framing::SequenceSet& commands, + bool setRedelivered); + + framing::MessageAcquireResult acquire(const framing::SequenceSet&); + + void subscribe(const std::string& queue, + const std::string& destination, + uint8_t acceptMode, + uint8_t acquireMode, + bool exclusive, + const std::string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments); + + void cancel(const std::string& destination); + + void setFlowMode(const std::string& destination, + uint8_t flowMode); + + void flow(const std::string& destination, + uint8_t unit, + uint32_t value); + + void flush(const std::string& destination); + + void stop(const std::string& destination); + + framing::MessageResumeResult resume(const std::string& destination, + const std::string& resumeId); + + }; + + class ExecutionHandlerImpl : public ExecutionHandler, public HandlerHelper + { + public: + ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {} + + void sync(); + void result(const framing::SequenceNumber& commandId, const std::string& value); + void exception(uint16_t errorCode, + const framing::SequenceNumber& commandId, + uint8_t classCode, + uint8_t commandCode, + uint8_t fieldIndex, + const std::string& description, + const framing::FieldTable& errorInfo); + + }; + + class TxHandlerImpl : public TxHandler, public HandlerHelper + { + public: + TxHandlerImpl(SemanticState& session) : HandlerHelper(session) {} + + void select(); + void commit(); + void rollback(); + }; + + class DtxHandlerImpl : public DtxHandler, public HandlerHelper + { + public: + DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {} + + void select(); + + framing::XaResult start(const framing::Xid& xid, + bool join, + bool resume); + + framing::XaResult end(const framing::Xid& xid, + bool fail, + bool suspend); + + framing::XaResult commit(const framing::Xid& xid, + bool onePhase); + + void forget(const framing::Xid& xid); + + framing::DtxGetTimeoutResult getTimeout(const framing::Xid& xid); + + framing::XaResult prepare(const framing::Xid& xid); + + framing::DtxRecoverResult recover(); + + framing::XaResult rollback(const framing::Xid& xid); + + void setTimeout(const framing::Xid& xid, uint32_t timeout); + }; + + ExchangeHandlerImpl exchangeImpl; + QueueHandlerImpl queueImpl; + MessageHandlerImpl messageImpl; + ExecutionHandlerImpl executionImpl; + TxHandlerImpl txImpl; + DtxHandlerImpl dtxImpl; +}; +}} // namespace qpid::broker + + + +#endif /*!_broker_SessionAdapter_h*/ |