diff options
Diffstat (limited to 'cpp/src')
74 files changed, 272 insertions, 3773 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 597e566049..ce36a33933 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -13,10 +13,9 @@ force: if GENERATE -# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac -amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/extra.xml $(top_srcdir)/xml/cluster.xml +# AMQP_FINAL_XML is defined in ../configure.ac amqp_0_10_xml=@AMQP_FINAL_XML@ -specs=$(amqp_99_0_xml) $(amqp_0_10_xml) +specs=$(amqp_0_10_xml) # Ruby generator. rgen_dir=$(top_srcdir)/rubygen @@ -152,7 +151,6 @@ libqpidcommon_la_SOURCES = \ qpid/framing/Buffer.cpp \ qpid/framing/FieldTable.cpp \ qpid/framing/FieldValue.cpp \ - qpid/framing/FramingContent.cpp \ qpid/framing/FrameSet.cpp \ qpid/framing/ProtocolInitiation.cpp \ qpid/framing/ProtocolVersion.cpp \ @@ -193,19 +191,11 @@ libqpidbroker_la_SOURCES = \ qpid/amqp_0_10/Connection.h \ qpid/amqp_0_10/Connection.cpp \ qpid/broker/Broker.cpp \ - qpid/broker/BrokerAdapter.cpp \ - qpid/broker/SessionAdapter.cpp \ qpid/broker/BrokerSingleton.cpp \ qpid/broker/Exchange.cpp \ qpid/broker/Queue.cpp \ qpid/broker/PersistableMessage.cpp \ qpid/broker/Bridge.cpp \ - qpid/broker/PreviewConnection.cpp \ - qpid/broker/PreviewConnectionCodec.cpp \ - qpid/broker/PreviewConnectionHandler.cpp \ - qpid/broker/PreviewSessionHandler.cpp \ - qpid/broker/PreviewSessionManager.cpp \ - qpid/broker/PreviewSessionState.cpp \ qpid/broker/Connection.cpp \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionFactory.cpp \ @@ -215,7 +205,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/DirectExchange.cpp \ qpid/broker/DtxAck.cpp \ qpid/broker/DtxBuffer.cpp \ - qpid/broker/DtxHandlerImpl.cpp \ qpid/broker/DtxManager.cpp \ qpid/broker/DtxTimeout.cpp \ qpid/broker/DtxWorkRecord.cpp \ @@ -228,7 +217,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageDelivery.cpp \ - qpid/broker/MessageHandlerImpl.cpp \ qpid/broker/MessageStoreModule.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NullMessageStore.cpp \ @@ -241,6 +229,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SaslAuthenticator.cpp \ qpid/broker/SemanticState.h \ qpid/broker/SemanticState.cpp \ + qpid/broker/SessionAdapter.cpp \ qpid/broker/SessionState.h \ qpid/broker/SessionState.cpp \ qpid/broker/SessionManager.h \ @@ -248,7 +237,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SessionHandler.h \ qpid/broker/SessionContext.h \ qpid/broker/SessionHandler.cpp \ - qpid/broker/SemanticHandler.cpp \ qpid/broker/System.cpp \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ @@ -308,18 +296,11 @@ nobase_include_HEADERS = \ qpid/memory.h \ qpid/shared_ptr.h \ qpid/broker/Broker.h \ - qpid/broker/BrokerAdapter.h \ qpid/broker/SessionAdapter.h \ qpid/broker/Exchange.h \ qpid/broker/Queue.h \ qpid/broker/BrokerSingleton.h \ qpid/broker/Bridge.h \ - qpid/broker/PreviewConnection.h \ - qpid/broker/PreviewConnectionCodec.h \ - qpid/broker/PreviewConnectionHandler.h \ - qpid/broker/PreviewSessionHandler.h \ - qpid/broker/PreviewSessionManager.h \ - qpid/broker/PreviewSessionState.h \ qpid/broker/Connection.h \ qpid/broker/ConnectionState.h \ qpid/broker/ConnectionFactory.h \ @@ -337,7 +318,6 @@ nobase_include_HEADERS = \ qpid/broker/DirectExchange.h \ qpid/broker/DtxAck.h \ qpid/broker/DtxBuffer.h \ - qpid/broker/DtxHandlerImpl.h \ qpid/broker/DtxManager.h \ qpid/broker/DtxTimeout.h \ qpid/broker/DtxWorkRecord.h \ @@ -351,7 +331,6 @@ nobase_include_HEADERS = \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.h \ qpid/broker/MessageDelivery.h \ - qpid/broker/MessageHandlerImpl.h \ qpid/broker/MessageStore.h \ qpid/broker/MessageStoreModule.h \ qpid/broker/NameGenerator.h \ @@ -372,8 +351,8 @@ nobase_include_HEADERS = \ qpid/broker/RecoveredEnqueue.h \ qpid/broker/RecoveryManager.h \ qpid/broker/RecoveryManagerImpl.h \ - qpid/broker/SemanticHandler.h \ qpid/broker/SaslAuthenticator.h \ + qpid/broker/SessionAdapter.h \ qpid/broker/SessionManager.h \ qpid/broker/System.h \ qpid/broker/Timer.h \ @@ -434,7 +413,6 @@ nobase_include_HEADERS = \ qpid/framing/FrameHandler.h \ qpid/framing/FrameHandler.h \ qpid/framing/FrameSet.h \ - qpid/framing/FramingContent.h \ qpid/framing/Handler.h \ qpid/framing/HeaderProperties.h \ qpid/framing/Invoker.h \ diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index 2f934166a7..e74fa79ed9 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -23,6 +23,7 @@ */ #include "qpid/framing/amqp_types.h" +#include "qpid/framing/constants.h" #include "qpid/Msg.h" #include <memory> @@ -51,29 +52,22 @@ class Exception : public std::exception mutable std::string whatStr; }; -/** - * I have made SessionException a common base for Channel- and - * Connection- Exceptions. This is not strictly correct but allows all - * model layer exceptions to be handled as SessionExceptions which is - * how they are classified in the final 0-10 specification. I.e. this - * is a temporary hack to allow the preview and final codepaths to - * co-exist with minimal changes. - */ - struct SessionException : public Exception { const framing::ReplyCode code; SessionException(framing::ReplyCode code_, const std::string& message) : Exception(message), code(code_) {} }; -struct ChannelException : public SessionException { - ChannelException(framing::ReplyCode code, const std::string& message) - : SessionException(code, message) {} +struct ChannelException : public Exception { + const framing::ReplyCode code; + ChannelException(framing::ReplyCode _code, const std::string& message) + : Exception(message), code(_code) {} }; -struct ConnectionException : public SessionException { - ConnectionException(framing::ReplyCode code, const std::string& message) - : SessionException(code, message) {} +struct ConnectionException : public Exception { + const framing::ReplyCode code; + ConnectionException(framing::ReplyCode _code, const std::string& message) + : Exception(message), code(_code) {} }; struct ClosedException : public Exception { diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 598f428ad2..ea9a41ac9d 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -46,24 +46,24 @@ Bridge::~Bridge() void Bridge::create() { - framing::AMQP_ServerProxy::Session010 session(channel); + framing::AMQP_ServerProxy::Session session(channel); session.attach(name, false); if (args.i_src_is_local) { //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received() } else { if (args.i_src_is_queue) { - peer.getMessage010().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable()); - peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF); + peer.getMessage().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } else { string queue = "bridge_queue_"; queue += Uuid(true).str(); - peer.getQueue010().declare(queue, "", false, false, true, true, FieldTable()); - peer.getExchange010().bind(queue, args.i_src, args.i_key, FieldTable()); - peer.getMessage010().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable()); - peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF); + peer.getQueue().declare(queue, "", false, false, true, true, FieldTable()); + peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable()); + peer.getMessage().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); } } @@ -71,8 +71,8 @@ void Bridge::create() void Bridge::cancel() { - peer.getMessage010().cancel(args.i_dest); - peer.getSession010().detach(name); + peer.getMessage().cancel(args.i_dest); + peer.getSession().detach(name); } management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 6cbd9bf343..d9cf93f766 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -131,8 +131,7 @@ Broker::Broker(const Broker::Options& conf) : store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), factory(*this), - sessionManager(conf.ack), - previewSessionManager(conf.ack) + sessionManager(conf.ack) { if(conf.enableMgmt){ QPID_LOG(info, "Management enabled"); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index fa66061fd0..7297241763 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -30,7 +30,6 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" -#include "PreviewSessionManager.h" #include "Vhost.h" #include "System.h" #include "qpid/management/Manageable.h" @@ -117,7 +116,6 @@ class Broker : public sys::Runnable, public Plugin::Target, Options& getOptions() { return config; } SessionManager& getSessionManager() { return sessionManager; } - PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; } management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; @@ -148,7 +146,6 @@ class Broker : public sys::Runnable, public Plugin::Target, ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; - PreviewSessionManager previewSessionManager; management::ManagementAgent::shared_ptr managementAgent; management::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp deleted file mode 100644 index b83a275959..0000000000 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ /dev/null @@ -1,353 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "BrokerAdapter.h" -#include "Connection.h" -#include "DeliveryToken.h" -#include "MessageDelivery.h" -#include "qpid/framing/AMQMethodBody.h" -#include "qpid/Exception.h" -#include "qpid/framing/reply_exceptions.h" - -namespace qpid { -namespace broker { - -using namespace qpid; -using namespace qpid::framing; - -typedef std::vector<Queue::shared_ptr> QueueVector; - -// TODO aconway 2007-08-31: now that functionality is distributed -// between different handlers, BrokerAdapter should be dropped. -// Instead the individual class Handler interfaces can be implemented -// by the handlers responsible for those classes. -// - -BrokerAdapter::BrokerAdapter(SemanticState& s) : - HandlerImpl(s), - basicHandler(s), - exchangeHandler(s), - bindingHandler(s), - messageHandler(s), - queueHandler(s), - txHandler(s), - dtxHandler(s) -{} - - -void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, - const string& alternateExchange, - bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ - Exchange::shared_ptr alternate; - if (!alternateExchange.empty()) { - alternate = getBroker().getExchanges().get(alternateExchange); - } - if(passive){ - Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange)); - checkType(actual, type); - checkAlternate(actual, alternate); - }else{ - try{ - std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args); - if (response.second) { - if (durable) { - getBroker().getStore().create(*response.first, args); - } - if (alternate) { - response.first->setAlternate(alternate); - alternate->incAlternateUsers(); - } - } else { - checkType(response.first, type); - checkAlternate(response.first, alternate); - } - }catch(UnknownExchangeTypeException& e){ - throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type)); - } - } -} - -void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type) -{ - if (!type.empty() && exchange->getType() != type) { - throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type)); - } -} - -void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) -{ - if (alternate && alternate != exchange->getAlternate()) - throw NotAllowedException( - QPID_MSG("Exchange declared with alternate-exchange " - << exchange->getAlternate()->getName() << ", requested " - << alternate->getName())); -} - -void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){ - //TODO: implement unused - Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); - if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); - if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); - getBroker().getExchanges().destroy(name); -} - -ExchangeXQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) -{ - try { - Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - return ExchangeXQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); - } catch (const ChannelException& e) { - return ExchangeXQueryResult("", false, true, FieldTable()); - } -} - -BindingXQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, - const std::string& exchangeName, - const std::string& queueName, - const std::string& key, - const framing::FieldTable& args) -{ - Exchange::shared_ptr exchange; - try { - exchange = getBroker().getExchanges().get(exchangeName); - } catch (const ChannelException&) {} - - Queue::shared_ptr queue; - if (!queueName.empty()) { - queue = getBroker().getQueues().find(queueName); - } - - if (!exchange) { - return BindingXQueryResult(true, false, false, false, false); - } else if (!queueName.empty() && !queue) { - return BindingXQueryResult(false, true, false, false, false); - } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - return BindingXQueryResult(false, false, false, false, false); - } else { - //need to test each specified option individually - bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); - bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); - bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - - return BindingXQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched); - } -} - -QueueXQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) -{ - Queue::shared_ptr queue = state.getQueue(name); - Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); - - return QueueXQueryResult(queue->getName(), - alternateExchange ? alternateExchange->getName() : "", - queue->isDurable(), - queue->hasExclusiveOwner(), - queue->isAutoDelete(), - queue->getSettings(), - queue->getMessageCount(), - queue->getConsumerCount()); -} - -void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange, - bool passive, bool durable, bool exclusive, - bool autoDelete, const qpid::framing::FieldTable& arguments){ - - Exchange::shared_ptr alternate; - if (!alternateExchange.empty()) { - alternate = getBroker().getExchanges().get(alternateExchange); - } - Queue::shared_ptr queue; - if (passive && !name.empty()) { - queue = state.getQueue(name); - //TODO: check alternate-exchange is as expected - } else { - std::pair<Queue::shared_ptr, bool> queue_created = - getBroker().getQueues().declare( - name, durable, - autoDelete, - exclusive ? &getConnection() : 0); - queue = queue_created.first; - assert(queue); - if (queue_created.second) { // This is a new queue - if (alternate) { - queue->setAlternateExchange(alternate); - alternate->incAlternateUsers(); - } - - //apply settings & create persistent record if required - queue_created.first->create(arguments); - - //add default binding: - getBroker().getExchanges().getDefault()->bind(queue, name, 0); - queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments); - - //handle automatic cleanup: - if (exclusive) { - getConnection().exclusiveQueues.push_back(queue); - } - } else { - if (exclusive && queue->setExclusiveOwner(&getConnection())) { - getConnection().exclusiveQueues.push_back(queue); - } - } - } - if (exclusive && !queue->isExclusiveOwner(&getConnection())) - throw ResourceLockedException( - QPID_MSG("Cannot grant exclusive access to queue " - << queue->getName())); -} - -void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, - const FieldTable& arguments){ - - Queue::shared_ptr queue = state.getQueue(queueName); - Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); - if(exchange){ - string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; - if (exchange->bind(queue, exchangeRoutingKey, &arguments)) { - queue->bound(exchangeName, routingKey, arguments); - if (exchange->isDurable() && queue->isDurable()) { - getBroker().getStore().bind(*exchange, *queue, routingKey, arguments); - } - } - }else{ - throw NotFoundException( - "Bind failed. No such exchange: " + exchangeName); - } -} - -void -BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, - const string& queueName, - const string& exchangeName, - const string& routingKey, - const qpid::framing::FieldTable& arguments ) -{ - Queue::shared_ptr queue = state.getQueue(queueName); - if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); - - Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName); - if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName); - - if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) { - getBroker().getStore().unbind(*exchange, *queue, routingKey, arguments); - } - -} - -void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){ - state.getQueue(queue)->purge(); -} - -void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){ - ChannelException error(0, ""); - Queue::shared_ptr q = state.getQueue(queue); - if(ifEmpty && q->getMessageCount() > 0){ - throw PreconditionFailedException("Queue not empty."); - }else if(ifUnused && q->getConsumerCount() > 0){ - throw PreconditionFailedException("Queue in use."); - }else{ - //remove the queue from the list of exclusive queues if necessary - if(q->isExclusiveOwner(&getConnection())){ - QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q); - if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i); - } - q->destroy(); - getBroker().getQueues().destroy(queue); - q->unbind(getBroker().getExchanges(), q); - } -} - - - - -void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){ - //TODO: handle global - state.setPrefetchSize(prefetchSize); - state.setPrefetchCount(prefetchCount); -} - -void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, - const string& queueName, const string& consumerTag, - bool noLocal, bool noAck, bool exclusive, - bool nowait, const FieldTable& fields) -{ - - Queue::shared_ptr queue = state.getQueue(queueName); - if(!consumerTag.empty() && state.exists(consumerTag)){ - throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); - } - string newTag = consumerTag; - //need to generate name here, so we have it for the adapter (it is - //also version specific behaviour now) - if (newTag.empty()) newTag = tagGenerator.generate(); - DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag)); - state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields); - - if(!nowait) - getProxy().getBasic().consumeOk(newTag); -} - -void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){ - state.cancel(consumerTag); -} - -void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ - Queue::shared_ptr queue = state.getQueue(queueName); - DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue)); - if(!state.get(token, queue, !noAck)){ - string clusterId;//not used, part of an imatix hack - - getProxy().getBasic().getEmpty(clusterId); - } -} - -void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ - if (multiple) { - state.ackCumulative(deliveryTag); - } else { - state.ackRange(deliveryTag, deliveryTag); - } -} - -void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){} - -void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) -{ - state.recover(requeue); -} - -void BrokerAdapter::TxHandlerImpl::select() -{ - state.startTx(); -} - -void BrokerAdapter::TxHandlerImpl::commit() -{ - state.commit(&getBroker().getStore(), true); -} - -void BrokerAdapter::TxHandlerImpl::rollback() -{ - state.rollback(); - state.recover(true); -} - -}} // namespace qpid::broker - diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h deleted file mode 100644 index 26dfe802e1..0000000000 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ /dev/null @@ -1,208 +0,0 @@ -#ifndef _broker_BrokerAdapter_h -#define _broker_BrokerAdapter_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "DtxHandlerImpl.h" -#include "MessageHandlerImpl.h" - -#include "qpid/Exception.h" -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/reply_exceptions.h" - -namespace qpid { -namespace broker { - -class Channel; -class Connection; -class Broker; -class ConnectionHandler; -class BasicHandler; -class ExchangeHandler; -class QueueHandler; -class TxHandler; -class MessageHandler; -class AccessHandler; -class FileHandler; -class StreamHandler; -class DtxHandler; -class TunnelHandler; -class MessageHandlerImpl; -class Exchange; - -/** - * Per-channel protocol adapter. - * - * A container for a collection of AMQP-class adapters that translate - * AMQP method bodies into calls on the core Broker objects. Each - * adapter class also provides a client proxy to send methods to the - * peer. - * - */ -class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations -{ - public: - BrokerAdapter(SemanticState& session); - - BasicHandler* getBasicHandler() { return &basicHandler; } - ExchangeHandler* getExchangeHandler() { return &exchangeHandler; } - BindingHandler* getBindingHandler() { return &bindingHandler; } - QueueHandler* getQueueHandler() { return &queueHandler; } - TxHandler* getTxHandler() { return &txHandler; } - MessageHandler* getMessageHandler() { return &messageHandler; } - DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } - DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } - - framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();} - - - AccessHandler* getAccessHandler() { - throw framing::NotImplementedException("Access class not implemented"); } - FileHandler* getFileHandler() { - throw framing::NotImplementedException("File class not implemented"); } - StreamHandler* getStreamHandler() { - throw framing::NotImplementedException("Stream class not implemented"); } - TunnelHandler* getTunnelHandler() { - throw framing::NotImplementedException("Tunnel class not implemented"); } - - Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); } - Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); } - Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); } - Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented"); } - Dtx010Handler* getDtx010Handler() { throw framing::NotImplementedException("Class not implemented"); } - Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); } - - // Handlers no longer implemented in BrokerAdapter: -#define BADHANDLER() assert(0); throw framing::NotImplementedException("") - ExecutionHandler* getExecutionHandler() { BADHANDLER(); } - ConnectionHandler* getConnectionHandler() { BADHANDLER(); } - SessionHandler* getSessionHandler() { BADHANDLER(); } - Connection010Handler* getConnection010Handler() { BADHANDLER(); } - Session010Handler* getSession010Handler() { BADHANDLER(); } -#undef BADHANDLER - - private: - class ExchangeHandlerImpl : - public ExchangeHandler, - public HandlerImpl - { - public: - ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {} - - void declare(uint16_t ticket, - const std::string& exchange, const std::string& type, - const std::string& alternateExchange, - bool passive, bool durable, bool autoDelete, - const qpid::framing::FieldTable& arguments); - void delete_(uint16_t ticket, - const std::string& exchange, bool ifUnused); - framing::ExchangeXQueryResult query(u_int16_t ticket, - const std::string& name); - private: - void checkType(shared_ptr<Exchange> exchange, const std::string& type); - - void checkAlternate(shared_ptr<Exchange> exchange, - shared_ptr<Exchange> alternate); - }; - - class BindingHandlerImpl : - public BindingHandler, - public HandlerImpl - { - public: - BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {} - - framing::BindingXQueryResult query(u_int16_t ticket, - const std::string& exchange, - const std::string& queue, - const std::string& routingKey, - const framing::FieldTable& arguments); - }; - - class QueueHandlerImpl : - public QueueHandler, - public HandlerImpl - { - public: - QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {} - - void declare(uint16_t ticket, const std::string& queue, - const std::string& alternateExchange, - bool passive, bool durable, bool exclusive, - bool autoDelete, - const qpid::framing::FieldTable& arguments); - void bind(uint16_t ticket, const std::string& queue, - const std::string& exchange, const std::string& routingKey, - const qpid::framing::FieldTable& arguments); - void unbind(uint16_t ticket, - const std::string& queue, - const std::string& exchange, - const std::string& routingKey, - const qpid::framing::FieldTable& arguments ); - framing::QueueXQueryResult query(const std::string& queue); - void purge(uint16_t ticket, const std::string& queue); - void delete_(uint16_t ticket, const std::string& queue, - bool ifUnused, bool ifEmpty); - }; - - class BasicHandlerImpl : - public BasicHandler, - public HandlerImpl - { - NameGenerator tagGenerator; - public: - BasicHandlerImpl(SemanticState& session) : HandlerImpl(session), tagGenerator("sgen") {} - - void qos(uint32_t prefetchSize, - uint16_t prefetchCount, bool global); - void consume(uint16_t ticket, const std::string& queue, - const std::string& consumerTag, - bool noLocal, bool noAck, bool exclusive, bool nowait, - const qpid::framing::FieldTable& fields); - void cancel(const std::string& consumerTag); - void get(uint16_t ticket, const std::string& queue, bool noAck); - void ack(uint64_t deliveryTag, bool multiple); - void reject(uint64_t deliveryTag, bool requeue); - void recover(bool requeue); - }; - - class TxHandlerImpl : - public TxHandler, - public HandlerImpl - { - public: - TxHandlerImpl(SemanticState& session) : HandlerImpl(session) {} - - void select(); - void commit(); - void rollback(); - }; - - BasicHandlerImpl basicHandler; - ExchangeHandlerImpl exchangeHandler; - BindingHandlerImpl bindingHandler; - MessageHandlerImpl messageHandler; - QueueHandlerImpl queueHandler; - TxHandlerImpl txHandler; - DtxHandlerImpl dtxHandler; -}; -}} // namespace qpid::broker - - - -#endif /*!_broker_BrokerAdapter_h*/ diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index fca381063e..1994c4fdf5 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -20,9 +20,7 @@ */ #include "Connection.h" #include "SessionState.h" -#include "BrokerAdapter.h" #include "Bridge.h" -#include "SemanticHandler.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" @@ -143,7 +141,7 @@ void Connection::idleIn(){} void Connection::closed(){ // Physically closed, suspend open sessions. try { for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) - ptr_map_ptr(i)->localSuspend(); + ptr_map_ptr(i)->handleDetach(); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index 7e20408388..5de5a0230a 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -21,7 +21,6 @@ #include "ConnectionFactory.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/amqp_0_10/Connection.h" -#include "PreviewConnectionCodec.h" namespace qpid { namespace broker { @@ -34,8 +33,6 @@ ConnectionFactory::~ConnectionFactory() {} sys::ConnectionCodec* ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { - if (v == ProtocolVersion(99, 0)) - return new PreviewConnectionCodec(out, broker, id); if (v == ProtocolVersion(0, 10)) return new amqp_0_10::Connection(out, broker, id); return 0; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index c7738cc4ea..4ed2f5bfa2 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -50,9 +50,9 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) try{ bool handled = false; if (handler->serverMode) { - handled = invoke(static_cast<AMQP_ServerOperations::Connection010Handler&>(*handler.get()), *method); + handled = invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method); } else { - handled = invoke(static_cast<AMQP_ClientOperations::Connection010Handler&>(*handler.get()), *method); + handled = invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method); } if (!handled) { handler->connection.getChannel(frame.getChannel()).in(frame); diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h index ea8b84b07c..a04936a943 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.h +++ b/cpp/src/qpid/broker/ConnectionHandler.h @@ -41,11 +41,11 @@ class Connection; class ConnectionHandler : public framing::FrameHandler { - struct Handler : public framing::AMQP_ServerOperations::Connection010Handler, - public framing::AMQP_ClientOperations::Connection010Handler + struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, + public framing::AMQP_ClientOperations::ConnectionHandler { - framing::AMQP_ClientProxy::Connection010 client; - framing::AMQP_ServerProxy::Connection010 server; + framing::AMQP_ClientProxy::Connection client; + framing::AMQP_ServerProxy::Connection server; Connection& connection; bool serverMode; std::auto_ptr<SaslAuthenticator> authenticator; diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp deleted file mode 100644 index 61ab856fa9..0000000000 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ /dev/null @@ -1,171 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "DtxHandlerImpl.h" - -#include <boost/format.hpp> -#include "Broker.h" -#include "qpid/framing/constants.h" -#include "qpid/framing/Array.h" - -using namespace qpid::broker; -using namespace qpid::framing; -using std::string; - -DtxHandlerImpl::DtxHandlerImpl(SemanticState& s) : HandlerImpl(s) {} - -// DtxDemarcationHandler: - - -void DtxHandlerImpl::select() -{ - state.selectDtx(); -} - -DtxDemarcationXEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, - const string& xid, - bool fail, - bool suspend) -{ - try { - if (fail) { - state.endDtx(xid, true); - if (suspend) { - throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); - } else { - return DtxDemarcationXEndResult(XA_RBROLLBACK); - } - } else { - if (suspend) { - state.suspendDtx(xid); - } else { - state.endDtx(xid, false); - } - return DtxDemarcationXEndResult(XA_OK); - } - } catch (const DtxTimeoutException& e) { - return DtxDemarcationXEndResult(XA_RBTIMEOUT); - } -} - -DtxDemarcationXStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, - const string& xid, - bool join, - bool resume) -{ - if (join && resume) { - throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set.")); - } - try { - if (resume) { - state.resumeDtx(xid); - } else { - state.startDtx(xid, getBroker().getDtxManager(), join); - } - return DtxDemarcationXStartResult(XA_OK); - } catch (const DtxTimeoutException& e) { - return DtxDemarcationXStartResult(XA_RBTIMEOUT); - } -} - -// DtxCoordinationHandler: - -DtxCoordinationXPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/, - const string& xid) -{ - try { - bool ok = getBroker().getDtxManager().prepare(xid); - return DtxCoordinationXPrepareResult(ok ? XA_OK : XA_RBROLLBACK); - } catch (const DtxTimeoutException& e) { - return DtxCoordinationXPrepareResult(XA_RBTIMEOUT); - } -} - -DtxCoordinationXCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/, - const string& xid, - bool onePhase) -{ - try { - bool ok = getBroker().getDtxManager().commit(xid, onePhase); - return DtxCoordinationXCommitResult(ok ? XA_OK : XA_RBROLLBACK); - } catch (const DtxTimeoutException& e) { - return DtxCoordinationXCommitResult(XA_RBTIMEOUT); - } -} - - -DtxCoordinationXRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/, - const string& xid ) -{ - try { - getBroker().getDtxManager().rollback(xid); - return DtxCoordinationXRollbackResult(XA_OK); - } catch (const DtxTimeoutException& e) { - return DtxCoordinationXRollbackResult(XA_RBTIMEOUT); - } -} - -DtxCoordinationXRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, - bool /*startscan*/, - bool /*endscan*/ ) -{ - //TODO: what do startscan and endscan actually mean? - - // response should hold on key value pair with key = 'xids' and - // value = sequence of xids - - // until sequences are supported (0-10 encoding), an alternate - // scheme is used for testing: - // - // key = 'xids' and value = a longstr containing shortstrs for each xid - // - // note that this restricts the length of the xids more than is - // strictly 'legal', but that is ok for testing - std::set<std::string> xids; - getBroker().getStore().collectPreparedXids(xids); - - //TODO: remove the need to copy from one container type to another - std::vector<std::string> data; - for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) { - data.push_back(*i); - } - Array indoubt(data); - return DtxCoordinationXRecoverResult(indoubt); -} - -void DtxHandlerImpl::forget(u_int16_t /*ticket*/, - const string& xid) -{ - //Currently no heuristic completion is supported, so this should never be used. - throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); -} - -DtxCoordinationXGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) -{ - uint32_t timeout = getBroker().getDtxManager().getTimeout(xid); - return DtxCoordinationXGetTimeoutResult(timeout); -} - - -void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/, - const string& xid, - u_int32_t timeout) -{ - getBroker().getDtxManager().setTimeout(xid, timeout); -} - - diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h deleted file mode 100644 index efb56dba95..0000000000 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ /dev/null @@ -1,67 +0,0 @@ -#ifndef _broker_DtxHandlerImpl_h -#define _broker_DtxHandlerImpl_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/AMQP_ClientProxy.h" -#include "HandlerImpl.h" - -namespace qpid { -namespace broker { - -class DtxHandlerImpl - : public HandlerImpl, - public framing::AMQP_ServerOperations::DtxCoordinationHandler, - public framing::AMQP_ServerOperations::DtxDemarcationHandler -{ -public: - DtxHandlerImpl(SemanticState&); - - // DtxCoordinationHandler: - - framing::DtxCoordinationXCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase); - - void forget(u_int16_t ticket, const std::string& xid); - - framing::DtxCoordinationXGetTimeoutResult getTimeout(const std::string& xid); - - framing::DtxCoordinationXPrepareResult prepare(u_int16_t ticket, const std::string& xid); - - framing::DtxCoordinationXRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan); - - framing::DtxCoordinationXRollbackResult rollback(u_int16_t ticket, const std::string& xid); - - void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout); - - // DtxDemarcationHandler: - - framing::DtxDemarcationXEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend); - - void select(); - - framing::DtxDemarcationXStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume); -}; - - -}} // namespace qpid::broker - - - -#endif /*!_broker_DtxHandlerImpl_h*/ diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 297e610418..dd013843f9 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -24,7 +24,6 @@ #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/MessageXTransferBody.h" #include "qpid/framing/SendContent.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/TypeFilter.h" @@ -36,7 +35,6 @@ using namespace qpid::framing; using std::string; TransferAdapter Message::TRANSFER; -PreviewAdapter Message::TRANSFER_99_0; Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {} @@ -225,9 +223,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) MessageAdapter& Message::getAdapter() const { if (!adapter) { - if(frames.isA<MessageXTransferBody>()) { - adapter = &TRANSFER_99_0; - } else if(frames.isA<MessageTransferBody>()) { + if(frames.isA<MessageTransferBody>()) { adapter = &TRANSFER; } else { const AMQMethodBody* method = frames.getMethod(); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index 4fd2f1401d..87c7a9c43e 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -136,7 +136,6 @@ public: mutable MessageAdapter* adapter; static TransferAdapter TRANSFER; - static PreviewAdapter TRANSFER_99_0; MessageAdapter& getAdapter() const; }; diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp index 013e2c91ac..12f01494de 100644 --- a/cpp/src/qpid/broker/MessageAdapter.cpp +++ b/cpp/src/qpid/broker/MessageAdapter.cpp @@ -24,7 +24,6 @@ #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/MessageProperties.h" #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/MessageXTransferBody.h" namespace { const std::string empty; @@ -68,27 +67,4 @@ namespace broker{ return b && b->getAcceptMode() == 0/*EXPLICIT == 0*/; } - std::string PreviewAdapter::getExchange(const framing::FrameSet& f) - { - return f.as<framing::MessageXTransferBody>()->getDestination(); - } - - std::string PreviewAdapter::getRoutingKey(const framing::FrameSet& f) - { - const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>(); - return p ? p->getRoutingKey() : empty; - } - - const framing::FieldTable* PreviewAdapter::getApplicationHeaders(const framing::FrameSet& f) - { - const framing::PreviewMessageProperties* p = f.getHeaders()->get<framing::PreviewMessageProperties>(); - return p ? &(p->getApplicationHeaders()) : 0; - } - - bool PreviewAdapter::isPersistent(const framing::FrameSet& f) - { - const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>(); - return p && p->getDeliveryMode() == 2; - } - }} diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h index 4c13e756e9..61a1bc4794 100644 --- a/cpp/src/qpid/broker/MessageAdapter.h +++ b/cpp/src/qpid/broker/MessageAdapter.h @@ -52,14 +52,6 @@ struct TransferAdapter : MessageAdapter bool requiresAccept(const framing::FrameSet& f); }; -struct PreviewAdapter : TransferAdapter -{ - std::string getExchange(const framing::FrameSet& f); - std::string getRoutingKey(const framing::FrameSet& f); - const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); - bool isPersistent(const framing::FrameSet& f); -}; - }} diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index 36862edf37..f2a55e2790 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -24,10 +24,7 @@ #include "Message.h" #include "Queue.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/BasicXDeliverBody.h" -#include "qpid/framing/BasicXGetOkBody.h" #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/MessageXTransferBody.h" using namespace boost; @@ -43,41 +40,6 @@ struct BaseToken : DeliveryToken virtual AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) = 0; }; -struct BasicGetToken : BaseToken -{ - typedef boost::shared_ptr<BasicGetToken> shared_ptr; - - Queue::shared_ptr queue; - - BasicGetToken(Queue::shared_ptr q) : queue(q) {} - - AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) - { - return AMQFrame(in_place<BasicXGetOkBody>( - ProtocolVersion(), id.getValue(), - msg->getRedelivered(), msg->getExchangeName(), - msg->getRoutingKey(), queue->getMessageCount())); - } -}; - -struct BasicConsumeToken : BaseToken -{ - typedef boost::shared_ptr<BasicConsumeToken> shared_ptr; - - const string consumer; - - BasicConsumeToken(const string c) : consumer(c) {} - - AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) - { - return AMQFrame(in_place<BasicXDeliverBody>( - ProtocolVersion(), consumer, id.getValue(), - msg->getRedelivered(), msg->getExchangeName(), - msg->getRoutingKey())); - } - -}; - struct MessageDeliveryToken : BaseToken { const std::string destination; @@ -91,48 +53,23 @@ struct MessageDeliveryToken : BaseToken AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/) { //may need to set the redelivered flag: - if (isPreview) { - if (msg->getRedelivered()){ - msg->getProperties<PreviewDeliveryProperties>()->setRedelivered(true); - } - return AMQFrame(in_place<MessageXTransferBody>( - ProtocolVersion(), 0, destination, - confirmMode, acquireMode)); - } else { - if (msg->getRedelivered()){ - msg->getProperties<DeliveryProperties>()->setRedelivered(true); - } - return AMQFrame(in_place<MessageTransferBody>( - ProtocolVersion(), destination, confirmMode, acquireMode)); + if (msg->getRedelivered()){ + msg->getProperties<DeliveryProperties>()->setRedelivered(true); } + return AMQFrame(in_place<MessageTransferBody>( + ProtocolVersion(), destination, confirmMode, acquireMode)); } }; } } -DeliveryToken::shared_ptr MessageDelivery::getBasicGetToken(Queue::shared_ptr queue) -{ - return DeliveryToken::shared_ptr(new BasicGetToken(queue)); -} - -DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& consumer) -{ - return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer)); -} - DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination, u_int8_t confirmMode, u_int8_t acquireMode) { return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false)); } -DeliveryToken::shared_ptr MessageDelivery::getPreviewMessageDeliveryToken(const std::string& destination, - u_int8_t confirmMode, u_int8_t acquireMode) -{ - return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, true)); -} - void MessageDelivery::deliver(QueuedMessage& msg, framing::FrameHandler& handler, DeliveryId id, diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h index 564e1456a0..6deafbf519 100644 --- a/cpp/src/qpid/broker/MessageDelivery.h +++ b/cpp/src/qpid/broker/MessageDelivery.h @@ -34,15 +34,12 @@ class Message; class Queue; /** + * TODO: clean this up; we don't need it anymore in its current form + * * Encapsulates the different options for message delivery currently supported. */ class MessageDelivery { public: - static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue); - static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer); - static boost::shared_ptr<DeliveryToken> getPreviewMessageDeliveryToken(const std::string& destination, - u_int8_t confirmMode, - u_int8_t acquireMode); static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination, u_int8_t confirmMode, u_int8_t acquireMode); diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp deleted file mode 100644 index 5e0e759dfb..0000000000 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ /dev/null @@ -1,208 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/Exception.h" -#include "qpid/log/Statement.h" -#include "MessageHandlerImpl.h" -#include "qpid/framing/FramingContent.h" -#include "Connection.h" -#include "Broker.h" -#include "MessageDelivery.h" -#include "qpid/framing/reply_exceptions.h" -#include "BrokerAdapter.h" - -#include <boost/format.hpp> -#include <boost/cast.hpp> -#include <boost/bind.hpp> - -namespace qpid { -namespace broker { - -using namespace framing; - -MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : - HandlerImpl(s), - releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)), - rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)) - {} - -// -// Message class method handlers -// - -void -MessageHandlerImpl::cancel(const string& destination ) -{ - state.cancel(destination); -} - -void -MessageHandlerImpl::open(const string& /*reference*/) -{ - throw NotImplementedException("References no longer supported"); -} - -void -MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/) -{ - throw NotImplementedException("References no longer supported"); -} - -void -MessageHandlerImpl::close(const string& /*reference*/) -{ - throw NotImplementedException("References no longer supported"); -} - -void -MessageHandlerImpl::checkpoint(const string& /*reference*/, - const string& /*identifier*/ ) -{ - throw NotImplementedException("References no longer supported"); -} - -void -MessageHandlerImpl::resume(const string& /*reference*/, - const string& /*identifier*/ ) -{ - throw NotImplementedException("References no longer supported"); -} - -void -MessageHandlerImpl::offset(uint64_t /*value*/ ) -{ - throw NotImplementedException("References no longer supported"); -} - -void -MessageHandlerImpl::get(uint16_t /*ticket*/, - const string& /*queueName*/, - const string& /*destination*/, - bool /*noAck*/ ) -{ - throw NotImplementedException("get no longer supported"); -} - -void -MessageHandlerImpl::empty() -{ - throw NotImplementedException("empty no longer supported"); -} - -void -MessageHandlerImpl::ok() -{ - throw NotImplementedException("Message.Ok no longer supported"); -} - -void -MessageHandlerImpl::qos(uint32_t prefetchSize, - uint16_t prefetchCount, - bool /*global*/ ) -{ - //TODO: handle global - state.setPrefetchSize(prefetchSize); - state.setPrefetchCount(prefetchCount); -} - -void -MessageHandlerImpl::subscribe(uint16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noLocal, - u_int8_t confirmMode, - u_int8_t acquireMode, - bool exclusive, - const framing::FieldTable& filter ) -{ - Queue::shared_ptr queue = state.getQueue(queueName); - if(!destination.empty() && state.exists(destination)) - throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); - - string tag = destination; - state.consume(MessageDelivery::getPreviewMessageDeliveryToken(destination, confirmMode, acquireMode), - tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter); -} - -void -MessageHandlerImpl::recover(bool requeue) -{ - state.recover(requeue); -} - -void -MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ ) -{ - transfers.processRanges(rejectOp); -} - -void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) -{ - if (unit == 0) { - //message - state.addMessageCredit(destination, value); - } else if (unit == 1) { - //bytes - state.addByteCredit(destination, value); - } else { - //unknown - throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit)); - } - -} - -void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) -{ - if (mode == 0) { - //credit - state.setCreditMode(destination); - } else if (mode == 1) { - //window - state.setWindowMode(destination); - } else{ - throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode)); - } -} - -void MessageHandlerImpl::flush(const std::string& destination) -{ - state.flush(destination); -} - -void MessageHandlerImpl::stop(const std::string& destination) -{ - state.stop(destination); -} - -void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/) -{ - //TODO: implement mode - - SequenceNumberSet results; - RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results)); - transfers.processRanges(op); - results = results.condense(); - getProxy().getMessage().acquired(results); -} - -void MessageHandlerImpl::release(const SequenceNumberSet& transfers) -{ - transfers.processRanges(releaseOp); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h deleted file mode 100644 index dd70f35dbb..0000000000 --- a/cpp/src/qpid/broker/MessageHandlerImpl.h +++ /dev/null @@ -1,110 +0,0 @@ -#ifndef _broker_MessageHandlerImpl_h -#define _broker_MessageHandlerImpl_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <memory> - -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/AMQP_ClientProxy.h" -#include "HandlerImpl.h" - -#include <boost/function.hpp> - -namespace qpid { -namespace broker { - -class Connection; -class Broker; -class MessageMessage; - -class MessageHandlerImpl : - public framing::AMQP_ServerOperations::MessageHandler, - public HandlerImpl -{ - typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; - RangedOperation releaseOp; - RangedOperation rejectOp; - - public: - MessageHandlerImpl(SemanticState&); - - void append(const std::string& reference, const std::string& bytes); - - void cancel(const std::string& destination ); - - void checkpoint(const std::string& reference, - const std::string& identifier ); - - void close(const std::string& reference ); - - void empty(); - - void get(uint16_t ticket, - const std::string& queue, - const std::string& destination, - bool noAck ); - - void offset(uint64_t value); - - void ok(); - - void open(const std::string& reference ); - - void qos(uint32_t prefetchSize, - uint16_t prefetchCount, - bool global ); - - void recover(bool requeue ); - - void reject(const framing::SequenceNumberSet& transfers, - uint16_t code, - const std::string& text ); - - void resume(const std::string& reference, - const std::string& identifier ); - - void flow(const std::string& destination, u_int8_t unit, u_int32_t value); - - void flowMode(const std::string& destination, u_int8_t mode); - - void flush(const std::string& destination); - - void stop(const std::string& destination); - - void acquire(const framing::SequenceNumberSet& transfers, u_int8_t mode); - - void release(const framing::SequenceNumberSet& transfers); - - void subscribe(u_int16_t ticket, - const std::string& queue, - const std::string& destination, - bool noLocal, - u_int8_t confirmMode, - u_int8_t acquireMode, - bool exclusive, - const framing::FieldTable& filter); - -}; - -}} // namespace qpid::broker - - - -#endif /*!_broker_MessageHandlerImpl_h*/ diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp deleted file mode 100644 index 2643c85824..0000000000 --- a/cpp/src/qpid/broker/PreviewConnection.cpp +++ /dev/null @@ -1,325 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "PreviewConnection.h" -#include "SessionState.h" -#include "BrokerAdapter.h" -#include "Bridge.h" -#include "SemanticHandler.h" - -#include "qpid/log/Statement.h" -#include "qpid/ptr_map.h" -#include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/management/ManagementAgent.h" - -#include <boost/bind.hpp> -#include <boost/ptr_container/ptr_vector.hpp> - -#include <algorithm> -#include <iostream> -#include <assert.h> - -using namespace boost; -using namespace qpid::sys; -using namespace qpid::framing; -using namespace qpid::sys; -using qpid::ptr_map_ptr; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; - -namespace qpid { -namespace broker { - -class PreviewConnection::MgmtClient : public PreviewConnection::MgmtWrapper -{ - management::Client::shared_ptr mgmtClient; - -public: - MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); - ~MgmtClient(); - void received(framing::AMQFrame& frame); - management::ManagementObject::shared_ptr getManagementObject() const; - void closing(); -}; - -class PreviewConnection::MgmtLink : public PreviewConnection::MgmtWrapper -{ - typedef boost::ptr_vector<Bridge> Bridges; - - management::Link::shared_ptr mgmtLink; - Bridges created;//holds list of bridges pending creation - Bridges cancelled;//holds list of bridges pending cancellation - Bridges active;//holds active bridges - uint channelCounter; - sys::Mutex linkLock; - - void cancel(Bridge*); - -public: - MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId); - ~MgmtLink(); - void received(framing::AMQFrame& frame); - management::ManagementObject::shared_ptr getManagementObject() const; - void closing(); - void processPending(); - void process(PreviewConnection& connection, const management::Args& args); -}; - - -PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) : - ConnectionState(out_, broker_), - adapter(*this, isLink), - mgmtClosing(false), - mgmtId(mgmtId_) -{ - Manageable* parent = broker.GetVhostObject (); - - if (parent != 0) - { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - - if (agent.get () != 0) - { - if (isLink) { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId)); - } else { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId)); - } - } - } -} - -PreviewConnection::~PreviewConnection () { -} - -void PreviewConnection::received(framing::AMQFrame& frame){ - if (mgmtClosing) - close (403, "Closed by Management Request", 0, 0); - - if (frame.getChannel() == 0) { - adapter.handle(frame); - } else { - getChannel(frame.getChannel()).in(frame); - } - - if (mgmtWrapper.get()) mgmtWrapper->received(frame); -} - -void PreviewConnection::close( - ReplyCode code, const string& text, ClassId classId, MethodId methodId) -{ - adapter.close(code, text, classId, methodId); - channels.clear(); - getOutput().close(); -} - -void PreviewConnection::idleOut(){} - -void PreviewConnection::idleIn(){} - -void PreviewConnection::closed(){ // Physically closed, suspend open sessions. - try { - for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i) - ptr_map_ptr(i)->localSuspend(); - while (!exclusiveQueues.empty()) { - Queue::shared_ptr q(exclusiveQueues.front()); - q->releaseExclusiveOwnership(); - if (q->canAutoDelete()) { - Queue::tryAutoDelete(broker, q); - } - exclusiveQueues.erase(exclusiveQueues.begin()); - } - } catch(std::exception& e) { - QPID_LOG(error, " Unhandled exception while closing session: " << - e.what()); - assert(0); - } -} - -bool PreviewConnection::doOutput() -{ - try{ - //process any pending mgmt commands: - if (mgmtWrapper.get()) mgmtWrapper->processPending(); - if (mgmtClosing) close (403, "Closed by Management Request", 0, 0); - - - //then do other output as needed: - return outputTasks.doOutput(); - }catch(ConnectionException& e){ - close(e.code, e.what(), 0, 0); - }catch(std::exception& e){ - close(541/*internal error*/, e.what(), 0, 0); - } - return false; -} - -void PreviewConnection::closeChannel(uint16_t id) { - ChannelMap::iterator i = channels.find(id); - if (i != channels.end()) channels.erase(i); -} - -PreviewSessionHandler& PreviewConnection::getChannel(ChannelId id) { - ChannelMap::iterator i=channels.find(id); - if (i == channels.end()) { - i = channels.insert(id, new PreviewSessionHandler(*this, id)).first; - } - return *ptr_map_ptr(i); -} - -ManagementObject::shared_ptr PreviewConnection::GetManagementObject (void) const -{ - return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr(); -} - -Manageable::status_t PreviewConnection::ManagementMethod (uint32_t methodId, - Args& args) -{ - Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - - QPID_LOG (debug, "PreviewConnection::ManagementMethod [id=" << methodId << "]"); - - switch (methodId) - { - case management::Client::METHOD_CLOSE : - mgmtClosing = true; - if (mgmtWrapper.get()) mgmtWrapper->closing(); - out->activateOutput(); - status = Manageable::STATUS_OK; - break; - case management::Link::METHOD_BRIDGE : - //queue this up and request chance to do output (i.e. get connections thread of control): - mgmtWrapper->process(*this, args); - out->activateOutput(); - status = Manageable::STATUS_OK; - break; - } - - return status; -} - -PreviewConnection::MgmtLink::MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) - : channelCounter(1) -{ - mgmtLink = management::Link::shared_ptr - (new management::Link(conn, parent, mgmtId)); - agent->addObject (mgmtLink); -} - -PreviewConnection::MgmtLink::~MgmtLink() -{ - if (mgmtLink.get () != 0) - mgmtLink->resourceDestroy (); -} - -void PreviewConnection::MgmtLink::received(framing::AMQFrame& frame) -{ - if (mgmtLink.get () != 0) - { - mgmtLink->inc_framesFromPeer (); - mgmtLink->inc_bytesFromPeer (frame.size ()); - } -} - -management::ManagementObject::shared_ptr PreviewConnection::MgmtLink::getManagementObject() const -{ - return dynamic_pointer_cast<ManagementObject>(mgmtLink); -} - -void PreviewConnection::MgmtLink::closing() -{ - if (mgmtLink) mgmtLink->set_closing (1); -} - -void PreviewConnection::MgmtLink::processPending() -{ - Mutex::ScopedLock l(linkLock); - //process any pending creates - if (!created.empty()) { - for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { - i->create(); - } - active.transfer(active.end(), created.begin(), created.end(), created); - } - if (!cancelled.empty()) { - //process any pending cancellations - for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) { - i->cancel(); - } - cancelled.clear(); - } -} - -void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args) -{ - Mutex::ScopedLock l(linkLock); - created.push_back(new Bridge(channelCounter++, connection, - boost::bind(&MgmtLink::cancel, this, _1), - dynamic_cast<const management::ArgsLinkBridge&>(args))); -} - -void PreviewConnection::MgmtLink::cancel(Bridge* b) -{ - Mutex::ScopedLock l(linkLock); - //need to take this out the active map and add it to the cancelled map - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if (&(*i) == b) { - cancelled.transfer(cancelled.end(), i, active); - break; - } - } -} - -PreviewConnection::MgmtClient::MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId) -{ - mgmtClient = management::Client::shared_ptr - (new management::Client (conn, parent, mgmtId)); - agent->addObject (mgmtClient); -} - -PreviewConnection::MgmtClient::~MgmtClient() -{ - if (mgmtClient.get () != 0) - mgmtClient->resourceDestroy (); -} - -void PreviewConnection::MgmtClient::received(framing::AMQFrame& frame) -{ - if (mgmtClient.get () != 0) - { - mgmtClient->inc_framesFromClient (); - mgmtClient->inc_bytesFromClient (frame.size ()); - } -} - -management::ManagementObject::shared_ptr PreviewConnection::MgmtClient::getManagementObject() const -{ - return dynamic_pointer_cast<ManagementObject>(mgmtClient); -} - -void PreviewConnection::MgmtClient::closing() -{ - if (mgmtClient) mgmtClient->set_closing (1); -} - -}} - diff --git a/cpp/src/qpid/broker/PreviewConnection.h b/cpp/src/qpid/broker/PreviewConnection.h deleted file mode 100644 index 7a8404bf77..0000000000 --- a/cpp/src/qpid/broker/PreviewConnection.h +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _PreviewConnection_ -#define _PreviewConnection_ - -#include <memory> -#include <sstream> -#include <vector> - -#include <boost/ptr_container/ptr_map.hpp> - -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/sys/AggregateOutput.h" -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/TimeoutHandler.h" -#include "qpid/framing/ProtocolVersion.h" -#include "Broker.h" -#include "qpid/sys/Socket.h" -#include "qpid/Exception.h" -#include "PreviewConnectionHandler.h" -#include "ConnectionState.h" -#include "PreviewSessionHandler.h" -#include "qpid/management/Manageable.h" -#include "qpid/management/Client.h" -#include "qpid/management/Link.h" - -#include <boost/ptr_container/ptr_map.hpp> - -namespace qpid { -namespace broker { - -class PreviewConnection : public sys::ConnectionInputHandler, public ConnectionState -{ - public: - PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false); - ~PreviewConnection (); - - /** Get the PreviewSessionHandler for channel. Create if it does not already exist */ - PreviewSessionHandler& getChannel(framing::ChannelId channel); - - /** Close the connection */ - void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); - - // ConnectionInputHandler methods - void received(framing::AMQFrame& frame); - void idleOut(); - void idleIn(); - void closed(); - bool doOutput(); - - void closeChannel(framing::ChannelId channel); - - // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject (void) const; - management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args); - - private: - typedef boost::ptr_map<framing::ChannelId, PreviewSessionHandler> ChannelMap; - typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - - /** - * Connection may appear, for the purposes of management, as a - * normal client initiated connection or as an agent initiated - * inter-broker link. This wrapper abstracts the common interface - * for both. - */ - class MgmtWrapper - { - public: - virtual ~MgmtWrapper(){} - virtual void received(framing::AMQFrame& frame) = 0; - virtual management::ManagementObject::shared_ptr getManagementObject() const = 0; - virtual void closing() = 0; - virtual void processPending(){} - virtual void process(PreviewConnection&, const management::Args&){} - }; - class MgmtClient; - class MgmtLink; - - ChannelMap channels; - framing::AMQP_ClientProxy::Connection* client; - uint64_t stagingThreshold; - PreviewConnectionHandler adapter; - std::auto_ptr<MgmtWrapper> mgmtWrapper; - bool mgmtClosing; - const std::string mgmtId; -}; - -}} - -#endif diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp deleted file mode 100644 index b6c9b03776..0000000000 --- a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "PreviewConnectionCodec.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace broker { - -using sys::Mutex; - -PreviewConnectionCodec::PreviewConnectionCodec(sys::OutputControl& o, Broker& broker, const std::string& id, bool isClient) - : frameQueueClosed(false), output(o), connection(this, broker, id, isClient), identifier(id) {} - -size_t PreviewConnectionCodec::decode(const char* buffer, size_t size) { - framing::Buffer in(const_cast<char*>(buffer), size); - framing::AMQFrame frame; - while(frame.decode(in)) { - QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - connection.received(frame); - } - return in.getPosition(); -} - -bool PreviewConnectionCodec::canEncode() { - if (!frameQueueClosed && frameQueue.empty()) connection.doOutput(); - return !frameQueue.empty(); -} - -bool PreviewConnectionCodec::isClosed() const { - Mutex::ScopedLock l(frameQueueLock); - return frameQueueClosed; -} - -size_t PreviewConnectionCodec::encode(const char* buffer, size_t size) { - Mutex::ScopedLock l(frameQueueLock); - framing::Buffer out(const_cast<char*>(buffer), size); - while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) { - frameQueue.front().encode(out); - QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front()); - frameQueue.pop(); - if (!frameQueueClosed && frameQueue.empty()) connection.doOutput(); - } - if (!frameQueue.empty() && frameQueue.front().size() > size) - throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); - return out.getPosition(); -} - -void PreviewConnectionCodec::activateOutput() { output.activateOutput(); } - -void PreviewConnectionCodec::close() { - // Close the output queue. - Mutex::ScopedLock l(frameQueueLock); - frameQueueClosed = true; -} - -void PreviewConnectionCodec::closed() { - connection.closed(); -} - -void PreviewConnectionCodec::send(framing::AMQFrame& f) { - { - Mutex::ScopedLock l(frameQueueLock); - if (!frameQueueClosed) - frameQueue.push(f); - } - activateOutput(); -} - -framing::ProtocolVersion PreviewConnectionCodec::getVersion() const { - return framing::ProtocolVersion(99,0); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.h b/cpp/src/qpid/broker/PreviewConnectionCodec.h deleted file mode 100644 index f2ab086d06..0000000000 --- a/cpp/src/qpid/broker/PreviewConnectionCodec.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef QPID_BROKER_PREVIEWCONNECTIONCODEC_H -#define QPID_BROKER_PREVIEWCONNECTIONCODEC_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/sys/ConnectionCodec.h" -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/Mutex.h" -#include "PreviewConnection.h" - -namespace qpid { -namespace broker { - -class PreviewConnectionCodec : public sys::ConnectionCodec, public sys::ConnectionOutputHandler { - std::queue<framing::AMQFrame> frameQueue; - bool frameQueueClosed; - mutable sys::Mutex frameQueueLock; - sys::OutputControl& output; - PreviewConnection connection; - std::string identifier; - - public: - PreviewConnectionCodec(sys::OutputControl&, Broker&, const std::string& id, bool isClient = false); - size_t decode(const char* buffer, size_t size); - size_t encode(const char* buffer, size_t size); - bool isClosed() const; - bool canEncode(); - void activateOutput(); - void closed(); // connection closed by peer. - void close(); // closing from this end. - void send(framing::AMQFrame&); - framing::ProtocolVersion getVersion() const; -}; - -}} // namespace qpid::broker - -#endif /*!QPID_BROKER_PREVIEWCONNECTIONCODEC_H*/ diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp deleted file mode 100644 index 3477b59cb5..0000000000 --- a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp +++ /dev/null @@ -1,315 +0,0 @@ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "config.h" - -#include "PreviewConnectionHandler.h" -#include "PreviewConnection.h" -#include "qpid/framing/ConnectionStartBody.h" -#include "qpid/framing/ClientInvoker.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/log/Statement.h" - -#if HAVE_SASL -#include <sasl/sasl.h> -#endif - -using namespace qpid; -using namespace qpid::broker; -using namespace qpid::framing; - - -namespace -{ -const std::string PLAIN = "PLAIN"; -const std::string en_US = "en_US"; -} - -void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) -{ - handler->client.close(code, text, classId, methodId); -} - -void PreviewConnectionHandler::handle(framing::AMQFrame& frame) -{ - AMQMethodBody* method=frame.getBody()->getMethod(); - try{ - if (handler->serverMode) { - if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method)) - throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); - } else { - if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method)) - throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); - } - }catch(ConnectionException& e){ - handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); - } -} - -PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection, bool isClient) : handler(new Handler(connection)) { - FieldTable properties; - string mechanisms; - string locales(en_US); - if (isClient) { - handler->serverMode = false; - }else { -#if HAVE_SASL - if (connection.getBroker().getOptions().auth) { - const char *list; - unsigned int list_len; - int count; - int code = sasl_listmech(handler->sasl_conn, NULL, - "", " ", "", - &list, &list_len, - &count); - - if (SASL_OK != code) { - QPID_LOG(info, "SASL: Mechanism listing failed: " - << sasl_errdetail(handler->sasl_conn)); - - // TODO: Change this to an exception signaling - // server error, when one is available - throw CommandInvalidException("Mechanism listing failed"); - } else { - // TODO: For 0-10 the mechanisms must be returned - // in a list instead of space separated - mechanisms = list; - } - } else { -#endif - // TODO: It would be more proper for this to be ANONYMOUS - mechanisms = PLAIN; -#if HAVE_SASL - } -#endif - - QPID_LOG(info, "SASL: Sending mechanism list: " << mechanisms); - - handler->serverMode = true; - handler->client.start(99, 0, properties, mechanisms, locales); - } -} - -PreviewConnectionHandler::Handler::Handler(PreviewConnection& c) : -#if HAVE_SASL - sasl_conn(NULL), -#endif - client(c.getOutput()), server(c.getOutput()), - connection(c), serverMode(false) -{ -#if HAVE_SASL - if (connection.getBroker().getOptions().auth) { - int code = sasl_server_new(BROKER_SASL_NAME, - NULL, NULL, NULL, NULL, NULL, 0, - &sasl_conn); - - if (SASL_OK != code) { - QPID_LOG(info, "SASL: Connection creation failed: " - << sasl_errdetail(sasl_conn)); - - // TODO: Change this to an exception signaling - // server error, when one is available - throw CommandInvalidException("Unable to perform authentication"); - } - } -#endif -} - -PreviewConnectionHandler::Handler::~Handler() -{ -#if HAVE_SASL - if (NULL != sasl_conn) { - sasl_dispose(&sasl_conn); - sasl_conn = NULL; - } -#endif -} - -#if HAVE_SASL -void PreviewConnectionHandler::Handler::processAuthenticationStep(int code, const char *challenge, unsigned int challenge_len) -{ - if (SASL_OK == code) { - const void *uid; - - code = sasl_getprop(sasl_conn, - SASL_USERNAME, - &uid); - if (SASL_OK != code) { - QPID_LOG(info, "SASL: Authentication succeeded, username unavailable"); - // TODO: Change this to an exception signaling - // authentication failure, when one is available - throw ConnectionForcedException("Authenticated username unavailable"); - } - - QPID_LOG(info, "SASL: Authentication succeeded for: " << (char *)uid); - - connection.setUserId((char *)uid); - - client.tune(framing::CHANNEL_MAX, - connection.getFrameMax(), - connection.getHeartbeat()); - } else if (SASL_CONTINUE == code) { - string challenge_str(challenge, challenge_len); - - QPID_LOG(debug, "SASL: sending challenge to client"); - - client.secure(challenge_str); - } else { - QPID_LOG(info, "SASL: Authentication failed: " - << sasl_errdetail(sasl_conn)); - - // TODO: Change to more specific exceptions, when they are - // available - switch (code) { - case SASL_NOMECH: - throw ConnectionForcedException("Unsupported mechanism"); - break; - case SASL_TRYAGAIN: - throw ConnectionForcedException("Transient failure, try again"); - break; - default: - throw ConnectionForcedException("Authentication failed"); - break; - } - } -} -#endif - -void PreviewConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/, -#if HAVE_SASL - const string& mechanism, - const string& response, -#else - const string& /*mechanism*/, - const string& /*response*/, -#endif - const string& /*locale*/) -{ -#if HAVE_SASL - if (connection.getBroker().getOptions().auth) { - const char *challenge; - unsigned int challenge_len; - - QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism); - int code = sasl_server_start(sasl_conn, - mechanism.c_str(), - response.c_str(), response.length(), - &challenge, &challenge_len); - - processAuthenticationStep(code, challenge, challenge_len); - } else { -#endif - QPID_LOG(warning, "SASL: No Authentication Performed"); - - // TODO: Figure out what should actually be set in this case - connection.setUserId("anonymous"); - - client.tune(framing::CHANNEL_MAX, - connection.getFrameMax(), - connection.getHeartbeat()); -#if HAVE_SASL - } -#endif -} - -void PreviewConnectionHandler::Handler::secureOk(const string& -#if HAVE_SASL - response -#endif - ) { -#if HAVE_SASL - int code; - const char *challenge; - unsigned int challenge_len; - - code = sasl_server_step(sasl_conn, - response.c_str(), response.length(), - &challenge, &challenge_len); - - processAuthenticationStep(code, challenge, challenge_len); -#endif -} - -void PreviewConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, - uint32_t framemax, uint16_t heartbeat) -{ - connection.setFrameMax(framemax); - connection.setHeartbeat(heartbeat); -} - -void PreviewConnectionHandler::Handler::open(const string& /*virtualHost*/, - const string& /*capabilities*/, bool /*insist*/) -{ - string knownhosts; - client.openOk(knownhosts); -} - - -void PreviewConnectionHandler::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, - uint16_t /*classId*/, uint16_t /*methodId*/) -{ - client.closeOk(); - connection.getOutput().close(); -} - -void PreviewConnectionHandler::Handler::closeOk(){ - connection.getOutput().close(); -} - - -void PreviewConnectionHandler::Handler::start(uint8_t /*versionMajor*/, - uint8_t /*versionMinor*/, - const FieldTable& /*serverProperties*/, - const string& /*mechanisms*/, - const string& /*locales*/) -{ - string uid = "qpidd"; - string pwd = "qpidd"; - string response = ((char)0) + uid + ((char)0) + pwd; - server.startOk(FieldTable(), PLAIN, response, en_US); -} - -void PreviewConnectionHandler::Handler::secure(const string& /*challenge*/) -{ - server.secureOk(""); -} - -void PreviewConnectionHandler::Handler::tune(uint16_t channelMax, - uint32_t frameMax, - uint16_t heartbeat) -{ - connection.setFrameMax(frameMax); - connection.setHeartbeat(heartbeat); - server.tuneOk(channelMax, frameMax, heartbeat); - server.open("/", "", true); -} - -void PreviewConnectionHandler::Handler::openOk(const string& /*knownHosts*/) -{ -} - -void PreviewConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/) -{ - -} diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.h b/cpp/src/qpid/broker/PreviewConnectionHandler.h deleted file mode 100644 index b71068d81d..0000000000 --- a/cpp/src/qpid/broker/PreviewConnectionHandler.h +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _PreviewConnectionAdapter_ -#define _PreviewConnectionAdapter_ - -#include "config.h" - -#include <memory> -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQP_ClientOperations.h" -#include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/AMQP_ServerProxy.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/Exception.h" - -#if HAVE_SASL -#include <sasl/sasl.h> -#endif - -namespace qpid { -namespace broker { - -class PreviewConnection; - -// TODO aconway 2007-09-18: Rename to ConnectionHandler -class PreviewConnectionHandler : public framing::FrameHandler -{ - struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, - public framing::AMQP_ClientOperations::ConnectionHandler - { -#if HAVE_SASL - sasl_conn_t *sasl_conn; -#endif - framing::AMQP_ClientProxy::Connection client; - framing::AMQP_ServerProxy::Connection server; - PreviewConnection& connection; - bool serverMode; - - Handler(PreviewConnection& connection); - ~Handler(); - void startOk(const qpid::framing::FieldTable& clientProperties, - const std::string& mechanism, const std::string& response, - const std::string& locale); - void secureOk(const std::string& response); - void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); - void open(const std::string& virtualHost, - const std::string& capabilities, bool insist); - void close(uint16_t replyCode, const std::string& replyText, - uint16_t classId, uint16_t methodId); - void closeOk(); - - - void start(uint8_t versionMajor, - uint8_t versionMinor, - const qpid::framing::FieldTable& serverProperties, - const std::string& mechanisms, - const std::string& locales); - - void secure(const std::string& challenge); - - void tune(uint16_t channelMax, - uint32_t frameMax, - uint16_t heartbeat); - - void openOk(const std::string& knownHosts); - - void redirect(const std::string& host, const std::string& knownHosts); - private: -#if HAVE_SASL - void processAuthenticationStep(int code, - const char *challenge, - unsigned int challenge_len); -#endif - }; - std::auto_ptr<Handler> handler; - public: - PreviewConnectionHandler(PreviewConnection& connection, bool isClient); - void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); - void handle(framing::AMQFrame& frame); -}; - - -}} - -#endif diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/cpp/src/qpid/broker/PreviewSessionHandler.cpp deleted file mode 100644 index 36092bb7f6..0000000000 --- a/cpp/src/qpid/broker/PreviewSessionHandler.cpp +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "PreviewSessionHandler.h" -#include "PreviewSessionState.h" -#include "PreviewConnection.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/constants.h" -#include "qpid/framing/ClientInvoker.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/log/Statement.h" - -#include <boost/bind.hpp> - -namespace qpid { -namespace broker { -using namespace framing; -using namespace std; -using namespace qpid::sys; - -PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch) - : InOutHandler(0, &out), - connection(c), channel(ch, &c.getOutput()), - proxy(out), // Via my own handleOut() for L2 data. - peerSession(channel), // Direct to channel for L2 commands. - ignoring(false) {} - -PreviewSessionHandler::~PreviewSessionHandler() {} - -namespace { -ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } -MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } -} // namespace - -void PreviewSessionHandler::handleIn(AMQFrame& f) { - // Note on channel states: a channel is open if session != 0. A - // channel that is closed (session == 0) can be in the "ignoring" - // state. This is a temporary state after we have sent a channel - // exception, where extra frames might arrive that should be - // ignored. - // - AMQMethodBody* m = f.getBody()->getMethod(); - try { - if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { - return; - } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); - session->in.handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; - } else if (!ignoring) { - throw ChannelErrorException( - QPID_MSG("Channel " << channel.get() << " is not open")); - } - } catch(const ChannelException& e) { - ignoring=true; // Ignore trailing frames sent by client. - session->detach(); - session.reset(); - peerSession.closed(e.code, e.what()); - }catch(const ConnectionException& e){ - connection.close(e.code, e.what(), classId(m), methodId(m)); - }catch(const std::exception& e){ - connection.close( - framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m)); - } -} - -void PreviewSessionHandler::handleOut(AMQFrame& f) { - channel.handle(f); // Send it. - if (session->sent(f)) - peerSession.solicitAck(); -} - -void PreviewSessionHandler::assertAttached(const char* method) const { - if (!session.get()) - throw ChannelErrorException( - QPID_MSG(method << " failed: No session for channel " - << getChannel())); -} - -void PreviewSessionHandler::assertClosed(const char* method) const { - if (session.get()) - throw ChannelBusyException( - QPID_MSG(method << " failed: channel " << channel.get() - << " is already open.")); -} - -void PreviewSessionHandler::open(uint32_t detachedLifetime) { - assertClosed("open"); - std::auto_ptr<PreviewSessionState> state( - connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); - peerSession.attached(session->getId(), session->getTimeout()); -} - -void PreviewSessionHandler::resume(const Uuid& id) { - assertClosed("resume"); - session = connection.broker.getPreviewSessionManager().resume(id); - session->attach(*this); - SequenceNumber seq = session->resuming(); - peerSession.attached(session->getId(), session->getTimeout()); - proxy.getSession().ack(seq, SequenceNumberSet()); -} - -void PreviewSessionHandler::flow(bool /*active*/) { - assertAttached("flow"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flow"); -} - -void PreviewSessionHandler::flowOk(bool /*active*/) { - assertAttached("flowOk"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flowOk"); -} - -void PreviewSessionHandler::close() { - assertAttached("close"); - QPID_LOG(info, "Received session.close"); - ignoring=false; - session->detach(); - session.reset(); - peerSession.closed(REPLY_SUCCESS, "ok"); - assert(&connection.getChannel(channel.get()) == this); - connection.closeChannel(channel.get()); -} - -void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) { - QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText); - ignoring=false; - session->detach(); - session.reset(); -} - -void PreviewSessionHandler::localSuspend() { - if (session.get() && session->isAttached()) { - session->detach(); - connection.broker.getPreviewSessionManager().suspend(session); - session.reset(); - } -} - -void PreviewSessionHandler::suspend() { - assertAttached("suspend"); - localSuspend(); - peerSession.detached(); - assert(&connection.getChannel(channel.get()) == this); - connection.closeChannel(channel.get()); -} - -void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark, - const SequenceNumberSet& /*seenFrameSet*/) -{ - assertAttached("ack"); - if (session->getState() == PreviewSessionState::RESUMING) { - session->receivedAck(cumulativeSeenMark); - framing::SessionState::Replay replay=session->replay(); - std::for_each(replay.begin(), replay.end(), - boost::bind(&PreviewSessionHandler::handleOut, this, _1)); - } - else - session->receivedAck(cumulativeSeenMark); -} - -void PreviewSessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { - // TODO aconway 2007-10-02: may be removed from spec. - assert(0); throw NotImplementedException("session.high-water-mark"); -} - -void PreviewSessionHandler::solicitAck() { - assertAttached("solicit-ack"); - peerSession.ack(session->sendingAck(), SequenceNumberSet()); -} - -void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) -{ - std::auto_ptr<PreviewSessionState> state( - connection.broker.getPreviewSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); -} - -void PreviewSessionHandler::detached() -{ - connection.broker.getPreviewSessionManager().suspend(session); - session.reset(); -} - -ConnectionState& PreviewSessionHandler::getConnection() { return connection; } -const ConnectionState& PreviewSessionHandler::getConnection() const { return connection; } - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.h b/cpp/src/qpid/broker/PreviewSessionHandler.h deleted file mode 100644 index 4c517367d7..0000000000 --- a/cpp/src/qpid/broker/PreviewSessionHandler.h +++ /dev/null @@ -1,111 +0,0 @@ -#ifndef QPID_BROKER_PREVIEWSESSIONHANDLER_H -#define QPID_BROKER_PREVIEWSESSIONHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/AMQP_ClientOperations.h" -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/AMQP_ClientProxy.h" -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/ChannelHandler.h" -#include "SessionContext.h" - -#include <boost/noncopyable.hpp> - -namespace qpid { -namespace broker { - -class PreviewConnection; -class PreviewSessionState; - -/** - * A SessionHandler is associated with each active channel. It - * receives incoming frames, handles session commands and manages the - * association between the channel and a session. - */ -class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler, - public framing::AMQP_ClientOperations::SessionHandler, - public framing::FrameHandler::InOutHandler, - private boost::noncopyable -{ - public: - PreviewSessionHandler(PreviewConnection&, framing::ChannelId); - ~PreviewSessionHandler(); - - /** Returns 0 if not attached to a session */ - PreviewSessionState* getSession() { return session.get(); } - const PreviewSessionState* getSession() const { return session.get(); } - - framing::ChannelId getChannel() const { return channel.get(); } - - ConnectionState& getConnection(); - const ConnectionState& getConnection() const; - - framing::AMQP_ClientProxy& getProxy() { return proxy; } - const framing::AMQP_ClientProxy& getProxy() const { return proxy; } - - // Called by closing connection. - void localSuspend(); - void detach() { localSuspend(); } - - protected: - void handleIn(framing::AMQFrame&); - void handleOut(framing::AMQFrame&); - - private: - /// Session methods - void open(uint32_t detachedLifetime); - void flow(bool active); - void flowOk(bool active); - void close(); - void closed(uint16_t replyCode, const std::string& replyText); - void resume(const framing::Uuid& sessionId); - void suspend(); - void ack(uint32_t cumulativeSeenMark, - const framing::SequenceNumberSet& seenFrameSet); - void highWaterMark(uint32_t lastSentMark); - void solicitAck(); - - //extra methods required for assuming client role - void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); - void detached(); - - - void assertAttached(const char* method) const; - void assertActive(const char* method) const; - void assertClosed(const char* method) const; - - - PreviewConnection& connection; - framing::ChannelHandler channel; - framing::AMQP_ClientProxy proxy; - framing::AMQP_ClientProxy::Session peerSession; - bool ignoring; - std::auto_ptr<PreviewSessionState> session; -}; - -}} // namespace qpid::broker - - - -#endif /*!QPID_BROKER_SESSIONHANDLER_H*/ diff --git a/cpp/src/qpid/broker/PreviewSessionManager.cpp b/cpp/src/qpid/broker/PreviewSessionManager.cpp deleted file mode 100644 index 97a7c87e34..0000000000 --- a/cpp/src/qpid/broker/PreviewSessionManager.cpp +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "PreviewSessionManager.h" -#include "PreviewSessionState.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Statement.h" -#include "qpid/log/Helpers.h" -#include "qpid/memory.h" - -#include <boost/bind.hpp> -#include <boost/range.hpp> -#include <boost/intrusive_ptr.hpp> - -#include <algorithm> -#include <functional> -#include <ostream> - -namespace qpid { -namespace broker { - -using namespace sys; -using namespace framing; - -PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {} - -PreviewSessionManager::~PreviewSessionManager() {} - -// FIXME aconway 2008-02-01: pass handler*, allow open unattached. -std::auto_ptr<PreviewSessionState> PreviewSessionManager::open( - PreviewSessionHandler& h, uint32_t timeout_) -{ - Mutex::ScopedLock l(lock); - std::auto_ptr<PreviewSessionState> session( - new PreviewSessionState(this, &h, timeout_, ack)); - active.insert(session->getId()); - for_each(observers.begin(), observers.end(), - boost::bind(&Observer::opened, _1,boost::ref(*session))); - return session; -} - -void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) { - Mutex::ScopedLock l(lock); - active.erase(session->getId()); - session->suspend(); - session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); - if (session->mgmtObject.get() != 0) - session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry)); - suspended.push_back(session.release()); // In expiry order - eraseExpired(); -} - -std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id) -{ - Mutex::ScopedLock l(lock); - eraseExpired(); - if (active.find(id) != active.end()) - throw SessionBusyException( - QPID_MSG("Session already active: " << id)); - Suspended::iterator i = std::find_if( - suspended.begin(), suspended.end(), - boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1)) - ); - if (i == suspended.end()) - throw InvalidArgumentException( - QPID_MSG("No suspended session with id=" << id)); - active.insert(id); - std::auto_ptr<PreviewSessionState> state(suspended.release(i).release()); - return state; -} - -void PreviewSessionManager::erase(const framing::Uuid& id) -{ - Mutex::ScopedLock l(lock); - active.erase(id); -} - -void PreviewSessionManager::eraseExpired() { - // Called with lock held. - if (!suspended.empty()) { - Suspended::iterator keep = std::lower_bound( - suspended.begin(), suspended.end(), now(), - boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2)); - if (suspended.begin() != keep) { - QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); - suspended.erase(suspended.begin(), keep); - } - } -} - -void PreviewSessionManager::add(const boost::intrusive_ptr<Observer>& o) { - observers.push_back(o); -} - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewSessionManager.h b/cpp/src/qpid/broker/PreviewSessionManager.h deleted file mode 100644 index 9bc6bc5bbc..0000000000 --- a/cpp/src/qpid/broker/PreviewSessionManager.h +++ /dev/null @@ -1,101 +0,0 @@ -#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H -#define QPID_BROKER_PREVIEWSESSIONMANAGER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/framing/Uuid.h> -#include <qpid/sys/Time.h> -#include <qpid/sys/Mutex.h> -#include <qpid/RefCounted.h> - -#include <set> -#include <vector> -#include <memory> - -#include <boost/noncopyable.hpp> -#include <boost/ptr_container/ptr_vector.hpp> -#include <boost/intrusive_ptr.hpp> - -namespace qpid { -namespace broker { - -class PreviewSessionState; -class PreviewSessionHandler; - -/** - * Create and manage PreviewSessionState objects. - */ -class PreviewSessionManager : private boost::noncopyable { - public: - /** - * Observer notified of PreviewSessionManager events. - */ - struct Observer : public RefCounted { - virtual void opened(PreviewSessionState&) {} - }; - - PreviewSessionManager(uint32_t ack); - - ~PreviewSessionManager(); - - /** Open a new active session, caller takes ownership */ - std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_); - - /** Suspend a session, start it's timeout counter. - * The factory takes ownership. - */ - void suspend(std::auto_ptr<PreviewSessionState> session); - - /** Resume a suspended session. - *@throw Exception if timed out or non-existant. - */ - std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&); - - /** Add an Observer. */ - void add(const boost::intrusive_ptr<Observer>&); - - private: - typedef boost::ptr_vector<PreviewSessionState> Suspended; - typedef std::set<framing::Uuid> Active; - typedef std::vector<boost::intrusive_ptr<Observer> > Observers; - - void erase(const framing::Uuid&); - void eraseExpired(); - - sys::Mutex lock; - Suspended suspended; - Active active; - uint32_t ack; - Observers observers; - - friend class PreviewSessionState; // removes deleted sessions from active set. -}; - - - -}} // namespace qpid::broker - - - - - -#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/ diff --git a/cpp/src/qpid/broker/PreviewSessionState.cpp b/cpp/src/qpid/broker/PreviewSessionState.cpp deleted file mode 100644 index 43c3b1509e..0000000000 --- a/cpp/src/qpid/broker/PreviewSessionState.cpp +++ /dev/null @@ -1,174 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "PreviewSessionState.h" -#include "PreviewSessionManager.h" -#include "PreviewSessionHandler.h" -#include "ConnectionState.h" -#include "Broker.h" -#include "SemanticHandler.h" -#include "qpid/framing/reply_exceptions.h" - -namespace qpid { -namespace broker { - -using namespace framing; -using sys::Mutex; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; - -PreviewSessionState::PreviewSessionState( - PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack) - : framing::SessionState(ack, timeout_ > 0), - factory(f), handler(h), id(true), timeout(timeout_), - broker(h->getConnection().broker), - version(h->getConnection().getVersion()), - semanticHandler(new SemanticHandler(*this)) -{ - in.next = semanticHandler.get(); - out.next = &handler->out; - - getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); - - Manageable* parent = broker.GetVhostObject (); - - if (parent != 0) - { - ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); - - if (agent.get () != 0) - { - mgmtObject = management::Session::shared_ptr - (new management::Session (this, parent, id.str ())); - mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h->getChannel()); - mgmtObject->set_detachedLifespan (getTimeout()); - agent->addObject (mgmtObject); - } - } -} - -PreviewSessionState::~PreviewSessionState() { - // Remove ID from active session list. - if (factory) - factory->erase(getId()); - if (mgmtObject.get () != 0) - mgmtObject->resourceDestroy (); -} - -PreviewSessionHandler* PreviewSessionState::getHandler() { - return handler; -} - -AMQP_ClientProxy& PreviewSessionState::getProxy() { - assert(isAttached()); - return getHandler()->getProxy(); -} - -ConnectionState& PreviewSessionState::getConnection() { - assert(isAttached()); - return getHandler()->getConnection(); -} - -bool PreviewSessionState::isLocal(const ConnectionToken* t) const -{ - return isAttached() && &(handler->getConnection()) == t; -} - -void PreviewSessionState::detach() { - getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState()); - Mutex::ScopedLock l(lock); - handler = 0; out.next = 0; - if (mgmtObject.get() != 0) - { - mgmtObject->set_attached (0); - } -} - -void PreviewSessionState::attach(PreviewSessionHandler& h) { - { - Mutex::ScopedLock l(lock); - handler = &h; - out.next = &handler->out; - if (mgmtObject.get() != 0) - { - mgmtObject->set_attached (1); - mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId()); - mgmtObject->set_channelId (h.getChannel()); - } - } - h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState()); -} - -void PreviewSessionState::activateOutput() -{ - Mutex::ScopedLock l(lock); - if (isAttached()) { - getConnection().outputTasks.activateOutput(); - } -} - //This class could be used as the callback for queue notifications - //if not attached, it can simply ignore the callback, else pass it - //on to the connection - -ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const -{ - return dynamic_pointer_cast<ManagementObject> (mgmtObject); -} - -Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId, - Args& /*args*/) -{ - Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - - switch (methodId) - { - case management::Session::METHOD_DETACH : - if (handler != 0) - { - handler->detach(); - } - status = Manageable::STATUS_OK; - break; - - case management::Session::METHOD_CLOSE : - /* - if (handler != 0) - { - handler->getConnection().closeChannel(handler->getChannel()); - } - status = Manageable::STATUS_OK; - break; - */ - - case management::Session::METHOD_SOLICITACK : - case management::Session::METHOD_RESETLIFESPAN : - status = Manageable::STATUS_NOT_IMPLEMENTED; - break; - } - - return status; -} - - -}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewSessionState.h b/cpp/src/qpid/broker/PreviewSessionState.h deleted file mode 100644 index 1aecb12e72..0000000000 --- a/cpp/src/qpid/broker/PreviewSessionState.h +++ /dev/null @@ -1,125 +0,0 @@ -#ifndef QPID_BROKER_PREVIEWSESSION_H -#define QPID_BROKER_PREVIEWSESSION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/Uuid.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/SessionState.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Time.h" -#include "qpid/management/Manageable.h" -#include "qpid/management/Session.h" -#include "SessionContext.h" - -#include <boost/noncopyable.hpp> -#include <boost/scoped_ptr.hpp> - -#include <set> -#include <vector> -#include <ostream> - -namespace qpid { - -namespace framing { -class AMQP_ClientProxy; -} - -namespace broker { - -class SemanticHandler; -class PreviewSessionHandler; -class PreviewSessionManager; -class Broker; -class ConnectionState; - -/** - * Broker-side session state includes sessions handler chains, which may - * themselves have state. - */ -class PreviewSessionState : public framing::SessionState, - public SessionContext, - public framing::FrameHandler::Chains, - public management::Manageable -{ - public: - ~PreviewSessionState(); - bool isAttached() const { return handler; } - - void detach(); - void attach(PreviewSessionHandler& handler); - - - PreviewSessionHandler* getHandler(); - - /** @pre isAttached() */ - framing::AMQP_ClientProxy& getProxy(); - - /** @pre isAttached() */ - ConnectionState& getConnection(); - bool isLocal(const ConnectionToken* t) const; - - uint32_t getTimeout() const { return timeout; } - Broker& getBroker() { return broker; } - framing::ProtocolVersion getVersion() const { return version; } - - /** OutputControl **/ - void activateOutput(); - - // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject (void) const; - management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args); - - // Normally SessionManager creates sessions. - PreviewSessionState(PreviewSessionManager*, - PreviewSessionHandler* out, - uint32_t timeout, - uint32_t ackInterval); - - - private: - PreviewSessionManager* factory; - PreviewSessionHandler* handler; - framing::Uuid id; - uint32_t timeout; - sys::AbsTime expiry; // Used by SessionManager. - Broker& broker; - framing::ProtocolVersion version; - sys::Mutex lock; - boost::scoped_ptr<SemanticHandler> semanticHandler; - management::Session::shared_ptr mgmtObject; - - friend class PreviewSessionManager; -}; - - -inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) { - return out << session.getId(); -} - -}} // namespace qpid::broker - - - -#endif /*!QPID_BROKER_SESSION_H*/ diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 628d969c69..e799cde2b9 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -340,11 +340,11 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) { void Queue::consume(Consumer& c, bool requestExclusive){ Mutex::ScopedLock locker(consumerLock); if(exclusive) { - throw AccessRefusedException( + throw ResourceLockedException( QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); } else if(requestExclusive) { if(consumerCount) { - throw AccessRefusedException( + throw ResourceLockedException( QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } else { exclusive = c.getSession(); @@ -596,7 +596,6 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) queue->unbind(broker.getExchanges(), queue); queue->destroy(); } - } bool Queue::isExclusiveOwner(const OwnershipToken* const o) const diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp index e2fcd493db..95e529f47e 100644 --- a/cpp/src/qpid/broker/QueueBindings.cpp +++ b/cpp/src/qpid/broker/QueueBindings.cpp @@ -20,8 +20,10 @@ */ #include "QueueBindings.h" #include "ExchangeRegistry.h" +#include "qpid/framing/reply_exceptions.h" using qpid::framing::FieldTable; +using qpid::framing::NotFoundException; using std::string; using namespace qpid::broker; @@ -35,7 +37,7 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue) for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { try { exchanges.get(i->exchange)->unbind(queue, i->key, &(i->args)); - } catch (ChannelException&) { + } catch (const NotFoundException&) { } } } diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp index 9ca4069a12..56718502f1 100644 --- a/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -23,6 +23,7 @@ #include "Connection.h" #include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" #if HAVE_SASL #include <sasl/sasl.h> @@ -37,7 +38,7 @@ namespace broker { class NullAuthenticator : public SaslAuthenticator { Connection& connection; - framing::AMQP_ClientProxy::Connection010 client; + framing::AMQP_ClientProxy::Connection client; public: NullAuthenticator(Connection& connection); ~NullAuthenticator(); @@ -52,7 +53,7 @@ class CyrusAuthenticator : public SaslAuthenticator { sasl_conn_t *sasl_conn; Connection& connection; - framing::AMQP_ClientProxy::Connection010 client; + framing::AMQP_ClientProxy::Connection client; void processAuthenticationStep(int code, const char *challenge, unsigned int challenge_len); @@ -117,7 +118,7 @@ void CyrusAuthenticator::init() // TODO: Change this to an exception signaling // server error, when one is available - throw CommandInvalidException("Unable to perform authentication"); + throw ConnectionForcedException("Unable to perform authentication"); } } @@ -146,7 +147,7 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms) // TODO: Change this to an exception signaling // server error, when one is available - throw CommandInvalidException("Mechanism listing failed"); + throw ConnectionForcedException("Mechanism listing failed"); } else { string mechanism; unsigned int start; diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp deleted file mode 100644 index 411e0ce3c0..0000000000 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ /dev/null @@ -1,195 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "SemanticHandler.h" -#include "SemanticState.h" -#include "SessionContext.h" -#include "BrokerAdapter.h" -#include "MessageDelivery.h" -#include "qpid/framing/ExecutionXCompleteBody.h" -#include "qpid/framing/ServerInvoker.h" -#include "qpid/log/Statement.h" - -#include <boost/format.hpp> -#include <boost/bind.hpp> - -using boost::intrusive_ptr; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -SemanticHandler::SemanticHandler(SessionContext& s) : - state(*this,s), session(s), - msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()), - ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2)) - {} - -void SemanticHandler::handle(framing::AMQFrame& frame) -{ - //TODO: assembly for method and headers - - //have potentially three separate tracks at this point: - // - // (1) execution controls - // (2) commands - // (3) data i.e. content-bearing commands - // - //framesets on each can be interleaved. framesets on the latter - //two share a command-id sequence. controls on the first track are - //used to communicate details about that command-id sequence. - // - //need to decide what to do if a frame on the command track - //arrives while a frameset on the data track is still - //open. execute it (i.e. out-of order execution with respect to - //the command id sequence) or queue it up? - - TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header - - switch(track) { - case EXECUTION_CONTROL_TRACK: - handleL3(frame.getMethod()); - break; - case MODEL_COMMAND_TRACK: - handleCommand(frame.getMethod()); - break; - case MODEL_CONTENT_TRACK: - handleContent(frame); - break; - } -} - -void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) -{ - //record: - SequenceNumber mark(cumulative); - if (outgoing.lwm < mark) { - outgoing.lwm = mark; - //ack messages: - state.ackCumulative(mark.getValue()); - } - range.processRanges(ackOp); -} - -void SemanticHandler::sendCompletion() -{ - SequenceNumber mark = incoming.getMark(); - SequenceNumberSet range = incoming.getRange(); - session.getProxy().getExecution().complete(mark.getValue(), range); -} - -void SemanticHandler::flush() -{ - incoming.flush(); - sendCompletion(); -} -void SemanticHandler::sync() -{ - incoming.sync(); - sendCompletion(); -} - -void SemanticHandler::noop() -{ - incoming.noop(); -} - -void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/) -{ - //never actually sent by client at present -} - -void SemanticHandler::handleCommand(framing::AMQMethodBody* method) -{ - SequenceNumber id = incoming.next(); - BrokerAdapter adapter(state); - Invoker::Result invoker = invoke(adapter, *method); - incoming.complete(id); - - if (!invoker.wasHandled()) { - throw NotImplementedException("Not implemented"); - } else if (invoker.hasResult()) { - session.getProxy().getExecution().result(id.getValue(), invoker.getResult()); - } - if (method->isSync()) { - incoming.sync(id); - sendCompletion(); - } - //TODO: if window gets too large send unsolicited completion -} - -void SemanticHandler::handleL3(framing::AMQMethodBody* method) -{ - if (!invoke(*this, *method)) - throw NotImplementedException("Not implemented"); -} - -void SemanticHandler::handleContent(AMQFrame& frame) -{ - intrusive_ptr<Message> msg(msgBuilder.getMessage()); - if (!msg) {//start of frameset will be indicated by frame flags - msgBuilder.start(incoming.next()); - msg = msgBuilder.getMessage(); - } - msgBuilder.handle(frame); - if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags - msg->setPublisher(&session.getConnection()); - state.handle(msg); - msgBuilder.end(); - incoming.track(msg); - if (msg->getFrames().getMethod()->isSync()) { - incoming.sync(msg->getCommandId()); - sendCompletion(); - } - } -} - -DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) -{ - uint32_t maxFrameSize = session.getConnection().getFrameMax(); - MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize); - return outgoing.hwm; -} - -SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) -{ - //will be replaced by field in 0-10 frame header - uint8_t type = frame.getBody()->type(); - uint16_t classId; - switch(type) { - case METHOD_BODY: - if (frame.castBody<AMQMethodBody>()->isContentBearing()) { - return MODEL_CONTENT_TRACK; - } - - classId = frame.castBody<AMQMethodBody>()->amqpClassId(); - switch (classId) { - case ExecutionXCompleteBody::CLASS_ID: - return EXECUTION_CONTROL_TRACK; - } - - return MODEL_COMMAND_TRACK; - case HEADER_BODY: - case CONTENT_BODY: - return MODEL_CONTENT_TRACK; - } - throw Exception("Could not determine track"); -} - diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h deleted file mode 100644 index 893a0cbded..0000000000 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _SemanticHandler_ -#define _SemanticHandler_ - -#include <memory> -#include "BrokerAdapter.h" -#include "DeliveryAdapter.h" -#include "MessageBuilder.h" -#include "IncomingExecutionContext.h" -#include "HandlerImpl.h" - -#include "qpid/framing/amqp_types.h" -#include "qpid/framing/AMQP_ServerOperations.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/SequenceNumber.h" - -#include <boost/function.hpp> - -namespace qpid { - -namespace framing { -class AMQMethodBody; -class AMQHeaderBody; -class AMQContentBody; -class AMQHeaderBody; -} - -namespace broker { - -class SessionContext; - -class SemanticHandler : public DeliveryAdapter, - public framing::FrameHandler, - public framing::AMQP_ServerOperations::ExecutionHandler - -{ - typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; - - SemanticState state; - SessionContext& session; - // TODO aconway 2007-09-20: Why are these on the handler rather than the - // state? - IncomingExecutionContext incoming; - framing::Window outgoing; - MessageBuilder msgBuilder; - RangedOperation ackOp; - - enum TrackId {EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK}; - TrackId getTrack(const framing::AMQFrame& frame); - - void handleL3(framing::AMQMethodBody* method); - void handleCommand(framing::AMQMethodBody* method); - void handleContent(framing::AMQFrame& frame); - - void sendCompletion(); - - //delivery adapter methods: - DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); - - framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } - //Connection& getConnection() { return session.getConnection(); } - Broker& getBroker() { return session.getConnection().getBroker(); } - -public: - SemanticHandler(SessionContext& session); - - //frame handler: - void handle(framing::AMQFrame& frame); - - //execution class method handlers: - void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); - void flush(); - void noop(); - void result(uint32_t command, const std::string& data); - void sync(); - - - SemanticState& getSemanticState() { return state; } -}; - -}} - -#endif diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index ab6b82a232..e73540891c 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -19,22 +19,19 @@ * */ -#include "SessionContext.h" -#include "BrokerAdapter.h" -#include "Queue.h" +#include "SessionState.h" #include "Connection.h" #include "DeliverableMessage.h" #include "DtxAck.h" #include "DtxTimeout.h" #include "Message.h" -#include "SemanticHandler.h" -#include "SessionHandler.h" +#include "Queue.h" +#include "SessionContext.h" #include "TxAccept.h" #include "TxAck.h" #include "TxPublish.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/MessageXTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" @@ -357,9 +354,7 @@ void SemanticState::handle(intrusive_ptr<Message> msg) { void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); //TODO: the following should be hidden behind message (using MessageAdapter or similar) - if (msg->isA<MessageXTransferBody>()) { - msg->getProperties<PreviewDeliveryProperties>()->setExchange(exchangeName); - } else if (msg->isA<MessageTransferBody>()) { + if (msg->isA<MessageTransferBody>()) { msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); } if (!cacheExchange || cacheExchange->getName() != exchangeName){ diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index c92b9bb945..e284451d14 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -113,7 +113,7 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam try { Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); - } catch (const ChannelException& e) { + } catch (const NotFoundException& e) { return ExchangeQueryResult("", false, true, FieldTable()); } } @@ -163,7 +163,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string Exchange::shared_ptr exchange; try { exchange = getBroker().getExchanges().get(exchangeName); - } catch (const ChannelException&) {} + } catch (const NotFoundException&) {} Queue::shared_ptr queue; if (!queueName.empty()) { @@ -192,7 +192,11 @@ SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : Han SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() { - destroyExclusiveQueues(); + try { + destroyExclusiveQueues(); + } catch (std::exception& e) { + QPID_LOG(error, e.what()); + } } void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() @@ -370,7 +374,7 @@ void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, u_ state.addByteCredit(destination, value); } else { //unknown - throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit)); + throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit)); } } @@ -384,7 +388,7 @@ void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destinat //window state.setWindowMode(destination); } else{ - throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode)); + throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode)); } } @@ -419,19 +423,26 @@ framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const return MessageAcquireResult(acquisitions); } +framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const std::string& /*destination*/, + const std::string& /*resumeId*/) +{ + throw NotImplementedException("resuming transfers not yet supported"); +} + + void SessionAdapter::ExecutionHandlerImpl::sync() { //TODO } -void SessionAdapter::ExecutionHandlerImpl::result(uint32_t /*commandId*/, const string& /*value*/) +void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/) { //TODO } void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/, - uint32_t /*commandId*/, + const SequenceNumber& /*commandId*/, uint8_t /*classCode*/, uint8_t /*commandCode*/, uint8_t /*fieldIndex*/, @@ -470,7 +481,7 @@ void SessionAdapter::DtxHandlerImpl::select() state.selectDtx(); } -DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, +XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, bool fail, bool suspend) { @@ -480,7 +491,7 @@ DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, if (suspend) { throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { - return DtxEndResult(XA_RBROLLBACK); + return XaResult(XA_RBROLLBACK); } } else { if (suspend) { @@ -488,14 +499,14 @@ DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, } else { state.endDtx(convert(xid), false); } - return DtxEndResult(XA_OK); + return XaResult(XA_OK); } } catch (const DtxTimeoutException& e) { - return DtxEndResult(XA_RBTIMEOUT); + return XaResult(XA_RBTIMEOUT); } } -DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, +XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, bool join, bool resume) { @@ -508,41 +519,41 @@ DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, } else { state.startDtx(convert(xid), getBroker().getDtxManager(), join); } - return DtxStartResult(XA_OK); + return XaResult(XA_OK); } catch (const DtxTimeoutException& e) { - return DtxStartResult(XA_RBTIMEOUT); + return XaResult(XA_RBTIMEOUT); } } -DtxPrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) +XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) { try { bool ok = getBroker().getDtxManager().prepare(convert(xid)); - return DtxPrepareResult(ok ? XA_OK : XA_RBROLLBACK); + return XaResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return DtxPrepareResult(XA_RBTIMEOUT); + return XaResult(XA_RBTIMEOUT); } } -DtxCommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, +XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, bool onePhase) { try { bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); - return DtxCommitResult(ok ? XA_OK : XA_RBROLLBACK); + return XaResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return DtxCommitResult(XA_RBTIMEOUT); + return XaResult(XA_RBTIMEOUT); } } -DtxRollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) +XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) { try { getBroker().getDtxManager().rollback(convert(xid)); - return DtxRollbackResult(XA_OK); + return XaResult(XA_OK); } catch (const DtxTimeoutException& e) { - return DtxRollbackResult(XA_RBTIMEOUT); + return XaResult(XA_RBTIMEOUT); } } diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index b5bf44ceba..4eaaf13f8d 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -54,35 +54,19 @@ class Queue; public: SessionAdapter(SemanticState& session); - framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();} - Message010Handler* getMessage010Handler(){ return &messageImpl; } - Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; } - Queue010Handler* getQueue010Handler(){ return &queueImpl; } - Execution010Handler* getExecution010Handler(){ return &executionImpl; } - Tx010Handler* getTx010Handler(){ return &txImpl; } - Dtx010Handler* getDtx010Handler(){ return &dtxImpl; } - - BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); } - ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); } - BindingHandler* getBindingHandler(){ throw framing::NotImplementedException("Class not implemented"); } - QueueHandler* getQueueHandler(){ throw framing::NotImplementedException("Class not implemented"); } - TxHandler* getTxHandler(){ throw framing::NotImplementedException("Class not implemented"); } - MessageHandler* getMessageHandler(){ throw framing::NotImplementedException("Class not implemented"); } - DtxCoordinationHandler* getDtxCoordinationHandler(){ throw framing::NotImplementedException("Class not implemented"); } - DtxDemarcationHandler* getDtxDemarcationHandler(){ throw framing::NotImplementedException("Class not implemented"); } - AccessHandler* getAccessHandler() { throw framing::NotImplementedException("Class not implemented"); } - FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); } - StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); } - TunnelHandler* getTunnelHandler() { throw framing::NotImplementedException("Class not implemented"); } - ExecutionHandler* getExecutionHandler() { throw framing::NotImplementedException("Class not implemented"); } + MessageHandler* getMessageHandler(){ return &messageImpl; } + ExchangeHandler* getExchangeHandler(){ return &exchangeImpl; } + QueueHandler* getQueueHandler(){ return &queueImpl; } + ExecutionHandler* getExecutionHandler(){ return &executionImpl; } + TxHandler* getTxHandler(){ return &txImpl; } + DtxHandler* getDtxHandler(){ return &dtxImpl; } + ConnectionHandler* getConnectionHandler() { throw framing::NotImplementedException("Class not implemented"); } SessionHandler* getSessionHandler() { throw framing::NotImplementedException("Class not implemented"); } - Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); } - Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); } - - void destroyExclusiveQueues() { queueImpl.destroyExclusiveQueues(); } + FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); } + StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); } private: //common base for utility methods etc that are specific to this adapter @@ -95,7 +79,7 @@ class Queue; class ExchangeHandlerImpl : - public Exchange010Handler, + public ExchangeHandler, public HandlerHelper { public: @@ -124,7 +108,7 @@ class Queue; shared_ptr<Exchange> alternate); }; - class QueueHandlerImpl : public Queue010Handler, + class QueueHandlerImpl : public QueueHandler, public HandlerHelper, public OwnershipToken { Broker& broker; @@ -149,7 +133,7 @@ class Queue; }; class MessageHandlerImpl : - public Message010Handler, + public MessageHandler, public HandlerHelper { typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; @@ -196,18 +180,21 @@ class Queue; void flush(const string& destination); void stop(const string& destination); + + framing::MessageResumeResult resume(const std::string& destination, + const std::string& resumeId); }; - class ExecutionHandlerImpl : public Execution010Handler, public HandlerHelper + class ExecutionHandlerImpl : public ExecutionHandler, public HandlerHelper { public: ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void sync(); - void result(uint32_t commandId, const string& value); + void result(const framing::SequenceNumber& commandId, const string& value); void exception(uint16_t errorCode, - uint32_t commandId, + const framing::SequenceNumber& commandId, uint8_t classCode, uint8_t commandCode, uint8_t fieldIndex, @@ -216,7 +203,7 @@ class Queue; }; - class TxHandlerImpl : public Tx010Handler, public HandlerHelper + class TxHandlerImpl : public TxHandler, public HandlerHelper { public: TxHandlerImpl(SemanticState& session) : HandlerHelper(session) {} @@ -226,7 +213,7 @@ class Queue; void rollback(); }; - class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper, private framing::StructHelper + class DtxHandlerImpl : public DtxHandler, public HandlerHelper, private framing::StructHelper { std::string convert(const framing::Xid& xid); @@ -235,26 +222,26 @@ class Queue; void select(); - framing::DtxStartResult start(const framing::Xid& xid, + framing::XaResult start(const framing::Xid& xid, bool join, bool resume); - framing::DtxEndResult end(const framing::Xid& xid, + framing::XaResult end(const framing::Xid& xid, bool fail, bool suspend); - framing::DtxCommitResult commit(const framing::Xid& xid, + framing::XaResult commit(const framing::Xid& xid, bool onePhase); void forget(const framing::Xid& xid); framing::DtxGetTimeoutResult getTimeout(const framing::Xid& xid); - framing::DtxPrepareResult prepare(const framing::Xid& xid); + framing::XaResult prepare(const framing::Xid& xid); framing::DtxRecoverResult recover(); - framing::DtxRollbackResult rollback(const framing::Xid& xid); + framing::XaResult rollback(const framing::Xid& xid); void setTimeout(const framing::Xid& xid, uint32_t timeout); }; diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index d5caf789c0..f5fa22060f 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -42,7 +42,8 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch) connection(c), channel(ch, &c.getOutput()), proxy(out), // Via my own handleOut() for L2 data. peerSession(channel), // Direct to channel for L2 commands. - ignoring(false) {} + ignoring(false) +{} SessionHandler::~SessionHandler() {} @@ -52,33 +53,30 @@ MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace void SessionHandler::handleIn(AMQFrame& f) { - // Note on channel states: a channel is open if session != 0. A - // channel that is closed (session == 0) can be in the "ignoring" - // state. This is a temporary state after we have sent a channel - // exception, where extra frames might arrive that should be - // ignored. - // + // Note on channel states: a channel is attached if session != 0 AMQMethodBody* m = f.getBody()->getMethod(); try { - if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { + if (ignoring && !(m && m->isA<SessionDetachedBody>())) { + return; + } + if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) { + //frame was a valid session control and has been handled return; } else if (session.get()) { + //we are attached and frame was not a session control so it is for upper layers session->handle(f); - } else if (!ignoring) { - throw ConnectionException(501, QPID_MSG("Channel " << channel.get() << " is not attached")); + } else { + throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached")); } + }catch(const ChannelException& e){ + QPID_LOG(error, "Session detached due to: " << e.what()); + peerSession.detached(name, e.code); + handleDetach(); + connection.closeChannel(channel.get()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); - }catch(const SessionException& e){ - //execution.exception will have been sent already - ignoring = true; - //peerSession.requestTimeout(0); - session->setTimeout(0); - peerSession.detach(name); - localSuspend(); }catch(const std::exception& e){ - connection.close( - framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m)); + connection.close(501, e.what(), classId(m), methodId(m)); } } @@ -95,7 +93,7 @@ void SessionHandler::handleOut(AMQFrame& f) { void SessionHandler::assertAttached(const char* method) const { if (!session.get()) { std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl; - throw ChannelErrorException( + throw NotAttachedException( QPID_MSG(method << " failed: No session for channel " << getChannel())); } @@ -103,33 +101,23 @@ void SessionHandler::assertAttached(const char* method) const { void SessionHandler::assertClosed(const char* method) const { if (session.get()) - throw ChannelBusyException( + throw SessionBusyException( QPID_MSG(method << " failed: channel " << channel.get() << " is already open.")); } -void SessionHandler::localSuspend() { - if (session.get() && session->isAttached()) { - session->detach(); - connection.broker.getSessionManager().suspend(session); - session.reset(); - } -} - - ConnectionState& SessionHandler::getConnection() { return connection; } const ConnectionState& SessionHandler::getConnection() const { return connection; } //new methods: void SessionHandler::attach(const std::string& _name, bool /*force*/) { - //TODO: need to revise session manager to support resume as well - assertClosed("attach"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, 0)); name = _name;//TODO: this should be used in conjunction with //userid for connection as sessions identity - session.reset(state.release()); + + //TODO: need to revise session manager to support resume as well + assertClosed("attach"); + session.reset(new SessionState(0, this, 0, 0)); peerSession.attached(name); peerSession.commandPoint(session->nextOut, 0); } @@ -138,31 +126,46 @@ void SessionHandler::attached(const std::string& _name) { name = _name;//TODO: this should be used in conjunction with //userid for connection as sessions identity - std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0)); - session.reset(state.release()); + session.reset(new SessionState(0, this, 0, 0)); peerSession.commandPoint(session->nextOut, 0); } void SessionHandler::detach(const std::string& name) { assertAttached("detach"); - localSuspend(); - peerSession.detached(name, 0); + peerSession.detached(name, session::NORMAL); + handleDetach(); assert(&connection.getChannel(channel.get()) == this); connection.closeChannel(channel.get()); } void SessionHandler::detached(const std::string& name, uint8_t code) { - ignoring=false; - session->detach(); - session.reset(); + ignoring = false; + handleDetach(); if (code) { //no error } else { //error occured QPID_LOG(warning, "Received session.closed: "<< name << " " << code); } + connection.closeChannel(channel.get()); +} + +void SessionHandler::handleDetach() +{ + if (session.get()) { + session->detach(); + session.reset(); + } +} + +void SessionHandler::requestDetach() +{ + //TODO: request timeout when python can handle it + //peerSession.requestTimeout(0); + ignoring = true; + peerSession.detach(name); } void SessionHandler::requestTimeout(uint32_t t) diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index c299c465cf..47c534441a 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -46,7 +46,7 @@ class SessionState; * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ -class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, +class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, public framing::FrameHandler::InOutHandler, private boost::noncopyable { @@ -66,9 +66,8 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } - // Called by closing connection. - void localSuspend(); - void detach() { localSuspend(); } + void requestDetach(); + void handleDetach(); void sendCompletion(); protected: @@ -93,9 +92,6 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, void flush(bool expected, bool confirmed, bool completed); void gap(const framing::SequenceSet& commands); - //hacks for old generator: - void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); } - void assertAttached(const char* method) const; void assertActive(const char* method) const; void assertClosed(const char* method) const; @@ -105,7 +101,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; - framing::AMQP_ClientProxy::Session010 peerSession; + framing::AMQP_ClientProxy::Session peerSession; bool ignoring; std::auto_ptr<SessionState> session; std::string name;//TODO: this should be part of the session state and replace the id diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index b96d7b5e3f..50938de8ac 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -22,7 +22,6 @@ #include "Broker.h" #include "ConnectionState.h" #include "MessageDelivery.h" -#include "SemanticHandler.h" #include "SessionManager.h" #include "SessionHandler.h" #include "qpid/framing/AMQContentBody.h" @@ -30,6 +29,7 @@ #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> @@ -50,6 +50,7 @@ SessionState::SessionState( factory(f), handler(h), id(true), timeout(timeout_), broker(h->getConnection().broker), version(h->getConnection().getVersion()), + ignoring(false), semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), @@ -154,7 +155,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, case management::Session::METHOD_DETACH : if (handler != 0) { - handler->detach(); + handler->requestDetach(); } status = Manageable::STATUS_OK; break; @@ -188,7 +189,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { nextOut++;//execution result is now a command, so the counter must be incremented - getProxy().getExecution010().result(id, invocation.getResult()); + getProxy().getExecution().result(id, invocation.getResult()); } if (method->isSync()) { incomplete.process(enqueuedOp, true); @@ -242,12 +243,13 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg) completed.add(msg->getCommandId()); if (msg->requiresAccept()) { nextOut++;//accept is a command, so the counter must be incremented - getProxy().getMessage010().accept(SequenceSet(msg->getCommandId())); + getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } } void SessionState::handle(AMQFrame& frame) { + if (ignoring) return; received(frame); SequenceNumber commandId; @@ -271,11 +273,13 @@ void SessionState::handle(AMQFrame& frame) AMQMethodBody* m = frame.getMethod(); if (m) { - getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable()); + getProxy().getExecution().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable()); } else { - getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); + getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); } - throw e; + timeout = 0; + ignoring = true; + handler->requestDetach(); } } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 4fc2ae4cc5..2ec68260a1 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -130,6 +130,7 @@ class SessionState : public framing::SessionState, Broker& broker; framing::ProtocolVersion version; sys::Mutex lock; + bool ignoring; SemanticState semanticState; SessionAdapter adapter; diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index f32b5e2614..3bcba8983c 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -32,6 +32,7 @@ #include <boost/format.hpp> #include <boost/bind.hpp> #include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/reply_exceptions.h" using namespace std; using namespace boost; @@ -75,7 +76,7 @@ void Channel::open(const Session& s) { Mutex::ScopedLock l(stopLock); if (isOpen()) - throw ChannelBusyException(); + throw SessionBusyException(); active = true; session = s; if(isTransactional()) { @@ -146,7 +147,7 @@ void Channel::consume( Mutex::ScopedLock l(lock); ConsumerMap::iterator i = consumers.find(tag); if (i != consumers.end()) - throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag )); + throw PreconditionFailedException(QPID_MSG("Consumer already exists with tag " << tag )); Consumer& c = consumers[tag]; c.listener = listener; c.ackMode = ackMode; diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 83cc357ded..df27942008 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -25,6 +25,7 @@ #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/ClientInvoker.h" +#include "qpid/framing/reply_exceptions.h" using namespace qpid::client; using namespace qpid::framing; diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index 2ce36d6991..d7ab97ce31 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -55,9 +55,9 @@ class ConnectionHandler : private StateManager, public ConnectionProperties, public ChainableFrameHandler, public framing::InputHandler, - private framing::AMQP_ClientOperations::Connection010Handler + private framing::AMQP_ClientOperations::ConnectionHandler { - typedef framing::AMQP_ClientOperations::Connection010Handler ConnectionOperations; + typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations; enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; std::set<int> ESTABLISHED; @@ -70,7 +70,7 @@ class ConnectionHandler : private StateManager, }; Adapter outHandler; - framing::AMQP_ServerProxy::Connection010 proxy; + framing::AMQP_ServerProxy::Connection proxy; uint16_t errorCode; std::string errorText; diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index d1fd66ff26..ce95e43f58 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -32,6 +32,7 @@ using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; +using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c), isClosed(false), isClosing(false) @@ -39,7 +40,7 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); handler.out = boost::bind(&Connector::send, connector, _1); handler.onClose = boost::bind(&ConnectionImpl::closed, this, - REPLY_SUCCESS, std::string()); + NORMAL, std::string()); handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); connector->setInputHandler(&handler); connector->setTimeoutHandler(this); @@ -57,7 +58,7 @@ void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) { Mutex::ScopedLock l(lock); boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; - if (s.lock()) throw ChannelBusyException(); + if (s.lock()) throw SessionBusyException(); s = session; } @@ -74,7 +75,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) s = sessions[frame.getChannel()].lock(); } if (!s) - throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel())); + throw NotAttachedException(QPID_MSG("Invalid channel: " << frame.getChannel())); s->in(frame); } @@ -113,7 +114,7 @@ void ConnectionImpl::close() Mutex::ScopedUnlock u(lock); handler.close(); } - closed(REPLY_SUCCESS, "Closed by client"); + closed(NORMAL, "Closed by client"); } // Set closed flags and erase the sessions map, but keep the contents @@ -149,7 +150,7 @@ void ConnectionImpl::shutdown() handler.fail(CONN_CLOSED); Mutex::ScopedUnlock u(lock); std::for_each(save.begin(), save.end(), - boost::bind(&SessionImpl::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED)); + boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED)); } void ConnectionImpl::erase(uint16_t ch) { diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h index 5d91f289e2..fc4175ef22 100644 --- a/cpp/src/qpid/client/Session.h +++ b/cpp/src/qpid/client/Session.h @@ -21,7 +21,7 @@ * under the License. * */ -#include "qpid/client/Session_99_0.h" +#include "qpid/client/Session_0_10.h" namespace qpid { namespace client { @@ -31,7 +31,7 @@ namespace client { * * \ingroup clientapi */ -typedef Session_99_0 Session; +typedef Session_0_10 Session; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index 4f3869319c..571d54df0c 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -30,16 +30,18 @@ #include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" #include "qpid/framing/SequenceSet.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> -namespace { const std::string OK="ok"; } +namespace { const std::string EMPTY; } namespace qpid { namespace client { using namespace qpid::framing; +using namespace qpid::framing::session;//for detach codes typedef sys::Monitor::ScopedLock Lock; typedef sys::Monitor::ScopedUnlock UnLock; @@ -47,8 +49,9 @@ typedef sys::Monitor::ScopedUnlock UnLock; SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, uint16_t ch, uint64_t _maxFrameSize) - : code(REPLY_SUCCESS), - text(OK), + : error(OK), + code(NORMAL), + text(EMPTY), state(INACTIVE), syncMode(false), detachedLifetime(0), @@ -250,6 +253,7 @@ void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool void SessionImpl::connectionClosed(uint16_t _code, const std::string& _text) { Lock l(state); + error = CONNECTION_CLOSE; code = _code; text = _text; setState(DETACHED); @@ -379,6 +383,7 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread //TODO: proper 0-10 exception handling QPID_LOG(error, "Session exception:" << e.what()); Lock l(state); + error = EXCEPTION; code = e.code; text = e.what(); } @@ -443,6 +448,7 @@ void SessionImpl::detached(const std::string& _name, uint8_t _code) //TODO: make sure this works with execution.exception - don't //want to overwrite the code from that QPID_LOG(error, "Session detached by peer: " << name << " " << code); + error = SESSION_DETACH; code = _code; text = "Session detached by peer"; } @@ -545,14 +551,14 @@ void SessionImpl::gap(const framing::SequenceSet& /*commands*/) void SessionImpl::sync() {} -void SessionImpl::result(uint32_t commandId, const std::string& value) +void SessionImpl::result(const framing::SequenceNumber& commandId, const std::string& value) { Lock l(state); results.received(commandId, value); } void SessionImpl::exception(uint16_t errorCode, - uint32_t commandId, + const framing::SequenceNumber& commandId, uint8_t classCode, uint8_t commandCode, uint8_t /*fieldIndex*/, @@ -563,6 +569,7 @@ void SessionImpl::exception(uint16_t errorCode, << " [caused by " << commandId << " " << classCode << ":" << commandCode << "]"); Lock l(state); + error = EXCEPTION; code = errorCode; text = description; if (detachedLifetime) { @@ -589,8 +596,11 @@ inline void SessionImpl::waitFor(State s) //call with lock held void SessionImpl::check() const //call with lock held. { - if (code != REPLY_SUCCESS) { - throwReplyException(code, text); + switch (error) { + case OK: break; + case CONNECTION_CLOSE: throw ConnectionException(code, text); + case SESSION_DETACH: throw ChannelException(code, text); + case EXCEPTION: throwExecutionException(code, text); } } @@ -598,7 +608,7 @@ void SessionImpl::checkOpen() const //call with lock held. { check(); if (state != ATTACHED) { - throwReplyException(0, "Session isn't attached"); + throw NotAttachedException("Session isn't attached"); } } diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h index 86820dbb92..3b2e80fefd 100644 --- a/cpp/src/qpid/client/SessionImpl.h +++ b/cpp/src/qpid/client/SessionImpl.h @@ -54,8 +54,8 @@ class ConnectionImpl; class SessionImpl : public framing::FrameHandler::InOutHandler, public Execution, - private framing::AMQP_ClientOperations::Session010Handler, - private framing::AMQP_ClientOperations::Execution010Handler + private framing::AMQP_ClientOperations::SessionHandler, + private framing::AMQP_ClientOperations::ExecutionHandler { public: SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); @@ -95,6 +95,12 @@ public: void connectionBroke(uint16_t code, const std::string& text); private: + enum ErrorType { + OK, + CONNECTION_CLOSE, + SESSION_DETACH, + EXCEPTION + }; enum State { INACTIVE, ATTACHING, @@ -102,8 +108,8 @@ private: DETACHING, DETACHED }; - typedef framing::AMQP_ClientOperations::Session010Handler SessionHandler; - typedef framing::AMQP_ClientOperations::Execution010Handler ExecutionHandler; + typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; + typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler; typedef sys::StateMonitor<State, DETACHED> StateMonitor; typedef StateMonitor::Set States; @@ -145,19 +151,16 @@ private: // Note: Following methods are called by network thread in // response to execution commands from the broker void sync(); - void result(uint32_t commandId, const std::string& value); + void result(const framing::SequenceNumber& commandId, const std::string& value); void exception(uint16_t errorCode, - uint32_t commandId, + const framing::SequenceNumber& commandId, uint8_t classCode, uint8_t commandCode, uint8_t fieldIndex, const std::string& description, const framing::FieldTable& errorInfo); - - //hack for old generator: - void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); } - + ErrorType error; int code; // Error code std::string text; // Error text mutable StateMonitor state; @@ -170,7 +173,7 @@ private: shared_ptr<ConnectionImpl> connection; framing::ChannelHandler channel; - framing::AMQP_ServerProxy::Session010 proxy; + framing::AMQP_ServerProxy::Session proxy; Results results; Demux demux; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 5152aa2e43..bca6c49c13 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -17,7 +17,7 @@ */ #include "Cluster.h" -#include "qpid/broker/PreviewSessionState.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -32,18 +32,18 @@ namespace cluster { using namespace qpid::framing; using namespace qpid::sys; using namespace std; -using broker::PreviewSessionState; +using broker::SessionState; namespace { // Beginning of inbound chain: send to cluster. struct ClusterSendHandler : public FrameHandler { - PreviewSessionState& session; + SessionState& session; Cluster& cluster; bool busy; Monitor lock; - ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} + ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} void handle(AMQFrame& f) { Mutex::ScopedLock l(lock); @@ -83,11 +83,11 @@ void insert(FrameHandler::Chain& c, FrameHandler* h) { c.next = h; } -struct SessionObserver : public broker::PreviewSessionManager::Observer { +struct SessionObserver : public broker::SessionManager::Observer { Cluster& cluster; SessionObserver(Cluster& c) : cluster(c) {} - void opened(PreviewSessionState& s) { + void opened(SessionState& s) { // FIXME aconway 2008-01-29: IList for memory management. ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 1b0c1b1689..6cc8dd7f78 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -63,7 +63,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler virtual ~Cluster(); // FIXME aconway 2008-01-29: - boost::intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; } + boost::intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; } /** Get the current cluster membership. */ MemberList getMembers() const; @@ -117,7 +117,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler MemberMap members; sys::Thread dispatcher; boost::function<void()> callback; - boost::intrusive_ptr<broker::PreviewSessionManager::Observer> observer; + boost::intrusive_ptr<broker::SessionManager::Observer> observer; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index eeb658600d..3ebb61feb5 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -73,7 +73,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t flags = buffer.getOctet(); uint8_t framing_version = (flags & 0xc0) >> 6; if (framing_version != 0) - throw SyntaxErrorException(QPID_MSG("Framing version unsupported")); + throw FramingErrorException(QPID_MSG("Framing version unsupported")); bof = flags & 0x08; eof = flags & 0x04; bos = flags & 0x02; @@ -81,7 +81,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t type = buffer.getOctet(); uint16_t frame_size = buffer.getShort(); if (frame_size < frameOverhead()-1) - throw SyntaxErrorException(QPID_MSG("Frame size too small")); + throw FramingErrorException(QPID_MSG("Frame size too small")); uint8_t reserved1 = buffer.getOctet(); uint8_t field1 = buffer.getOctet(); subchannel = field1 & 0x0f; @@ -92,7 +92,7 @@ bool AMQFrame::decode(Buffer& buffer) // TODO: should we check reserved2 against zero as well? - the // spec isn't clear if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0) - throw SyntaxErrorException(QPID_MSG("Reserved bits not zero")); + throw FramingErrorException(QPID_MSG("Reserved bits not zero")); // TODO: should no longer care about body size and only pass up // B,E,b,e flags @@ -105,7 +105,7 @@ bool AMQFrame::decode(Buffer& buffer) body->decode(type,buffer, body_size); uint8_t end = buffer.getOctet(); if (end != 0xCE) - throw SyntaxErrorException(QPID_MSG("Frame end not found")); + throw FramingErrorException(QPID_MSG("Frame end not found")); return true; } diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h index 2064468785..c69a768291 100644 --- a/cpp/src/qpid/framing/AMQHeaderBody.h +++ b/cpp/src/qpid/framing/AMQHeaderBody.h @@ -26,8 +26,6 @@ #include "Buffer.h" #include "qpid/framing/DeliveryProperties.h" #include "qpid/framing/MessageProperties.h" -#include "qpid/framing/PreviewDeliveryProperties.h" -#include "qpid/framing/PreviewMessageProperties.h" #include <iostream> #include <boost/optional.hpp> @@ -77,10 +75,7 @@ class AMQHeaderBody : public AMQBody }; // Could use boost::mpl::fold to construct a larger set. - typedef PropSet< PropSet< PropSet<PropSet<Empty, DeliveryProperties>, - MessageProperties>, - PreviewDeliveryProperties>, - PreviewMessageProperties> Properties; + typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties; Properties properties; diff --git a/cpp/src/qpid/framing/Array.cpp b/cpp/src/qpid/framing/Array.cpp index 71281c7a52..f0b6331ff3 100644 --- a/cpp/src/qpid/framing/Array.cpp +++ b/cpp/src/qpid/framing/Array.cpp @@ -77,7 +77,7 @@ void Array::decode(Buffer& buffer){ uint32_t size = buffer.getLong();//size added only when array is a top-level type uint32_t available = buffer.available(); if (available < size) { - throw SyntaxErrorException(QPID_MSG("Not enough data for array, expected " + throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected " << size << " bytes but only " << available << " available")); } if (size) { @@ -88,7 +88,7 @@ void Array::decode(Buffer& buffer){ dummy.setType(typeOctet); available = buffer.available(); if (available < count * dummy.getData().size()) { - throw SyntaxErrorException(QPID_MSG("Not enough data for array, expected " + throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected " << count << " items of " << dummy.getData().size() << " bytes each but only " << available << " bytes available")); } @@ -117,7 +117,7 @@ bool Array::operator==(const Array& x) const { void Array::add(ValuePtr value) { if (typeOctet != value->getType()) { - throw SyntaxErrorException(QPID_MSG("Wrong type of value, expected " << typeOctet)); + throw IllegalArgumentException(QPID_MSG("Wrong type of value, expected " << typeOctet)); } values.push_back(value); } diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp index fb84be7cd6..ffbcf33a95 100644 --- a/cpp/src/qpid/framing/BodyHandler.cpp +++ b/cpp/src/qpid/framing/BodyHandler.cpp @@ -48,7 +48,7 @@ void BodyHandler::handleBody(AMQBody* body) { handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body)); break; default: - throw SyntaxErrorException( + throw FramingErrorException( QPID_MSG("Invalid frame type " << body->type())); } } diff --git a/cpp/src/qpid/framing/BodyHolder.cpp b/cpp/src/qpid/framing/BodyHolder.cpp index de971b5b28..1b2f74de2c 100644 --- a/cpp/src/qpid/framing/BodyHolder.cpp +++ b/cpp/src/qpid/framing/BodyHolder.cpp @@ -59,7 +59,7 @@ void BodyHolder::decode(uint8_t type, Buffer& buffer, uint32_t size) { case CONTENT_BODY: *this=in_place<AMQContentBody>(); break; case HEARTBEAT_BODY: *this=in_place<AMQHeartbeatBody>(); break; default: - throw SyntaxErrorException(QPID_MSG("Invalid frame type " << type)); + throw IllegalArgumentException(QPID_MSG("Invalid frame type " << type)); } get()->decode(buffer, size); } diff --git a/cpp/src/qpid/framing/Buffer.cpp b/cpp/src/qpid/framing/Buffer.cpp index 69168d462a..19c94ffd58 100644 --- a/cpp/src/qpid/framing/Buffer.cpp +++ b/cpp/src/qpid/framing/Buffer.cpp @@ -19,7 +19,6 @@ * */ #include "Buffer.h" -#include "FramingContent.h" #include "FieldTable.h" #include <string.h> #include <boost/format.hpp> diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp index ac2a0f286d..903c7ed100 100644 --- a/cpp/src/qpid/framing/FieldTable.cpp +++ b/cpp/src/qpid/framing/FieldTable.cpp @@ -133,7 +133,7 @@ void FieldTable::decode(Buffer& buffer){ uint32_t len = buffer.getLong(); uint32_t available = buffer.available(); if (available < len) - throw SyntaxErrorException(QPID_MSG("Not enough data for field table.")); + throw IllegalArgumentException(QPID_MSG("Not enough data for field table.")); uint32_t leftover = available - len; while(buffer.available() > leftover){ std::string name; diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp index 8171a94ef2..681f20a793 100644 --- a/cpp/src/qpid/framing/FieldValue.cpp +++ b/cpp/src/qpid/framing/FieldValue.cpp @@ -79,7 +79,7 @@ void FieldValue::setType(uint8_t type) data.reset(new FixedWidthValue<0>()); break; default: - throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet)); + throw IllegalArgumentException(QPID_MSG("Unknown field table value type: " << (int)typeOctet)); } } diff --git a/cpp/src/qpid/framing/FramingContent.cpp b/cpp/src/qpid/framing/FramingContent.cpp deleted file mode 100644 index cd134b0e89..0000000000 --- a/cpp/src/qpid/framing/FramingContent.cpp +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "Buffer.h" -#include "FramingContent.h" -#include "qpid/Exception.h" -#include "qpid/framing/reply_exceptions.h" - -namespace qpid { -namespace framing { - -Content::Content() : discriminator(0) {} - -Content::Content(uint8_t _discriminator, const string& _value): discriminator(_discriminator), value(_value) { - validate(); -} - -void Content::validate() { - if (discriminator == REFERENCE) { - if(value.empty()) { - throw InvalidArgumentException( - QPID_MSG("Reference cannot be empty")); - } - }else if (discriminator != INLINE) { - throw SyntaxErrorException( - QPID_MSG("Invalid discriminator: " << discriminator)); - } -} - -Content::~Content() {} - -void Content::encode(Buffer& buffer) const { - buffer.putOctet(discriminator); - buffer.putLongString(value); -} - -void Content::decode(Buffer& buffer) { - discriminator = buffer.getOctet(); - buffer.getLongString(value); - validate(); -} - -size_t Content::size() const { - return 1/*discriminator*/ + 4/*for recording size of long string*/ + value.size(); -} - -std::ostream& operator<<(std::ostream& out, const Content& content) { - if (content.discriminator == REFERENCE) { - out << "{REF:" << content.value << "}"; - } else if (content.discriminator == INLINE) { - out << "{INLINE:" << content.value.size() << " bytes}"; - } - return out; -} - -}} // namespace framing::qpid diff --git a/cpp/src/qpid/framing/FramingContent.h b/cpp/src/qpid/framing/FramingContent.h deleted file mode 100644 index 36f5d641b2..0000000000 --- a/cpp/src/qpid/framing/FramingContent.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _framing_FramingContent_h -#define _framing_FramingContent_h - -#include <ostream> - -namespace qpid { -namespace framing { - -class Buffer; - -enum discriminator_types { INLINE = 0, REFERENCE = 1 }; - -/** - * A representation of the AMQP Content data type (used for message - * bodies) which can hold inline data or a reference. - */ -class Content -{ - uint8_t discriminator; - string value; - - void validate(); - - public: - Content(); - Content(uint8_t _discriminator, const string& _value); - ~Content(); - - void encode(Buffer& buffer) const; - void decode(Buffer& buffer); - size_t size() const; - bool isInline() const { return discriminator == INLINE; } - bool isReference() const { return discriminator == REFERENCE; } - const string& getValue() const { return value; } - void setValue(const string& newValue) { value = newValue; } - - friend std::ostream& operator<<(std::ostream&, const Content&); -}; - -}} // namespace qpid::framing - - -#endif /*!_framing_FramingContent_h*/ diff --git a/cpp/src/qpid/framing/ModelMethod.h b/cpp/src/qpid/framing/ModelMethod.h index 07600aadca..8e4361e761 100644 --- a/cpp/src/qpid/framing/ModelMethod.h +++ b/cpp/src/qpid/framing/ModelMethod.h @@ -22,7 +22,7 @@ * */ #include "AMQMethodBody.h" -#include "qpid/framing/ExecutionHeader.h" +#include "qpid/framing/Header.h" namespace qpid { namespace framing { @@ -30,7 +30,7 @@ namespace framing { class ModelMethod : public AMQMethodBody { - mutable ExecutionHeader header; + mutable Header header; public: virtual ~ModelMethod() {} virtual void encodeHeader(Buffer& buffer) const { header.encode(buffer); } @@ -38,8 +38,8 @@ public: virtual uint32_t headerSize() const { return header.size(); } virtual bool isSync() const { return header.getSync(); } virtual void setSync(bool on) const { header.setSync(on); } - ExecutionHeader& getHeader() { return header; } - const ExecutionHeader& getHeader() const { return header; } + Header& getHeader() { return header; } + const Header& getHeader() const { return header; } }; diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp index 1b62d296c6..cba00c860a 100644 --- a/cpp/src/qpid/framing/SequenceNumber.cpp +++ b/cpp/src/qpid/framing/SequenceNumber.cpp @@ -20,8 +20,10 @@ */ #include "SequenceNumber.h" +#include "Buffer.h" using qpid::framing::SequenceNumber; +using qpid::framing::Buffer; SequenceNumber::SequenceNumber() : value(0 - 1) {} @@ -77,6 +79,20 @@ bool SequenceNumber::operator>=(const SequenceNumber& other) const return *this == other || *this > other; } +void SequenceNumber::encode(Buffer& buffer) const +{ + buffer.putLong(value); +} + +void SequenceNumber::decode(Buffer& buffer) +{ + value = buffer.getLong(); +} + +uint32_t SequenceNumber::size() const { + return 4; +} + namespace qpid { namespace framing { diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h index 0ed591b804..d659bec5c1 100644 --- a/cpp/src/qpid/framing/SequenceNumber.h +++ b/cpp/src/qpid/framing/SequenceNumber.h @@ -26,6 +26,8 @@ namespace qpid { namespace framing { +class Buffer; + /** * 4-byte sequence number that 'wraps around'. */ @@ -51,6 +53,10 @@ class SequenceNumber friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b); + void encode(Buffer& buffer) const; + void decode(Buffer& buffer); + uint32_t size() const; + template <class S> void serialize(S& s) { s(value); } }; diff --git a/cpp/src/qpid/framing/SequenceSet.cpp b/cpp/src/qpid/framing/SequenceSet.cpp index 2683b0025d..cdf890b7f8 100644 --- a/cpp/src/qpid/framing/SequenceSet.cpp +++ b/cpp/src/qpid/framing/SequenceSet.cpp @@ -49,7 +49,7 @@ void SequenceSet::decode(Buffer& buffer) uint16_t size = buffer.getShort(); uint16_t count = size / RANGE_SIZE;//number of ranges if (size % RANGE_SIZE) - throw FrameErrorException(QPID_MSG("Invalid size for sequence set: " << size)); + throw IllegalArgumentException(QPID_MSG("Invalid size for sequence set: " << size)); for (uint16_t i = 0; i < count; i++) { add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong())); diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp index 2918c48ce3..b58d9fce96 100644 --- a/cpp/src/qpid/framing/Uuid.cpp +++ b/cpp/src/qpid/framing/Uuid.cpp @@ -34,7 +34,7 @@ void Uuid::encode(Buffer& buf) const { void Uuid::decode(Buffer& buf) { if (buf.available() < size()) - throw SyntaxErrorException(QPID_MSG("Not enough data for UUID.")); + throw IllegalArgumentException(QPID_MSG("Not enough data for UUID.")); buf.getRawData(c_array(), size()); } diff --git a/cpp/src/qpid/framing/amqp_types_full.h b/cpp/src/qpid/framing/amqp_types_full.h index 1ab8777896..d0aaf28cb4 100644 --- a/cpp/src/qpid/framing/amqp_types_full.h +++ b/cpp/src/qpid/framing/amqp_types_full.h @@ -31,9 +31,7 @@ #include "amqp_types.h" #include "Array.h" -#include "FramingContent.h" #include "FieldTable.h" -#include "SequenceNumberSet.h" #include "SequenceSet.h" #include "Uuid.h" diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index 94e2c025d6..0b69f76a76 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -28,6 +28,7 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/TopicExchange.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid_test_plugin.h" #include <iostream> #include "MessageUtils.h" @@ -166,7 +167,7 @@ class ExchangeTest : public CppUnit::TestCase exchanges.destroy("my-exchange"); try { exchanges.get("my-exchange"); - } catch (const ChannelException&) {} + } catch (const NotFoundException&) {} std::pair<Exchange::shared_ptr, bool> response = exchanges.declare("my-exchange", "direct", false, FieldTable()); CPPUNIT_ASSERT_EQUAL(string("direct"), response.first->getType()); diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp index 7c68973d4d..f75269c959 100644 --- a/cpp/src/tests/exception_test.cpp +++ b/cpp/src/tests/exception_test.cpp @@ -91,7 +91,7 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) { Thread t(fix.subs); fix.connection.proxy.close(); t.join(); - BOOST_CHECK_THROW(fix.session.close(), InternalErrorException); + BOOST_CHECK_THROW(fix.session.close(), ConnectionException); } QPID_AUTO_TEST_CASE(NoSuchQueueTest) { diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp index 1d77408eff..51dd2cb924 100644 --- a/cpp/src/tests/interop_runner.cpp +++ b/cpp/src/tests/interop_runner.cpp @@ -158,7 +158,7 @@ void Listener::sendResponse(Message& response, Message& request) void Listener::sendResponse(Message& response, ReplyTo replyTo) { - string exchange = replyTo.getExchangeName(); + string exchange = replyTo.getExchange(); string routingKey = replyTo.getRoutingKey(); channel.publish(response, exchange, routingKey); } diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests index ce6b1f3810..e4b70f5ff5 100755 --- a/cpp/src/tests/python_tests +++ b/cpp/src/tests/python_tests @@ -14,7 +14,6 @@ run() { if test -d ${PYTHON_DIR} ; then cd ${PYTHON_DIR} run 0-10-errata cpp_failing_0-10.txt - if test -z "$QPID_NO_PREVIEW" ; then run ../specs/amqp.0-10-preview.xml cpp_failing_0-10_preview.txt; fi else echo Warning: python tests not found. fi |