diff options
author | Gordon Sim <gsim@apache.org> | 2008-03-04 20:42:19 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-03-04 20:42:19 +0000 |
commit | d47950ff7a88e4684d1e07e334e705776ed569a7 (patch) | |
tree | 23f00efd6c9009f158c0b9643746dd16fd8adbf6 /cpp/src | |
parent | cf9a1617599bb680deef2bab76fc6022f7dad50b (diff) | |
download | qpid-python-d47950ff7a88e4684d1e07e334e705776ed569a7.tar.gz |
Further updates to support final 0-10 spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633627 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Message.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 382 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 183 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameSet.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FrameSet.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/framing/ModelMethod.h | 6 |
14 files changed, 613 insertions, 12 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index f15f8d7a91..8ec15619c1 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -164,6 +164,7 @@ libqpidbroker_la_SOURCES = \ $(mgen_broker_cpp) \ qpid/broker/Broker.cpp \ qpid/broker/BrokerAdapter.cpp \ + qpid/broker/SessionAdapter.cpp \ qpid/broker/BrokerSingleton.cpp \ qpid/broker/Exchange.cpp \ qpid/broker/Queue.cpp \ @@ -273,6 +274,7 @@ nobase_include_HEADERS = \ qpid/shared_ptr.h \ qpid/broker/Broker.h \ qpid/broker/BrokerAdapter.h \ + qpid/broker/SessionAdapter.h \ qpid/broker/Exchange.h \ qpid/broker/Queue.h \ qpid/broker/BrokerSingleton.h \ diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 5237087dc8..6e92f89706 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -80,6 +80,10 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations TunnelHandler* getTunnelHandler() { throw framing::NotImplementedException("Tunnel class not implemented"); } + Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); } + Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); } + Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); } + // Handlers no longer implemented in BrokerAdapter: #define BADHANDLER() assert(0); throw framing::NotImplementedException("") ExecutionHandler* getExecutionHandler() { BADHANDLER(); } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 822890ae76..8be4f7756e 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -117,7 +117,7 @@ void Connection::received(framing::AMQFrame& frame){ if (mgmtClosing) close (403, "Closed by Management Request", 0, 0); - if (frame.getChannel() == 0) { + if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { getChannel(frame.getChannel()).in(frame); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 1730c01099..a2c78daa7c 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -35,6 +35,7 @@ using namespace qpid::framing; using std::string; TransferAdapter Message::TRANSFER; +PreviewAdapter Message::TRANSFER_99_0; Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {} @@ -219,6 +220,8 @@ MessageAdapter& Message::getAdapter() const { if (!adapter) { if(frames.isA<MessageTransferBody>()) { + adapter = &TRANSFER_99_0; + } else if(frames.isA<Message010TransferBody>()) { adapter = &TRANSFER; } else { const AMQMethodBody* method = frames.getMethod(); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 8a30c5770b..801834d519 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -133,6 +133,7 @@ public: mutable MessageAdapter* adapter; static TransferAdapter TRANSFER; + static PreviewAdapter TRANSFER_99_0; MessageAdapter& getAdapter() const; }; diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp index 2c29aa5444..a70de72646 100644 --- a/cpp/src/qpid/broker/MessageAdapter.cpp +++ b/cpp/src/qpid/broker/MessageAdapter.cpp @@ -36,12 +36,12 @@ namespace broker{ std::string TransferAdapter::getExchange(const framing::FrameSet& f) { - return f.as<framing::MessageTransferBody>()->getDestination(); + return f.as<framing::Message010TransferBody>()->getDestination(); } bool TransferAdapter::isImmediate(const framing::FrameSet&) { - //TODO: we seem to have lost the immediate flag + //TODO: delete this, immediate is no longer part of the spec return false; } @@ -57,4 +57,8 @@ namespace broker{ return p && p->getDeliveryMode() == 2; } + std::string PreviewAdapter::getExchange(const framing::FrameSet& f) + { + return f.as<framing::MessageTransferBody>()->getDestination(); + } }} diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h index 3220f304cc..de488b295e 100644 --- a/cpp/src/qpid/broker/MessageAdapter.h +++ b/cpp/src/qpid/broker/MessageAdapter.h @@ -29,6 +29,7 @@ #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/MessageProperties.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/Message010TransferBody.h" namespace qpid { namespace broker { @@ -48,12 +49,17 @@ struct MessageAdapter struct TransferAdapter : MessageAdapter { std::string getRoutingKey(const framing::FrameSet& f); - std::string getExchange(const framing::FrameSet& f); + virtual std::string getExchange(const framing::FrameSet& f); bool isImmediate(const framing::FrameSet&); const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); bool isPersistent(const framing::FrameSet& f); }; +struct PreviewAdapter : TransferAdapter +{ + std::string getExchange(const framing::FrameSet& f); +}; + }} diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index 376a321d2d..269fc2d423 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -28,6 +28,10 @@ using namespace qpid::broker; using namespace qpid::framing; +namespace +{ + std::string type_str(uint8_t type); +} MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) : state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {} @@ -39,7 +43,19 @@ void MessageBuilder::handle(AMQFrame& frame) state = HEADER; break; case HEADER: - checkType(HEADER_BODY, frame.getBody()->type()); + switch (frame.getBody()->type()) { + case CONTENT_BODY: + //TODO: rethink how to handle non-existent headers... + //didn't get a header: add in a dummy + message->getFrames().append(AMQFrame(AMQHeaderBody())); + break; + case HEADER_BODY: + break; + default: + throw CommandInvalidException( + QPID_MSG("Invalid frame sequence for message, expected header or content got " + << type_str(frame.getBody()->type()) << ")")); + } state = CONTENT; break; case CONTENT: diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp new file mode 100644 index 0000000000..15c29ed482 --- /dev/null +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -0,0 +1,382 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "SessionAdapter.h" +#include "Connection.h" +#include "DeliveryToken.h" +#include "MessageDelivery.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" +#include <boost/format.hpp> +#include <boost/cast.hpp> +#include <boost/bind.hpp> + +namespace qpid { +namespace broker { + +using namespace qpid; +using namespace qpid::framing; + +typedef std::vector<Queue::shared_ptr> QueueVector; + +SessionAdapter::SessionAdapter(SemanticState& s) : + HandlerImpl(s), + exchangeImpl(s), + queueImpl(s), + messageImpl(s) +{} + + +void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type, + const string& alternateExchange, + bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ + + //TODO: implement autoDelete + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = getBroker().getExchanges().get(alternateExchange); + } + if(passive){ + Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange)); + checkType(actual, type); + checkAlternate(actual, alternate); + }else{ + try{ + std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); + if (response.second) { + if (durable) { + getBroker().getStore().create(*response.first); + } + if (alternate) { + response.first->setAlternate(alternate); + alternate->incAlternateUsers(); + } + } else { + checkType(response.first, type); + checkAlternate(response.first, alternate); + } + }catch(UnknownExchangeTypeException& e){ + throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type)); + } + } +} + +void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type) +{ + if (!type.empty() && exchange->getType() != type) { + throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type)); + } +} + +void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) +{ + if (alternate && alternate != exchange->getAlternate()) + throw NotAllowedException( + QPID_MSG("Exchange declared with alternate-exchange " + << exchange->getAlternate()->getName() << ", requested " + << alternate->getName())); +} + +void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/){ + //TODO: implement unused + Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); + if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); + if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); + if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); + getBroker().getExchanges().destroy(name); +} + +Exchange010QueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) +{ + try { + Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); + return Exchange010QueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); + } catch (const ChannelException& e) { + return Exchange010QueryResult("", false, true, FieldTable()); + } +} +void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, + const string& exchangeName, const string& routingKey, + const FieldTable& arguments){ + + Queue::shared_ptr queue = state.getQueue(queueName); + Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); + if(exchange){ + string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; + if (exchange->bind(queue, exchangeRoutingKey, &arguments)) { + queue->bound(exchangeName, routingKey, arguments); + if (exchange->isDurable() && queue->isDurable()) { + getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); + } + } + }else{ + throw NotFoundException( + "Bind failed. No such exchange: " + exchangeName); + } +} + +void +SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, + const string& exchangeName, + const string& routingKey) +{ + Queue::shared_ptr queue = state.getQueue(queueName); + if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); + + Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); + if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); + + //TODO: revise unbind to rely solely on binding key (not args) + if (exchange->unbind(queue, routingKey, 0) && exchange->isDurable() && queue->isDurable()) { + getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable()); + } + +} + +Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, + const std::string& queueName, + const std::string& key, + const framing::FieldTable& args) +{ + Exchange::shared_ptr exchange; + try { + exchange = getBroker().getExchanges().get(exchangeName); + } catch (const ChannelException&) {} + + Queue::shared_ptr queue; + if (!queueName.empty()) { + queue = getBroker().getQueues().find(queueName); + } + + if (!exchange) { + return Exchange010BoundResult(true, false, false, false, false); + } else if (!queueName.empty() && !queue) { + return Exchange010BoundResult(false, true, false, false, false); + } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { + return Exchange010BoundResult(false, false, false, false, false); + } else { + //need to test each specified option individually + bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); + bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); + bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); + + return Exchange010BoundResult(false, false, !queueMatched, !keyMatched, !argsMatched); + } +} + +Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) +{ + Queue::shared_ptr queue = state.getQueue(name); + Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); + + return Queue010QueryResult(queue->getName(), + alternateExchange ? alternateExchange->getName() : "", + queue->isDurable(), + queue->hasExclusiveOwner(), + queue->isAutoDelete(), + queue->getSettings(), + queue->getMessageCount(), + queue->getConsumerCount()); +} + +void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange, + bool passive, bool durable, bool exclusive, + bool autoDelete, const qpid::framing::FieldTable& arguments){ + + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = getBroker().getExchanges().get(alternateExchange); + } + Queue::shared_ptr queue; + if (passive && !name.empty()) { + queue = state.getQueue(name); + //TODO: check alternate-exchange is as expected + } else { + std::pair<Queue::shared_ptr, bool> queue_created = + getBroker().getQueues().declare( + name, durable, + autoDelete, + exclusive ? &getConnection() : 0); + queue = queue_created.first; + assert(queue); + if (queue_created.second) { // This is a new queue + if (alternate) { + queue->setAlternateExchange(alternate); + alternate->incAlternateUsers(); + } + + //apply settings & create persistent record if required + queue_created.first->create(arguments); + + //add default binding: + getBroker().getExchanges().getDefault()->bind(queue, name, 0); + queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); + + //handle automatic cleanup: + if (exclusive) { + getConnection().exclusiveQueues.push_back(queue); + } + } else { + if (exclusive && queue->setExclusiveOwner(&getConnection())) { + getConnection().exclusiveQueues.push_back(queue); + } + } + } + if (exclusive && !queue->isExclusiveOwner(&getConnection())) + throw ResourceLockedException( + QPID_MSG("Cannot grant exclusive access to queue " + << queue->getName())); +} + + +void SessionAdapter::QueueHandlerImpl::purge(const string& queue){ + state.getQueue(queue)->purge(); +} + +void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){ + ChannelException error(0, ""); + Queue::shared_ptr q = state.getQueue(queue); + if(ifEmpty && q->getMessageCount() > 0){ + throw PreconditionFailedException("Queue not empty."); + }else if(ifUnused && q->getConsumerCount() > 0){ + throw PreconditionFailedException("Queue in use."); + }else{ + //remove the queue from the list of exclusive queues if necessary + if(q->isExclusiveOwner(&getConnection())){ + QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q); + if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i); + } + q->destroy(); + getBroker().getQueues().destroy(queue); + q->unbind(getBroker().getExchanges(), q); + } +} + + +SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : + HandlerImpl(s), + releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)), + rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)), + acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2)) + {} + +// +// Message class method handlers +// + +void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/, + uint8_t /*acceptMode*/, + uint8_t /*acquireMode*/) +{ + //not yet used (content containing assemblies treated differently at present +} + + void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool /*setRedelivered*/) +{ + transfers.for_each(releaseOp); +} + +void +SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, + const string& destination, + uint8_t acceptMode, + uint8_t acquireMode, + bool exclusive, + const string& /*resumeId*/,//TODO implement resume behaviour + uint64_t /*resumeTtl*/, + const FieldTable& arguments) +{ + Queue::shared_ptr queue = state.getQueue(queueName); + if(!destination.empty() && state.exists(destination)) + throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); + + string tag = destination; + state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode), + tag, queue, false, //TODO get rid of no-local + acceptMode == 1, acquireMode == 0, exclusive, &arguments); +} + +void +SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) +{ + state.cancel(destination); +} + +void +SessionAdapter::MessageHandlerImpl::reject(const SequenceSet& transfers, uint16_t /*code*/, const string& /*text*/ ) +{ + transfers.for_each(rejectOp); +} + +void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) +{ + if (unit == 0) { + //message + state.addMessageCredit(destination, value); + } else if (unit == 1) { + //bytes + state.addByteCredit(destination, value); + } else { + //unknown + throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit)); + } + +} + +void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, u_int8_t mode) +{ + if (mode == 0) { + //credit + state.setCreditMode(destination); + } else if (mode == 1) { + //window + state.setWindowMode(destination); + } else{ + throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode)); + } +} + +void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination) +{ + state.flush(destination); +} + +void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination) +{ + state.stop(destination); +} + +void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands) +{ + + commands.for_each(acceptOp); +} + +/* +void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers) +{ + SequenceNumberSet results; + RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results)); + transfers.processRanges(op); + results = results.condense(); + getProxy().getMessage().acquired(results); +} +*/ + +}} // namespace qpid::broker + + diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h new file mode 100644 index 0000000000..0dd3529359 --- /dev/null +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -0,0 +1,183 @@ +#ifndef _broker_SessionAdapter_h +#define _broker_SessionAdapter_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "HandlerImpl.h" + +#include "qpid/Exception.h" +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/SequenceSet.h" + +#include <boost/function.hpp> + +namespace qpid { +namespace broker { + +class Channel; +class Connection; +class Broker; + +/** + * Per-channel protocol adapter. + * + * A container for a collection of AMQP-class adapters that translate + * AMQP method bodies into calls on the core Broker objects. Each + * adapter class also provides a client proxy to send methods to the + * peer. + * + */ +class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations +{ + public: + SessionAdapter(SemanticState& session); + + + framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();} + + Message010Handler* getMessage010Handler(){ return &messageImpl; } + Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; } + Queue010Handler* getQueue010Handler(){ return &queueImpl; } + + + BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); } + ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); } + BindingHandler* getBindingHandler(){ throw framing::NotImplementedException("Class not implemented"); } + QueueHandler* getQueueHandler(){ throw framing::NotImplementedException("Class not implemented"); } + TxHandler* getTxHandler(){ throw framing::NotImplementedException("Class not implemented"); } + MessageHandler* getMessageHandler(){ throw framing::NotImplementedException("Class not implemented"); } + DtxCoordinationHandler* getDtxCoordinationHandler(){ throw framing::NotImplementedException("Class not implemented"); } + DtxDemarcationHandler* getDtxDemarcationHandler(){ throw framing::NotImplementedException("Class not implemented"); } + AccessHandler* getAccessHandler() { throw framing::NotImplementedException("Class not implemented"); } + FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); } + StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); } + TunnelHandler* getTunnelHandler() { throw framing::NotImplementedException("Class not implemented"); } + ExecutionHandler* getExecutionHandler() { throw framing::NotImplementedException("Class not implemented"); } + ConnectionHandler* getConnectionHandler() { throw framing::NotImplementedException("Class not implemented"); } + SessionHandler* getSessionHandler() { throw framing::NotImplementedException("Class not implemented"); } + Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); } + Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); } + + private: + class ExchangeHandlerImpl : + public Exchange010Handler, + public HandlerImpl + { + public: + ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + + void declare(const std::string& exchange, const std::string& type, + const std::string& alternateExchange, + bool passive, bool durable, bool autoDelete, + const qpid::framing::FieldTable& arguments); + void delete_(const std::string& exchange, bool ifUnused); + framing::Exchange010QueryResult query(const std::string& name); + void bind(const std::string& queue, + const std::string& exchange, const std::string& routingKey, + const qpid::framing::FieldTable& arguments); + void unbind(const std::string& queue, + const std::string& exchange, + const std::string& routingKey); + framing::Exchange010BoundResult bound(const std::string& exchange, + const std::string& queue, + const std::string& routingKey, + const framing::FieldTable& arguments); + private: + void checkType(shared_ptr<Exchange> exchange, const std::string& type); + + void checkAlternate(shared_ptr<Exchange> exchange, + shared_ptr<Exchange> alternate); + }; + + class QueueHandlerImpl : + public Queue010Handler, + public HandlerImpl + { + public: + QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {} + + void declare(const std::string& queue, + const std::string& alternateExchange, + bool passive, bool durable, bool exclusive, + bool autoDelete, + const qpid::framing::FieldTable& arguments); + void delete_(const std::string& queue, + bool ifUnused, bool ifEmpty); + void purge(const std::string& queue); + framing::Queue010QueryResult query(const std::string& queue); + }; + + class MessageHandlerImpl : + public Message010Handler, + public HandlerImpl + { + typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; + RangedOperation releaseOp; + RangedOperation rejectOp; + RangedOperation acceptOp; + + public: + MessageHandlerImpl(SemanticState& session); + void transfer(const string& destination, + uint8_t acceptMode, + uint8_t acquireMode); + + void accept(const framing::SequenceSet& commands); + + void reject(const framing::SequenceSet& commands, + uint16_t code, + const string& text); + + void release(const framing::SequenceSet& commands, + bool setRedelivered); + + void subscribe(const string& queue, + const string& destination, + uint8_t acceptMode, + uint8_t acquireMode, + bool exclusive, + const string& resumeId, + uint64_t resumeTtl, + const framing::FieldTable& arguments); + + void cancel(const string& destination); + + void setFlowMode(const string& destination, + uint8_t flowMode); + + void flow(const string& destination, + uint8_t unit, + uint32_t value); + + void flush(const string& destination); + + void stop(const string& destination); + + }; + + ExchangeHandlerImpl exchangeImpl; + QueueHandlerImpl queueImpl; + MessageHandlerImpl messageImpl; +}; +}} // namespace qpid::broker + + + +#endif /*!_broker_SessionAdapter_h*/ diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 2db7d688b7..c936edee21 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -31,7 +31,7 @@ #include "qpid/sys/Time.h" #include "qpid/management/Manageable.h" #include "qpid/management/Session.h" -#include "BrokerAdapter.h" +#include "SessionAdapter.h" #include "DeliveryAdapter.h" #include "MessageBuilder.h" #include "SessionContext.h" @@ -132,7 +132,7 @@ class SessionState : public framing::SessionState, sys::Mutex lock; SemanticState semanticState; - BrokerAdapter adapter; + SessionAdapter adapter; MessageBuilder msgBuilder; RangedOperation ackOp; diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp index 0e1c9da922..e6f15f2c4e 100644 --- a/cpp/src/qpid/framing/FrameSet.cpp +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -30,7 +30,7 @@ using namespace boost; FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {parts.reserve(4);} -void FrameSet::append(AMQFrame& part) +void FrameSet::append(const AMQFrame& part) { parts.push_back(part); } diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h index 8ba22f07cb..454d292d9d 100644 --- a/cpp/src/qpid/framing/FrameSet.h +++ b/cpp/src/qpid/framing/FrameSet.h @@ -43,7 +43,7 @@ public: typedef boost::shared_ptr<FrameSet> shared_ptr; FrameSet(const SequenceNumber& id); - void append(AMQFrame& part); + void append(const AMQFrame& part); bool isComplete() const; uint64_t getContentSize() const; diff --git a/cpp/src/qpid/framing/ModelMethod.h b/cpp/src/qpid/framing/ModelMethod.h index f3c0fa5d65..07600aadca 100644 --- a/cpp/src/qpid/framing/ModelMethod.h +++ b/cpp/src/qpid/framing/ModelMethod.h @@ -33,9 +33,9 @@ class ModelMethod : public AMQMethodBody mutable ExecutionHeader header; public: virtual ~ModelMethod() {} - virtual void encode(Buffer& buffer) const { header.encode(buffer); } - virtual void decode(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); } - virtual uint32_t size() const { return header.size(); } + virtual void encodeHeader(Buffer& buffer) const { header.encode(buffer); } + virtual void decodeHeader(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); } + virtual uint32_t headerSize() const { return header.size(); } virtual bool isSync() const { return header.getSync(); } virtual void setSync(bool on) const { header.setSync(on); } ExecutionHeader& getHeader() { return header; } |