summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-24 21:07:34 +0000
committerGordon Sim <gsim@apache.org>2008-04-24 21:07:34 +0000
commit1c86294add5cbb640aac7f458c4de693de48dd9f (patch)
tree22d7774e00c7514c4b76be5cf8b50727b74fe4e3 /cpp/src
parent96f12949244b5af2b717156823309b66fe7bfb84 (diff)
downloadqpid-python-1c86294add5cbb640aac7f458c4de693de48dd9f.tar.gz
Generate c++ code from final 0-10 spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651423 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am30
-rw-r--r--cpp/src/qpid/Exception.h24
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp22
-rw-r--r--cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--cpp/src/qpid/broker/Broker.h3
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp353
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h208
-rw-r--r--cpp/src/qpid/broker/Connection.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp3
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.h8
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp171
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h67
-rw-r--r--cpp/src/qpid/broker/Message.cpp6
-rw-r--r--cpp/src/qpid/broker/Message.h1
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.cpp24
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h8
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp71
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.h7
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp208
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.h110
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.cpp325
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.h113
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionCodec.cpp91
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionCodec.h55
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.cpp315
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.h107
-rw-r--r--cpp/src/qpid/broker/PreviewSessionHandler.cpp210
-rw-r--r--cpp/src/qpid/broker/PreviewSessionHandler.h111
-rw-r--r--cpp/src/qpid/broker/PreviewSessionManager.cpp113
-rw-r--r--cpp/src/qpid/broker/PreviewSessionManager.h101
-rw-r--r--cpp/src/qpid/broker/PreviewSessionState.cpp174
-rw-r--r--cpp/src/qpid/broker/PreviewSessionState.h125
-rw-r--r--cpp/src/qpid/broker/Queue.cpp5
-rw-r--r--cpp/src/qpid/broker/QueueBindings.cpp4
-rw-r--r--cpp/src/qpid/broker/SaslAuthenticator.cpp9
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp195
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h102
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp13
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp57
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h63
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp87
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h12
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp18
-rw-r--r--cpp/src/qpid/broker/SessionState.h1
-rw-r--r--cpp/src/qpid/client/Channel.cpp5
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp1
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h6
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp11
-rw-r--r--cpp/src/qpid/client/Session.h4
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp26
-rw-r--r--cpp/src/qpid/client/SessionImpl.h25
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp12
-rw-r--r--cpp/src/qpid/cluster/Cluster.h4
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp8
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.h7
-rw-r--r--cpp/src/qpid/framing/Array.cpp6
-rw-r--r--cpp/src/qpid/framing/BodyHandler.cpp2
-rw-r--r--cpp/src/qpid/framing/BodyHolder.cpp2
-rw-r--r--cpp/src/qpid/framing/Buffer.cpp1
-rw-r--r--cpp/src/qpid/framing/FieldTable.cpp2
-rw-r--r--cpp/src/qpid/framing/FieldValue.cpp2
-rw-r--r--cpp/src/qpid/framing/FramingContent.cpp73
-rw-r--r--cpp/src/qpid/framing/FramingContent.h63
-rw-r--r--cpp/src/qpid/framing/ModelMethod.h8
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.cpp16
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.h6
-rw-r--r--cpp/src/qpid/framing/SequenceSet.cpp2
-rw-r--r--cpp/src/qpid/framing/Uuid.cpp2
-rw-r--r--cpp/src/qpid/framing/amqp_types_full.h2
-rw-r--r--cpp/src/tests/ExchangeTest.cpp3
-rw-r--r--cpp/src/tests/exception_test.cpp2
-rw-r--r--cpp/src/tests/interop_runner.cpp2
-rwxr-xr-xcpp/src/tests/python_tests1
74 files changed, 272 insertions, 3773 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 597e566049..ce36a33933 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -13,10 +13,9 @@ force:
if GENERATE
-# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac
-amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/extra.xml $(top_srcdir)/xml/cluster.xml
+# AMQP_FINAL_XML is defined in ../configure.ac
amqp_0_10_xml=@AMQP_FINAL_XML@
-specs=$(amqp_99_0_xml) $(amqp_0_10_xml)
+specs=$(amqp_0_10_xml)
# Ruby generator.
rgen_dir=$(top_srcdir)/rubygen
@@ -152,7 +151,6 @@ libqpidcommon_la_SOURCES = \
qpid/framing/Buffer.cpp \
qpid/framing/FieldTable.cpp \
qpid/framing/FieldValue.cpp \
- qpid/framing/FramingContent.cpp \
qpid/framing/FrameSet.cpp \
qpid/framing/ProtocolInitiation.cpp \
qpid/framing/ProtocolVersion.cpp \
@@ -193,19 +191,11 @@ libqpidbroker_la_SOURCES = \
qpid/amqp_0_10/Connection.h \
qpid/amqp_0_10/Connection.cpp \
qpid/broker/Broker.cpp \
- qpid/broker/BrokerAdapter.cpp \
- qpid/broker/SessionAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
qpid/broker/PersistableMessage.cpp \
qpid/broker/Bridge.cpp \
- qpid/broker/PreviewConnection.cpp \
- qpid/broker/PreviewConnectionCodec.cpp \
- qpid/broker/PreviewConnectionHandler.cpp \
- qpid/broker/PreviewSessionHandler.cpp \
- qpid/broker/PreviewSessionManager.cpp \
- qpid/broker/PreviewSessionState.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionFactory.cpp \
@@ -215,7 +205,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/DirectExchange.cpp \
qpid/broker/DtxAck.cpp \
qpid/broker/DtxBuffer.cpp \
- qpid/broker/DtxHandlerImpl.cpp \
qpid/broker/DtxManager.cpp \
qpid/broker/DtxTimeout.cpp \
qpid/broker/DtxWorkRecord.cpp \
@@ -228,7 +217,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
qpid/broker/MessageDelivery.cpp \
- qpid/broker/MessageHandlerImpl.cpp \
qpid/broker/MessageStoreModule.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NullMessageStore.cpp \
@@ -241,6 +229,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SaslAuthenticator.cpp \
qpid/broker/SemanticState.h \
qpid/broker/SemanticState.cpp \
+ qpid/broker/SessionAdapter.cpp \
qpid/broker/SessionState.h \
qpid/broker/SessionState.cpp \
qpid/broker/SessionManager.h \
@@ -248,7 +237,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SessionHandler.h \
qpid/broker/SessionContext.h \
qpid/broker/SessionHandler.cpp \
- qpid/broker/SemanticHandler.cpp \
qpid/broker/System.cpp \
qpid/broker/Timer.cpp \
qpid/broker/TopicExchange.cpp \
@@ -308,18 +296,11 @@ nobase_include_HEADERS = \
qpid/memory.h \
qpid/shared_ptr.h \
qpid/broker/Broker.h \
- qpid/broker/BrokerAdapter.h \
qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
qpid/broker/Queue.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Bridge.h \
- qpid/broker/PreviewConnection.h \
- qpid/broker/PreviewConnectionCodec.h \
- qpid/broker/PreviewConnectionHandler.h \
- qpid/broker/PreviewSessionHandler.h \
- qpid/broker/PreviewSessionManager.h \
- qpid/broker/PreviewSessionState.h \
qpid/broker/Connection.h \
qpid/broker/ConnectionState.h \
qpid/broker/ConnectionFactory.h \
@@ -337,7 +318,6 @@ nobase_include_HEADERS = \
qpid/broker/DirectExchange.h \
qpid/broker/DtxAck.h \
qpid/broker/DtxBuffer.h \
- qpid/broker/DtxHandlerImpl.h \
qpid/broker/DtxManager.h \
qpid/broker/DtxTimeout.h \
qpid/broker/DtxWorkRecord.h \
@@ -351,7 +331,6 @@ nobase_include_HEADERS = \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.h \
qpid/broker/MessageDelivery.h \
- qpid/broker/MessageHandlerImpl.h \
qpid/broker/MessageStore.h \
qpid/broker/MessageStoreModule.h \
qpid/broker/NameGenerator.h \
@@ -372,8 +351,8 @@ nobase_include_HEADERS = \
qpid/broker/RecoveredEnqueue.h \
qpid/broker/RecoveryManager.h \
qpid/broker/RecoveryManagerImpl.h \
- qpid/broker/SemanticHandler.h \
qpid/broker/SaslAuthenticator.h \
+ qpid/broker/SessionAdapter.h \
qpid/broker/SessionManager.h \
qpid/broker/System.h \
qpid/broker/Timer.h \
@@ -434,7 +413,6 @@ nobase_include_HEADERS = \
qpid/framing/FrameHandler.h \
qpid/framing/FrameHandler.h \
qpid/framing/FrameSet.h \
- qpid/framing/FramingContent.h \
qpid/framing/Handler.h \
qpid/framing/HeaderProperties.h \
qpid/framing/Invoker.h \
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index 2f934166a7..e74fa79ed9 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -23,6 +23,7 @@
*/
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/constants.h"
#include "qpid/Msg.h"
#include <memory>
@@ -51,29 +52,22 @@ class Exception : public std::exception
mutable std::string whatStr;
};
-/**
- * I have made SessionException a common base for Channel- and
- * Connection- Exceptions. This is not strictly correct but allows all
- * model layer exceptions to be handled as SessionExceptions which is
- * how they are classified in the final 0-10 specification. I.e. this
- * is a temporary hack to allow the preview and final codepaths to
- * co-exist with minimal changes.
- */
-
struct SessionException : public Exception {
const framing::ReplyCode code;
SessionException(framing::ReplyCode code_, const std::string& message)
: Exception(message), code(code_) {}
};
-struct ChannelException : public SessionException {
- ChannelException(framing::ReplyCode code, const std::string& message)
- : SessionException(code, message) {}
+struct ChannelException : public Exception {
+ const framing::ReplyCode code;
+ ChannelException(framing::ReplyCode _code, const std::string& message)
+ : Exception(message), code(_code) {}
};
-struct ConnectionException : public SessionException {
- ConnectionException(framing::ReplyCode code, const std::string& message)
- : SessionException(code, message) {}
+struct ConnectionException : public Exception {
+ const framing::ReplyCode code;
+ ConnectionException(framing::ReplyCode _code, const std::string& message)
+ : Exception(message), code(_code) {}
};
struct ClosedException : public Exception {
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 598f428ad2..ea9a41ac9d 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -46,24 +46,24 @@ Bridge::~Bridge()
void Bridge::create()
{
- framing::AMQP_ServerProxy::Session010 session(channel);
+ framing::AMQP_ServerProxy::Session session(channel);
session.attach(name, false);
if (args.i_src_is_local) {
//TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
} else {
if (args.i_src_is_queue) {
- peer.getMessage010().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable());
- peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer.getMessage().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
string queue = "bridge_queue_";
queue += Uuid(true).str();
- peer.getQueue010().declare(queue, "", false, false, true, true, FieldTable());
- peer.getExchange010().bind(queue, args.i_src, args.i_key, FieldTable());
- peer.getMessage010().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable());
- peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
+ peer.getQueue().declare(queue, "", false, false, true, true, FieldTable());
+ peer.getExchange().bind(queue, args.i_src, args.i_key, FieldTable());
+ peer.getMessage().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
}
}
@@ -71,8 +71,8 @@ void Bridge::create()
void Bridge::cancel()
{
- peer.getMessage010().cancel(args.i_dest);
- peer.getSession010().detach(name);
+ peer.getMessage().cancel(args.i_dest);
+ peer.getSession().detach(name);
}
management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 6cbd9bf343..d9cf93f766 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -131,8 +131,7 @@ Broker::Broker(const Broker::Options& conf) :
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
factory(*this),
- sessionManager(conf.ack),
- previewSessionManager(conf.ack)
+ sessionManager(conf.ack)
{
if(conf.enableMgmt){
QPID_LOG(info, "Management enabled");
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index fa66061fd0..7297241763 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -30,7 +30,6 @@
#include "MessageStore.h"
#include "QueueRegistry.h"
#include "SessionManager.h"
-#include "PreviewSessionManager.h"
#include "Vhost.h"
#include "System.h"
#include "qpid/management/Manageable.h"
@@ -117,7 +116,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
Options& getOptions() { return config; }
SessionManager& getSessionManager() { return sessionManager; }
- PreviewSessionManager& getPreviewSessionManager() { return previewSessionManager; }
management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
@@ -148,7 +146,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConnectionFactory factory;
DtxManager dtxManager;
SessionManager sessionManager;
- PreviewSessionManager previewSessionManager;
management::ManagementAgent::shared_ptr managementAgent;
management::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
deleted file mode 100644
index b83a275959..0000000000
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ /dev/null
@@ -1,353 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "BrokerAdapter.h"
-#include "Connection.h"
-#include "DeliveryToken.h"
-#include "MessageDelivery.h"
-#include "qpid/framing/AMQMethodBody.h"
-#include "qpid/Exception.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace broker {
-
-using namespace qpid;
-using namespace qpid::framing;
-
-typedef std::vector<Queue::shared_ptr> QueueVector;
-
-// TODO aconway 2007-08-31: now that functionality is distributed
-// between different handlers, BrokerAdapter should be dropped.
-// Instead the individual class Handler interfaces can be implemented
-// by the handlers responsible for those classes.
-//
-
-BrokerAdapter::BrokerAdapter(SemanticState& s) :
- HandlerImpl(s),
- basicHandler(s),
- exchangeHandler(s),
- bindingHandler(s),
- messageHandler(s),
- queueHandler(s),
- txHandler(s),
- dtxHandler(s)
-{}
-
-
-void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type,
- const string& alternateExchange,
- bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
- Exchange::shared_ptr alternate;
- if (!alternateExchange.empty()) {
- alternate = getBroker().getExchanges().get(alternateExchange);
- }
- if(passive){
- Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
- checkType(actual, type);
- checkAlternate(actual, alternate);
- }else{
- try{
- std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
- if (response.second) {
- if (durable) {
- getBroker().getStore().create(*response.first, args);
- }
- if (alternate) {
- response.first->setAlternate(alternate);
- alternate->incAlternateUsers();
- }
- } else {
- checkType(response.first, type);
- checkAlternate(response.first, alternate);
- }
- }catch(UnknownExchangeTypeException& e){
- throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
- }
- }
-}
-
-void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
-{
- if (!type.empty() && exchange->getType() != type) {
- throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type));
- }
-}
-
-void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
-{
- if (alternate && alternate != exchange->getAlternate())
- throw NotAllowedException(
- QPID_MSG("Exchange declared with alternate-exchange "
- << exchange->getAlternate()->getName() << ", requested "
- << alternate->getName()));
-}
-
-void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
- //TODO: implement unused
- Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
- if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
- if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
- getBroker().getExchanges().destroy(name);
-}
-
-ExchangeXQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
-{
- try {
- Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- return ExchangeXQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
- } catch (const ChannelException& e) {
- return ExchangeXQueryResult("", false, true, FieldTable());
- }
-}
-
-BindingXQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
- const std::string& exchangeName,
- const std::string& queueName,
- const std::string& key,
- const framing::FieldTable& args)
-{
- Exchange::shared_ptr exchange;
- try {
- exchange = getBroker().getExchanges().get(exchangeName);
- } catch (const ChannelException&) {}
-
- Queue::shared_ptr queue;
- if (!queueName.empty()) {
- queue = getBroker().getQueues().find(queueName);
- }
-
- if (!exchange) {
- return BindingXQueryResult(true, false, false, false, false);
- } else if (!queueName.empty() && !queue) {
- return BindingXQueryResult(false, true, false, false, false);
- } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- return BindingXQueryResult(false, false, false, false, false);
- } else {
- //need to test each specified option individually
- bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
- bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
- bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
-
- return BindingXQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched);
- }
-}
-
-QueueXQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
-{
- Queue::shared_ptr queue = state.getQueue(name);
- Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
-
- return QueueXQueryResult(queue->getName(),
- alternateExchange ? alternateExchange->getName() : "",
- queue->isDurable(),
- queue->hasExclusiveOwner(),
- queue->isAutoDelete(),
- queue->getSettings(),
- queue->getMessageCount(),
- queue->getConsumerCount());
-}
-
-void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange,
- bool passive, bool durable, bool exclusive,
- bool autoDelete, const qpid::framing::FieldTable& arguments){
-
- Exchange::shared_ptr alternate;
- if (!alternateExchange.empty()) {
- alternate = getBroker().getExchanges().get(alternateExchange);
- }
- Queue::shared_ptr queue;
- if (passive && !name.empty()) {
- queue = state.getQueue(name);
- //TODO: check alternate-exchange is as expected
- } else {
- std::pair<Queue::shared_ptr, bool> queue_created =
- getBroker().getQueues().declare(
- name, durable,
- autoDelete,
- exclusive ? &getConnection() : 0);
- queue = queue_created.first;
- assert(queue);
- if (queue_created.second) { // This is a new queue
- if (alternate) {
- queue->setAlternateExchange(alternate);
- alternate->incAlternateUsers();
- }
-
- //apply settings & create persistent record if required
- queue_created.first->create(arguments);
-
- //add default binding:
- getBroker().getExchanges().getDefault()->bind(queue, name, 0);
- queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
-
- //handle automatic cleanup:
- if (exclusive) {
- getConnection().exclusiveQueues.push_back(queue);
- }
- } else {
- if (exclusive && queue->setExclusiveOwner(&getConnection())) {
- getConnection().exclusiveQueues.push_back(queue);
- }
- }
- }
- if (exclusive && !queue->isExclusiveOwner(&getConnection()))
- throw ResourceLockedException(
- QPID_MSG("Cannot grant exclusive access to queue "
- << queue->getName()));
-}
-
-void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName,
- const string& exchangeName, const string& routingKey,
- const FieldTable& arguments){
-
- Queue::shared_ptr queue = state.getQueue(queueName);
- Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
- if(exchange){
- string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
- if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
- queue->bound(exchangeName, routingKey, arguments);
- if (exchange->isDurable() && queue->isDurable()) {
- getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
- }
- }
- }else{
- throw NotFoundException(
- "Bind failed. No such exchange: " + exchangeName);
- }
-}
-
-void
-BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
- const string& queueName,
- const string& exchangeName,
- const string& routingKey,
- const qpid::framing::FieldTable& arguments )
-{
- Queue::shared_ptr queue = state.getQueue(queueName);
- if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
- Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
- if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
-
- if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
- getBroker().getStore().unbind(*exchange, *queue, routingKey, arguments);
- }
-
-}
-
-void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
- state.getQueue(queue)->purge();
-}
-
-void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
- ChannelException error(0, "");
- Queue::shared_ptr q = state.getQueue(queue);
- if(ifEmpty && q->getMessageCount() > 0){
- throw PreconditionFailedException("Queue not empty.");
- }else if(ifUnused && q->getConsumerCount() > 0){
- throw PreconditionFailedException("Queue in use.");
- }else{
- //remove the queue from the list of exclusive queues if necessary
- if(q->isExclusiveOwner(&getConnection())){
- QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
- if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
- }
- q->destroy();
- getBroker().getQueues().destroy(queue);
- q->unbind(getBroker().getExchanges(), q);
- }
-}
-
-
-
-
-void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
- //TODO: handle global
- state.setPrefetchSize(prefetchSize);
- state.setPrefetchCount(prefetchCount);
-}
-
-void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
- const string& queueName, const string& consumerTag,
- bool noLocal, bool noAck, bool exclusive,
- bool nowait, const FieldTable& fields)
-{
-
- Queue::shared_ptr queue = state.getQueue(queueName);
- if(!consumerTag.empty() && state.exists(consumerTag)){
- throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
- }
- string newTag = consumerTag;
- //need to generate name here, so we have it for the adapter (it is
- //also version specific behaviour now)
- if (newTag.empty()) newTag = tagGenerator.generate();
- DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
- state.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
-
- if(!nowait)
- getProxy().getBasic().consumeOk(newTag);
-}
-
-void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
- state.cancel(consumerTag);
-}
-
-void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
- Queue::shared_ptr queue = state.getQueue(queueName);
- DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
- if(!state.get(token, queue, !noAck)){
- string clusterId;//not used, part of an imatix hack
-
- getProxy().getBasic().getEmpty(clusterId);
- }
-}
-
-void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
- if (multiple) {
- state.ackCumulative(deliveryTag);
- } else {
- state.ackRange(deliveryTag, deliveryTag);
- }
-}
-
-void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){}
-
-void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
-{
- state.recover(requeue);
-}
-
-void BrokerAdapter::TxHandlerImpl::select()
-{
- state.startTx();
-}
-
-void BrokerAdapter::TxHandlerImpl::commit()
-{
- state.commit(&getBroker().getStore(), true);
-}
-
-void BrokerAdapter::TxHandlerImpl::rollback()
-{
- state.rollback();
- state.recover(true);
-}
-
-}} // namespace qpid::broker
-
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
deleted file mode 100644
index 26dfe802e1..0000000000
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ /dev/null
@@ -1,208 +0,0 @@
-#ifndef _broker_BrokerAdapter_h
-#define _broker_BrokerAdapter_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "DtxHandlerImpl.h"
-#include "MessageHandlerImpl.h"
-
-#include "qpid/Exception.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace broker {
-
-class Channel;
-class Connection;
-class Broker;
-class ConnectionHandler;
-class BasicHandler;
-class ExchangeHandler;
-class QueueHandler;
-class TxHandler;
-class MessageHandler;
-class AccessHandler;
-class FileHandler;
-class StreamHandler;
-class DtxHandler;
-class TunnelHandler;
-class MessageHandlerImpl;
-class Exchange;
-
-/**
- * Per-channel protocol adapter.
- *
- * A container for a collection of AMQP-class adapters that translate
- * AMQP method bodies into calls on the core Broker objects. Each
- * adapter class also provides a client proxy to send methods to the
- * peer.
- *
- */
-class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
-{
- public:
- BrokerAdapter(SemanticState& session);
-
- BasicHandler* getBasicHandler() { return &basicHandler; }
- ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
- BindingHandler* getBindingHandler() { return &bindingHandler; }
- QueueHandler* getQueueHandler() { return &queueHandler; }
- TxHandler* getTxHandler() { return &txHandler; }
- MessageHandler* getMessageHandler() { return &messageHandler; }
- DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
- DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
-
- framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
-
-
- AccessHandler* getAccessHandler() {
- throw framing::NotImplementedException("Access class not implemented"); }
- FileHandler* getFileHandler() {
- throw framing::NotImplementedException("File class not implemented"); }
- StreamHandler* getStreamHandler() {
- throw framing::NotImplementedException("Stream class not implemented"); }
- TunnelHandler* getTunnelHandler() {
- throw framing::NotImplementedException("Tunnel class not implemented"); }
-
- Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Tx010Handler* getTx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Dtx010Handler* getDtx010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); }
-
- // Handlers no longer implemented in BrokerAdapter:
-#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
- ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
- ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
- SessionHandler* getSessionHandler() { BADHANDLER(); }
- Connection010Handler* getConnection010Handler() { BADHANDLER(); }
- Session010Handler* getSession010Handler() { BADHANDLER(); }
-#undef BADHANDLER
-
- private:
- class ExchangeHandlerImpl :
- public ExchangeHandler,
- public HandlerImpl
- {
- public:
- ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
-
- void declare(uint16_t ticket,
- const std::string& exchange, const std::string& type,
- const std::string& alternateExchange,
- bool passive, bool durable, bool autoDelete,
- const qpid::framing::FieldTable& arguments);
- void delete_(uint16_t ticket,
- const std::string& exchange, bool ifUnused);
- framing::ExchangeXQueryResult query(u_int16_t ticket,
- const std::string& name);
- private:
- void checkType(shared_ptr<Exchange> exchange, const std::string& type);
-
- void checkAlternate(shared_ptr<Exchange> exchange,
- shared_ptr<Exchange> alternate);
- };
-
- class BindingHandlerImpl :
- public BindingHandler,
- public HandlerImpl
- {
- public:
- BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
-
- framing::BindingXQueryResult query(u_int16_t ticket,
- const std::string& exchange,
- const std::string& queue,
- const std::string& routingKey,
- const framing::FieldTable& arguments);
- };
-
- class QueueHandlerImpl :
- public QueueHandler,
- public HandlerImpl
- {
- public:
- QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
-
- void declare(uint16_t ticket, const std::string& queue,
- const std::string& alternateExchange,
- bool passive, bool durable, bool exclusive,
- bool autoDelete,
- const qpid::framing::FieldTable& arguments);
- void bind(uint16_t ticket, const std::string& queue,
- const std::string& exchange, const std::string& routingKey,
- const qpid::framing::FieldTable& arguments);
- void unbind(uint16_t ticket,
- const std::string& queue,
- const std::string& exchange,
- const std::string& routingKey,
- const qpid::framing::FieldTable& arguments );
- framing::QueueXQueryResult query(const std::string& queue);
- void purge(uint16_t ticket, const std::string& queue);
- void delete_(uint16_t ticket, const std::string& queue,
- bool ifUnused, bool ifEmpty);
- };
-
- class BasicHandlerImpl :
- public BasicHandler,
- public HandlerImpl
- {
- NameGenerator tagGenerator;
- public:
- BasicHandlerImpl(SemanticState& session) : HandlerImpl(session), tagGenerator("sgen") {}
-
- void qos(uint32_t prefetchSize,
- uint16_t prefetchCount, bool global);
- void consume(uint16_t ticket, const std::string& queue,
- const std::string& consumerTag,
- bool noLocal, bool noAck, bool exclusive, bool nowait,
- const qpid::framing::FieldTable& fields);
- void cancel(const std::string& consumerTag);
- void get(uint16_t ticket, const std::string& queue, bool noAck);
- void ack(uint64_t deliveryTag, bool multiple);
- void reject(uint64_t deliveryTag, bool requeue);
- void recover(bool requeue);
- };
-
- class TxHandlerImpl :
- public TxHandler,
- public HandlerImpl
- {
- public:
- TxHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
-
- void select();
- void commit();
- void rollback();
- };
-
- BasicHandlerImpl basicHandler;
- ExchangeHandlerImpl exchangeHandler;
- BindingHandlerImpl bindingHandler;
- MessageHandlerImpl messageHandler;
- QueueHandlerImpl queueHandler;
- TxHandlerImpl txHandler;
- DtxHandlerImpl dtxHandler;
-};
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_BrokerAdapter_h*/
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index fca381063e..1994c4fdf5 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -20,9 +20,7 @@
*/
#include "Connection.h"
#include "SessionState.h"
-#include "BrokerAdapter.h"
#include "Bridge.h"
-#include "SemanticHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -143,7 +141,7 @@ void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
- ptr_map_ptr(i)->localSuspend();
+ ptr_map_ptr(i)->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index 7e20408388..5de5a0230a 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -21,7 +21,6 @@
#include "ConnectionFactory.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/amqp_0_10/Connection.h"
-#include "PreviewConnectionCodec.h"
namespace qpid {
namespace broker {
@@ -34,8 +33,6 @@ ConnectionFactory::~ConnectionFactory() {}
sys::ConnectionCodec*
ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
- if (v == ProtocolVersion(99, 0))
- return new PreviewConnectionCodec(out, broker, id);
if (v == ProtocolVersion(0, 10))
return new amqp_0_10::Connection(out, broker, id);
return 0;
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index c7738cc4ea..4ed2f5bfa2 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -50,9 +50,9 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
try{
bool handled = false;
if (handler->serverMode) {
- handled = invoke(static_cast<AMQP_ServerOperations::Connection010Handler&>(*handler.get()), *method);
+ handled = invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method);
} else {
- handled = invoke(static_cast<AMQP_ClientOperations::Connection010Handler&>(*handler.get()), *method);
+ handled = invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method);
}
if (!handled) {
handler->connection.getChannel(frame.getChannel()).in(frame);
diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h
index ea8b84b07c..a04936a943 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/cpp/src/qpid/broker/ConnectionHandler.h
@@ -41,11 +41,11 @@ class Connection;
class ConnectionHandler : public framing::FrameHandler
{
- struct Handler : public framing::AMQP_ServerOperations::Connection010Handler,
- public framing::AMQP_ClientOperations::Connection010Handler
+ struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ public framing::AMQP_ClientOperations::ConnectionHandler
{
- framing::AMQP_ClientProxy::Connection010 client;
- framing::AMQP_ServerProxy::Connection010 server;
+ framing::AMQP_ClientProxy::Connection client;
+ framing::AMQP_ServerProxy::Connection server;
Connection& connection;
bool serverMode;
std::auto_ptr<SaslAuthenticator> authenticator;
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
deleted file mode 100644
index 61ab856fa9..0000000000
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "DtxHandlerImpl.h"
-
-#include <boost/format.hpp>
-#include "Broker.h"
-#include "qpid/framing/constants.h"
-#include "qpid/framing/Array.h"
-
-using namespace qpid::broker;
-using namespace qpid::framing;
-using std::string;
-
-DtxHandlerImpl::DtxHandlerImpl(SemanticState& s) : HandlerImpl(s) {}
-
-// DtxDemarcationHandler:
-
-
-void DtxHandlerImpl::select()
-{
- state.selectDtx();
-}
-
-DtxDemarcationXEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
- const string& xid,
- bool fail,
- bool suspend)
-{
- try {
- if (fail) {
- state.endDtx(xid, true);
- if (suspend) {
- throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
- } else {
- return DtxDemarcationXEndResult(XA_RBROLLBACK);
- }
- } else {
- if (suspend) {
- state.suspendDtx(xid);
- } else {
- state.endDtx(xid, false);
- }
- return DtxDemarcationXEndResult(XA_OK);
- }
- } catch (const DtxTimeoutException& e) {
- return DtxDemarcationXEndResult(XA_RBTIMEOUT);
- }
-}
-
-DtxDemarcationXStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
- const string& xid,
- bool join,
- bool resume)
-{
- if (join && resume) {
- throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
- }
- try {
- if (resume) {
- state.resumeDtx(xid);
- } else {
- state.startDtx(xid, getBroker().getDtxManager(), join);
- }
- return DtxDemarcationXStartResult(XA_OK);
- } catch (const DtxTimeoutException& e) {
- return DtxDemarcationXStartResult(XA_RBTIMEOUT);
- }
-}
-
-// DtxCoordinationHandler:
-
-DtxCoordinationXPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
- const string& xid)
-{
- try {
- bool ok = getBroker().getDtxManager().prepare(xid);
- return DtxCoordinationXPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
- } catch (const DtxTimeoutException& e) {
- return DtxCoordinationXPrepareResult(XA_RBTIMEOUT);
- }
-}
-
-DtxCoordinationXCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/,
- const string& xid,
- bool onePhase)
-{
- try {
- bool ok = getBroker().getDtxManager().commit(xid, onePhase);
- return DtxCoordinationXCommitResult(ok ? XA_OK : XA_RBROLLBACK);
- } catch (const DtxTimeoutException& e) {
- return DtxCoordinationXCommitResult(XA_RBTIMEOUT);
- }
-}
-
-
-DtxCoordinationXRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
- const string& xid )
-{
- try {
- getBroker().getDtxManager().rollback(xid);
- return DtxCoordinationXRollbackResult(XA_OK);
- } catch (const DtxTimeoutException& e) {
- return DtxCoordinationXRollbackResult(XA_RBTIMEOUT);
- }
-}
-
-DtxCoordinationXRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
- bool /*startscan*/,
- bool /*endscan*/ )
-{
- //TODO: what do startscan and endscan actually mean?
-
- // response should hold on key value pair with key = 'xids' and
- // value = sequence of xids
-
- // until sequences are supported (0-10 encoding), an alternate
- // scheme is used for testing:
- //
- // key = 'xids' and value = a longstr containing shortstrs for each xid
- //
- // note that this restricts the length of the xids more than is
- // strictly 'legal', but that is ok for testing
- std::set<std::string> xids;
- getBroker().getStore().collectPreparedXids(xids);
-
- //TODO: remove the need to copy from one container type to another
- std::vector<std::string> data;
- for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
- data.push_back(*i);
- }
- Array indoubt(data);
- return DtxCoordinationXRecoverResult(indoubt);
-}
-
-void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
- const string& xid)
-{
- //Currently no heuristic completion is supported, so this should never be used.
- throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
-}
-
-DtxCoordinationXGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
-{
- uint32_t timeout = getBroker().getDtxManager().getTimeout(xid);
- return DtxCoordinationXGetTimeoutResult(timeout);
-}
-
-
-void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/,
- const string& xid,
- u_int32_t timeout)
-{
- getBroker().getDtxManager().setTimeout(xid, timeout);
-}
-
-
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
deleted file mode 100644
index efb56dba95..0000000000
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ /dev/null
@@ -1,67 +0,0 @@
-#ifndef _broker_DtxHandlerImpl_h
-#define _broker_DtxHandlerImpl_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "HandlerImpl.h"
-
-namespace qpid {
-namespace broker {
-
-class DtxHandlerImpl
- : public HandlerImpl,
- public framing::AMQP_ServerOperations::DtxCoordinationHandler,
- public framing::AMQP_ServerOperations::DtxDemarcationHandler
-{
-public:
- DtxHandlerImpl(SemanticState&);
-
- // DtxCoordinationHandler:
-
- framing::DtxCoordinationXCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase);
-
- void forget(u_int16_t ticket, const std::string& xid);
-
- framing::DtxCoordinationXGetTimeoutResult getTimeout(const std::string& xid);
-
- framing::DtxCoordinationXPrepareResult prepare(u_int16_t ticket, const std::string& xid);
-
- framing::DtxCoordinationXRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan);
-
- framing::DtxCoordinationXRollbackResult rollback(u_int16_t ticket, const std::string& xid);
-
- void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout);
-
- // DtxDemarcationHandler:
-
- framing::DtxDemarcationXEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
-
- void select();
-
- framing::DtxDemarcationXStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
-};
-
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_DtxHandlerImpl_h*/
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 297e610418..dd013843f9 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -24,7 +24,6 @@
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageXTransferBody.h"
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
@@ -36,7 +35,6 @@ using namespace qpid::framing;
using std::string;
TransferAdapter Message::TRANSFER;
-PreviewAdapter Message::TRANSFER_99_0;
Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
@@ -225,9 +223,7 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
MessageAdapter& Message::getAdapter() const
{
if (!adapter) {
- if(frames.isA<MessageXTransferBody>()) {
- adapter = &TRANSFER_99_0;
- } else if(frames.isA<MessageTransferBody>()) {
+ if(frames.isA<MessageTransferBody>()) {
adapter = &TRANSFER;
} else {
const AMQMethodBody* method = frames.getMethod();
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 4fd2f1401d..87c7a9c43e 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -136,7 +136,6 @@ public:
mutable MessageAdapter* adapter;
static TransferAdapter TRANSFER;
- static PreviewAdapter TRANSFER_99_0;
MessageAdapter& getAdapter() const;
};
diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp
index 013e2c91ac..12f01494de 100644
--- a/cpp/src/qpid/broker/MessageAdapter.cpp
+++ b/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -24,7 +24,6 @@
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageXTransferBody.h"
namespace {
const std::string empty;
@@ -68,27 +67,4 @@ namespace broker{
return b && b->getAcceptMode() == 0/*EXPLICIT == 0*/;
}
- std::string PreviewAdapter::getExchange(const framing::FrameSet& f)
- {
- return f.as<framing::MessageXTransferBody>()->getDestination();
- }
-
- std::string PreviewAdapter::getRoutingKey(const framing::FrameSet& f)
- {
- const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>();
- return p ? p->getRoutingKey() : empty;
- }
-
- const framing::FieldTable* PreviewAdapter::getApplicationHeaders(const framing::FrameSet& f)
- {
- const framing::PreviewMessageProperties* p = f.getHeaders()->get<framing::PreviewMessageProperties>();
- return p ? &(p->getApplicationHeaders()) : 0;
- }
-
- bool PreviewAdapter::isPersistent(const framing::FrameSet& f)
- {
- const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>();
- return p && p->getDeliveryMode() == 2;
- }
-
}}
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
index 4c13e756e9..61a1bc4794 100644
--- a/cpp/src/qpid/broker/MessageAdapter.h
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -52,14 +52,6 @@ struct TransferAdapter : MessageAdapter
bool requiresAccept(const framing::FrameSet& f);
};
-struct PreviewAdapter : TransferAdapter
-{
- std::string getExchange(const framing::FrameSet& f);
- std::string getRoutingKey(const framing::FrameSet& f);
- const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
- bool isPersistent(const framing::FrameSet& f);
-};
-
}}
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index 36862edf37..f2a55e2790 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -24,10 +24,7 @@
#include "Message.h"
#include "Queue.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/BasicXDeliverBody.h"
-#include "qpid/framing/BasicXGetOkBody.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageXTransferBody.h"
using namespace boost;
@@ -43,41 +40,6 @@ struct BaseToken : DeliveryToken
virtual AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) = 0;
};
-struct BasicGetToken : BaseToken
-{
- typedef boost::shared_ptr<BasicGetToken> shared_ptr;
-
- Queue::shared_ptr queue;
-
- BasicGetToken(Queue::shared_ptr q) : queue(q) {}
-
- AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
- {
- return AMQFrame(in_place<BasicXGetOkBody>(
- ProtocolVersion(), id.getValue(),
- msg->getRedelivered(), msg->getExchangeName(),
- msg->getRoutingKey(), queue->getMessageCount()));
- }
-};
-
-struct BasicConsumeToken : BaseToken
-{
- typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
-
- const string consumer;
-
- BasicConsumeToken(const string c) : consumer(c) {}
-
- AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
- {
- return AMQFrame(in_place<BasicXDeliverBody>(
- ProtocolVersion(), consumer, id.getValue(),
- msg->getRedelivered(), msg->getExchangeName(),
- msg->getRoutingKey()));
- }
-
-};
-
struct MessageDeliveryToken : BaseToken
{
const std::string destination;
@@ -91,48 +53,23 @@ struct MessageDeliveryToken : BaseToken
AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/)
{
//may need to set the redelivered flag:
- if (isPreview) {
- if (msg->getRedelivered()){
- msg->getProperties<PreviewDeliveryProperties>()->setRedelivered(true);
- }
- return AMQFrame(in_place<MessageXTransferBody>(
- ProtocolVersion(), 0, destination,
- confirmMode, acquireMode));
- } else {
- if (msg->getRedelivered()){
- msg->getProperties<DeliveryProperties>()->setRedelivered(true);
- }
- return AMQFrame(in_place<MessageTransferBody>(
- ProtocolVersion(), destination, confirmMode, acquireMode));
+ if (msg->getRedelivered()){
+ msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
+ return AMQFrame(in_place<MessageTransferBody>(
+ ProtocolVersion(), destination, confirmMode, acquireMode));
}
};
}
}
-DeliveryToken::shared_ptr MessageDelivery::getBasicGetToken(Queue::shared_ptr queue)
-{
- return DeliveryToken::shared_ptr(new BasicGetToken(queue));
-}
-
-DeliveryToken::shared_ptr MessageDelivery::getBasicConsumeToken(const string& consumer)
-{
- return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
-}
-
DeliveryToken::shared_ptr MessageDelivery::getMessageDeliveryToken(const std::string& destination,
u_int8_t confirmMode, u_int8_t acquireMode)
{
return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, false));
}
-DeliveryToken::shared_ptr MessageDelivery::getPreviewMessageDeliveryToken(const std::string& destination,
- u_int8_t confirmMode, u_int8_t acquireMode)
-{
- return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination, confirmMode, acquireMode, true));
-}
-
void MessageDelivery::deliver(QueuedMessage& msg,
framing::FrameHandler& handler,
DeliveryId id,
diff --git a/cpp/src/qpid/broker/MessageDelivery.h b/cpp/src/qpid/broker/MessageDelivery.h
index 564e1456a0..6deafbf519 100644
--- a/cpp/src/qpid/broker/MessageDelivery.h
+++ b/cpp/src/qpid/broker/MessageDelivery.h
@@ -34,15 +34,12 @@ class Message;
class Queue;
/**
+ * TODO: clean this up; we don't need it anymore in its current form
+ *
* Encapsulates the different options for message delivery currently supported.
*/
class MessageDelivery {
public:
- static boost::shared_ptr<DeliveryToken> getBasicGetToken(boost::shared_ptr<Queue> queue);
- static boost::shared_ptr<DeliveryToken> getBasicConsumeToken(const std::string& consumer);
- static boost::shared_ptr<DeliveryToken> getPreviewMessageDeliveryToken(const std::string& destination,
- u_int8_t confirmMode,
- u_int8_t acquireMode);
static boost::shared_ptr<DeliveryToken> getMessageDeliveryToken(const std::string& destination,
u_int8_t confirmMode,
u_int8_t acquireMode);
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
deleted file mode 100644
index 5e0e759dfb..0000000000
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/Exception.h"
-#include "qpid/log/Statement.h"
-#include "MessageHandlerImpl.h"
-#include "qpid/framing/FramingContent.h"
-#include "Connection.h"
-#include "Broker.h"
-#include "MessageDelivery.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "BrokerAdapter.h"
-
-#include <boost/format.hpp>
-#include <boost/cast.hpp>
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace broker {
-
-using namespace framing;
-
-MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
- HandlerImpl(s),
- releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
- rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
- {}
-
-//
-// Message class method handlers
-//
-
-void
-MessageHandlerImpl::cancel(const string& destination )
-{
- state.cancel(destination);
-}
-
-void
-MessageHandlerImpl::open(const string& /*reference*/)
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/)
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::close(const string& /*reference*/)
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::checkpoint(const string& /*reference*/,
- const string& /*identifier*/ )
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::resume(const string& /*reference*/,
- const string& /*identifier*/ )
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::offset(uint64_t /*value*/ )
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::get(uint16_t /*ticket*/,
- const string& /*queueName*/,
- const string& /*destination*/,
- bool /*noAck*/ )
-{
- throw NotImplementedException("get no longer supported");
-}
-
-void
-MessageHandlerImpl::empty()
-{
- throw NotImplementedException("empty no longer supported");
-}
-
-void
-MessageHandlerImpl::ok()
-{
- throw NotImplementedException("Message.Ok no longer supported");
-}
-
-void
-MessageHandlerImpl::qos(uint32_t prefetchSize,
- uint16_t prefetchCount,
- bool /*global*/ )
-{
- //TODO: handle global
- state.setPrefetchSize(prefetchSize);
- state.setPrefetchCount(prefetchCount);
-}
-
-void
-MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- u_int8_t confirmMode,
- u_int8_t acquireMode,
- bool exclusive,
- const framing::FieldTable& filter )
-{
- Queue::shared_ptr queue = state.getQueue(queueName);
- if(!destination.empty() && state.exists(destination))
- throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
-
- string tag = destination;
- state.consume(MessageDelivery::getPreviewMessageDeliveryToken(destination, confirmMode, acquireMode),
- tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
-}
-
-void
-MessageHandlerImpl::recover(bool requeue)
-{
- state.recover(requeue);
-}
-
-void
-MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ )
-{
- transfers.processRanges(rejectOp);
-}
-
-void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
-{
- if (unit == 0) {
- //message
- state.addMessageCredit(destination, value);
- } else if (unit == 1) {
- //bytes
- state.addByteCredit(destination, value);
- } else {
- //unknown
- throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
- }
-
-}
-
-void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
-{
- if (mode == 0) {
- //credit
- state.setCreditMode(destination);
- } else if (mode == 1) {
- //window
- state.setWindowMode(destination);
- } else{
- throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));
- }
-}
-
-void MessageHandlerImpl::flush(const std::string& destination)
-{
- state.flush(destination);
-}
-
-void MessageHandlerImpl::stop(const std::string& destination)
-{
- state.stop(destination);
-}
-
-void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
-{
- //TODO: implement mode
-
- SequenceNumberSet results;
- RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
- transfers.processRanges(op);
- results = results.condense();
- getProxy().getMessage().acquired(results);
-}
-
-void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
-{
- transfers.processRanges(releaseOp);
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h
deleted file mode 100644
index dd70f35dbb..0000000000
--- a/cpp/src/qpid/broker/MessageHandlerImpl.h
+++ /dev/null
@@ -1,110 +0,0 @@
-#ifndef _broker_MessageHandlerImpl_h
-#define _broker_MessageHandlerImpl_h
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <memory>
-
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "HandlerImpl.h"
-
-#include <boost/function.hpp>
-
-namespace qpid {
-namespace broker {
-
-class Connection;
-class Broker;
-class MessageMessage;
-
-class MessageHandlerImpl :
- public framing::AMQP_ServerOperations::MessageHandler,
- public HandlerImpl
-{
- typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
- RangedOperation releaseOp;
- RangedOperation rejectOp;
-
- public:
- MessageHandlerImpl(SemanticState&);
-
- void append(const std::string& reference, const std::string& bytes);
-
- void cancel(const std::string& destination );
-
- void checkpoint(const std::string& reference,
- const std::string& identifier );
-
- void close(const std::string& reference );
-
- void empty();
-
- void get(uint16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noAck );
-
- void offset(uint64_t value);
-
- void ok();
-
- void open(const std::string& reference );
-
- void qos(uint32_t prefetchSize,
- uint16_t prefetchCount,
- bool global );
-
- void recover(bool requeue );
-
- void reject(const framing::SequenceNumberSet& transfers,
- uint16_t code,
- const std::string& text );
-
- void resume(const std::string& reference,
- const std::string& identifier );
-
- void flow(const std::string& destination, u_int8_t unit, u_int32_t value);
-
- void flowMode(const std::string& destination, u_int8_t mode);
-
- void flush(const std::string& destination);
-
- void stop(const std::string& destination);
-
- void acquire(const framing::SequenceNumberSet& transfers, u_int8_t mode);
-
- void release(const framing::SequenceNumberSet& transfers);
-
- void subscribe(u_int16_t ticket,
- const std::string& queue,
- const std::string& destination,
- bool noLocal,
- u_int8_t confirmMode,
- u_int8_t acquireMode,
- bool exclusive,
- const framing::FieldTable& filter);
-
-};
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!_broker_MessageHandlerImpl_h*/
diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp
deleted file mode 100644
index 2643c85824..0000000000
--- a/cpp/src/qpid/broker/PreviewConnection.cpp
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PreviewConnection.h"
-#include "SessionState.h"
-#include "BrokerAdapter.h"
-#include "Bridge.h"
-#include "SemanticHandler.h"
-
-#include "qpid/log/Statement.h"
-#include "qpid/ptr_map.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/management/ManagementAgent.h"
-
-#include <boost/bind.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-
-#include <algorithm>
-#include <iostream>
-#include <assert.h>
-
-using namespace boost;
-using namespace qpid::sys;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using qpid::ptr_map_ptr;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnection::MgmtClient : public PreviewConnection::MgmtWrapper
-{
- management::Client::shared_ptr mgmtClient;
-
-public:
- MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
- ~MgmtClient();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
-};
-
-class PreviewConnection::MgmtLink : public PreviewConnection::MgmtWrapper
-{
- typedef boost::ptr_vector<Bridge> Bridges;
-
- management::Link::shared_ptr mgmtLink;
- Bridges created;//holds list of bridges pending creation
- Bridges cancelled;//holds list of bridges pending cancellation
- Bridges active;//holds active bridges
- uint channelCounter;
- sys::Mutex linkLock;
-
- void cancel(Bridge*);
-
-public:
- MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId);
- ~MgmtLink();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
- void processPending();
- void process(PreviewConnection& connection, const management::Args& args);
-};
-
-
-PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
- ConnectionState(out_, broker_),
- adapter(*this, isLink),
- mgmtClosing(false),
- mgmtId(mgmtId_)
-{
- Manageable* parent = broker.GetVhostObject ();
-
- if (parent != 0)
- {
- ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-
- if (agent.get () != 0)
- {
- if (isLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtLink(this, parent, agent, mgmtId));
- } else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId));
- }
- }
- }
-}
-
-PreviewConnection::~PreviewConnection () {
-}
-
-void PreviewConnection::received(framing::AMQFrame& frame){
- if (mgmtClosing)
- close (403, "Closed by Management Request", 0, 0);
-
- if (frame.getChannel() == 0) {
- adapter.handle(frame);
- } else {
- getChannel(frame.getChannel()).in(frame);
- }
-
- if (mgmtWrapper.get()) mgmtWrapper->received(frame);
-}
-
-void PreviewConnection::close(
- ReplyCode code, const string& text, ClassId classId, MethodId methodId)
-{
- adapter.close(code, text, classId, methodId);
- channels.clear();
- getOutput().close();
-}
-
-void PreviewConnection::idleOut(){}
-
-void PreviewConnection::idleIn(){}
-
-void PreviewConnection::closed(){ // Physically closed, suspend open sessions.
- try {
- for (ChannelMap::iterator i = channels.begin(); i != channels.end(); ++i)
- ptr_map_ptr(i)->localSuspend();
- while (!exclusiveQueues.empty()) {
- Queue::shared_ptr q(exclusiveQueues.front());
- q->releaseExclusiveOwnership();
- if (q->canAutoDelete()) {
- Queue::tryAutoDelete(broker, q);
- }
- exclusiveQueues.erase(exclusiveQueues.begin());
- }
- } catch(std::exception& e) {
- QPID_LOG(error, " Unhandled exception while closing session: " <<
- e.what());
- assert(0);
- }
-}
-
-bool PreviewConnection::doOutput()
-{
- try{
- //process any pending mgmt commands:
- if (mgmtWrapper.get()) mgmtWrapper->processPending();
- if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
-
-
- //then do other output as needed:
- return outputTasks.doOutput();
- }catch(ConnectionException& e){
- close(e.code, e.what(), 0, 0);
- }catch(std::exception& e){
- close(541/*internal error*/, e.what(), 0, 0);
- }
- return false;
-}
-
-void PreviewConnection::closeChannel(uint16_t id) {
- ChannelMap::iterator i = channels.find(id);
- if (i != channels.end()) channels.erase(i);
-}
-
-PreviewSessionHandler& PreviewConnection::getChannel(ChannelId id) {
- ChannelMap::iterator i=channels.find(id);
- if (i == channels.end()) {
- i = channels.insert(id, new PreviewSessionHandler(*this, id)).first;
- }
- return *ptr_map_ptr(i);
-}
-
-ManagementObject::shared_ptr PreviewConnection::GetManagementObject (void) const
-{
- return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
-}
-
-Manageable::status_t PreviewConnection::ManagementMethod (uint32_t methodId,
- Args& args)
-{
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
-
- QPID_LOG (debug, "PreviewConnection::ManagementMethod [id=" << methodId << "]");
-
- switch (methodId)
- {
- case management::Client::METHOD_CLOSE :
- mgmtClosing = true;
- if (mgmtWrapper.get()) mgmtWrapper->closing();
- out->activateOutput();
- status = Manageable::STATUS_OK;
- break;
- case management::Link::METHOD_BRIDGE :
- //queue this up and request chance to do output (i.e. get connections thread of control):
- mgmtWrapper->process(*this, args);
- out->activateOutput();
- status = Manageable::STATUS_OK;
- break;
- }
-
- return status;
-}
-
-PreviewConnection::MgmtLink::MgmtLink(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
- : channelCounter(1)
-{
- mgmtLink = management::Link::shared_ptr
- (new management::Link(conn, parent, mgmtId));
- agent->addObject (mgmtLink);
-}
-
-PreviewConnection::MgmtLink::~MgmtLink()
-{
- if (mgmtLink.get () != 0)
- mgmtLink->resourceDestroy ();
-}
-
-void PreviewConnection::MgmtLink::received(framing::AMQFrame& frame)
-{
- if (mgmtLink.get () != 0)
- {
- mgmtLink->inc_framesFromPeer ();
- mgmtLink->inc_bytesFromPeer (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr PreviewConnection::MgmtLink::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtLink);
-}
-
-void PreviewConnection::MgmtLink::closing()
-{
- if (mgmtLink) mgmtLink->set_closing (1);
-}
-
-void PreviewConnection::MgmtLink::processPending()
-{
- Mutex::ScopedLock l(linkLock);
- //process any pending creates
- if (!created.empty()) {
- for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- i->create();
- }
- active.transfer(active.end(), created.begin(), created.end(), created);
- }
- if (!cancelled.empty()) {
- //process any pending cancellations
- for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
- i->cancel();
- }
- cancelled.clear();
- }
-}
-
-void PreviewConnection::MgmtLink::process(PreviewConnection& connection, const management::Args& args)
-{
- Mutex::ScopedLock l(linkLock);
- created.push_back(new Bridge(channelCounter++, connection,
- boost::bind(&MgmtLink::cancel, this, _1),
- dynamic_cast<const management::ArgsLinkBridge&>(args)));
-}
-
-void PreviewConnection::MgmtLink::cancel(Bridge* b)
-{
- Mutex::ScopedLock l(linkLock);
- //need to take this out the active map and add it to the cancelled map
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if (&(*i) == b) {
- cancelled.transfer(cancelled.end(), i, active);
- break;
- }
- }
-}
-
-PreviewConnection::MgmtClient::MgmtClient(PreviewConnection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, const std::string& mgmtId)
-{
- mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId));
- agent->addObject (mgmtClient);
-}
-
-PreviewConnection::MgmtClient::~MgmtClient()
-{
- if (mgmtClient.get () != 0)
- mgmtClient->resourceDestroy ();
-}
-
-void PreviewConnection::MgmtClient::received(framing::AMQFrame& frame)
-{
- if (mgmtClient.get () != 0)
- {
- mgmtClient->inc_framesFromClient ();
- mgmtClient->inc_bytesFromClient (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr PreviewConnection::MgmtClient::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtClient);
-}
-
-void PreviewConnection::MgmtClient::closing()
-{
- if (mgmtClient) mgmtClient->set_closing (1);
-}
-
-}}
-
diff --git a/cpp/src/qpid/broker/PreviewConnection.h b/cpp/src/qpid/broker/PreviewConnection.h
deleted file mode 100644
index 7a8404bf77..0000000000
--- a/cpp/src/qpid/broker/PreviewConnection.h
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _PreviewConnection_
-#define _PreviewConnection_
-
-#include <memory>
-#include <sstream>
-#include <vector>
-
-#include <boost/ptr_container/ptr_map.hpp>
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/TimeoutHandler.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "Broker.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/Exception.h"
-#include "PreviewConnectionHandler.h"
-#include "ConnectionState.h"
-#include "PreviewSessionHandler.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/management/Client.h"
-#include "qpid/management/Link.h"
-
-#include <boost/ptr_container/ptr_map.hpp>
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnection : public sys::ConnectionInputHandler, public ConnectionState
-{
- public:
- PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
- ~PreviewConnection ();
-
- /** Get the PreviewSessionHandler for channel. Create if it does not already exist */
- PreviewSessionHandler& getChannel(framing::ChannelId channel);
-
- /** Close the connection */
- void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
-
- // ConnectionInputHandler methods
- void received(framing::AMQFrame& frame);
- void idleOut();
- void idleIn();
- void closed();
- bool doOutput();
-
- void closeChannel(framing::ChannelId channel);
-
- // Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObject (void) const;
- management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args);
-
- private:
- typedef boost::ptr_map<framing::ChannelId, PreviewSessionHandler> ChannelMap;
- typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
-
- /**
- * Connection may appear, for the purposes of management, as a
- * normal client initiated connection or as an agent initiated
- * inter-broker link. This wrapper abstracts the common interface
- * for both.
- */
- class MgmtWrapper
- {
- public:
- virtual ~MgmtWrapper(){}
- virtual void received(framing::AMQFrame& frame) = 0;
- virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
- virtual void closing() = 0;
- virtual void processPending(){}
- virtual void process(PreviewConnection&, const management::Args&){}
- };
- class MgmtClient;
- class MgmtLink;
-
- ChannelMap channels;
- framing::AMQP_ClientProxy::Connection* client;
- uint64_t stagingThreshold;
- PreviewConnectionHandler adapter;
- std::auto_ptr<MgmtWrapper> mgmtWrapper;
- bool mgmtClosing;
- const std::string mgmtId;
-};
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
deleted file mode 100644
index b6c9b03776..0000000000
--- a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PreviewConnectionCodec.h"
-#include "qpid/log/Statement.h"
-
-namespace qpid {
-namespace broker {
-
-using sys::Mutex;
-
-PreviewConnectionCodec::PreviewConnectionCodec(sys::OutputControl& o, Broker& broker, const std::string& id, bool isClient)
- : frameQueueClosed(false), output(o), connection(this, broker, id, isClient), identifier(id) {}
-
-size_t PreviewConnectionCodec::decode(const char* buffer, size_t size) {
- framing::Buffer in(const_cast<char*>(buffer), size);
- framing::AMQFrame frame;
- while(frame.decode(in)) {
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- connection.received(frame);
- }
- return in.getPosition();
-}
-
-bool PreviewConnectionCodec::canEncode() {
- if (!frameQueueClosed && frameQueue.empty()) connection.doOutput();
- return !frameQueue.empty();
-}
-
-bool PreviewConnectionCodec::isClosed() const {
- Mutex::ScopedLock l(frameQueueLock);
- return frameQueueClosed;
-}
-
-size_t PreviewConnectionCodec::encode(const char* buffer, size_t size) {
- Mutex::ScopedLock l(frameQueueLock);
- framing::Buffer out(const_cast<char*>(buffer), size);
- while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
- frameQueue.front().encode(out);
- QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
- frameQueue.pop();
- if (!frameQueueClosed && frameQueue.empty()) connection.doOutput();
- }
- if (!frameQueue.empty() && frameQueue.front().size() > size)
- throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
- return out.getPosition();
-}
-
-void PreviewConnectionCodec::activateOutput() { output.activateOutput(); }
-
-void PreviewConnectionCodec::close() {
- // Close the output queue.
- Mutex::ScopedLock l(frameQueueLock);
- frameQueueClosed = true;
-}
-
-void PreviewConnectionCodec::closed() {
- connection.closed();
-}
-
-void PreviewConnectionCodec::send(framing::AMQFrame& f) {
- {
- Mutex::ScopedLock l(frameQueueLock);
- if (!frameQueueClosed)
- frameQueue.push(f);
- }
- activateOutput();
-}
-
-framing::ProtocolVersion PreviewConnectionCodec::getVersion() const {
- return framing::ProtocolVersion(99,0);
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.h b/cpp/src/qpid/broker/PreviewConnectionCodec.h
deleted file mode 100644
index f2ab086d06..0000000000
--- a/cpp/src/qpid/broker/PreviewConnectionCodec.h
+++ /dev/null
@@ -1,55 +0,0 @@
-#ifndef QPID_BROKER_PREVIEWCONNECTIONCODEC_H
-#define QPID_BROKER_PREVIEWCONNECTIONCODEC_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/sys/ConnectionCodec.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/Mutex.h"
-#include "PreviewConnection.h"
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnectionCodec : public sys::ConnectionCodec, public sys::ConnectionOutputHandler {
- std::queue<framing::AMQFrame> frameQueue;
- bool frameQueueClosed;
- mutable sys::Mutex frameQueueLock;
- sys::OutputControl& output;
- PreviewConnection connection;
- std::string identifier;
-
- public:
- PreviewConnectionCodec(sys::OutputControl&, Broker&, const std::string& id, bool isClient = false);
- size_t decode(const char* buffer, size_t size);
- size_t encode(const char* buffer, size_t size);
- bool isClosed() const;
- bool canEncode();
- void activateOutput();
- void closed(); // connection closed by peer.
- void close(); // closing from this end.
- void send(framing::AMQFrame&);
- framing::ProtocolVersion getVersion() const;
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_PREVIEWCONNECTIONCODEC_H*/
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
deleted file mode 100644
index 3477b59cb5..0000000000
--- a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
+++ /dev/null
@@ -1,315 +0,0 @@
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "config.h"
-
-#include "PreviewConnectionHandler.h"
-#include "PreviewConnection.h"
-#include "qpid/framing/ConnectionStartBody.h"
-#include "qpid/framing/ClientInvoker.h"
-#include "qpid/framing/ServerInvoker.h"
-#include "qpid/log/Statement.h"
-
-#if HAVE_SASL
-#include <sasl/sasl.h>
-#endif
-
-using namespace qpid;
-using namespace qpid::broker;
-using namespace qpid::framing;
-
-
-namespace
-{
-const std::string PLAIN = "PLAIN";
-const std::string en_US = "en_US";
-}
-
-void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
-{
- handler->client.close(code, text, classId, methodId);
-}
-
-void PreviewConnectionHandler::handle(framing::AMQFrame& frame)
-{
- AMQMethodBody* method=frame.getBody()->getMethod();
- try{
- if (handler->serverMode) {
- if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
- throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
- } else {
- if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
- throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
- }
- }catch(ConnectionException& e){
- handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
- }
-}
-
-PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection, bool isClient) : handler(new Handler(connection)) {
- FieldTable properties;
- string mechanisms;
- string locales(en_US);
- if (isClient) {
- handler->serverMode = false;
- }else {
-#if HAVE_SASL
- if (connection.getBroker().getOptions().auth) {
- const char *list;
- unsigned int list_len;
- int count;
- int code = sasl_listmech(handler->sasl_conn, NULL,
- "", " ", "",
- &list, &list_len,
- &count);
-
- if (SASL_OK != code) {
- QPID_LOG(info, "SASL: Mechanism listing failed: "
- << sasl_errdetail(handler->sasl_conn));
-
- // TODO: Change this to an exception signaling
- // server error, when one is available
- throw CommandInvalidException("Mechanism listing failed");
- } else {
- // TODO: For 0-10 the mechanisms must be returned
- // in a list instead of space separated
- mechanisms = list;
- }
- } else {
-#endif
- // TODO: It would be more proper for this to be ANONYMOUS
- mechanisms = PLAIN;
-#if HAVE_SASL
- }
-#endif
-
- QPID_LOG(info, "SASL: Sending mechanism list: " << mechanisms);
-
- handler->serverMode = true;
- handler->client.start(99, 0, properties, mechanisms, locales);
- }
-}
-
-PreviewConnectionHandler::Handler::Handler(PreviewConnection& c) :
-#if HAVE_SASL
- sasl_conn(NULL),
-#endif
- client(c.getOutput()), server(c.getOutput()),
- connection(c), serverMode(false)
-{
-#if HAVE_SASL
- if (connection.getBroker().getOptions().auth) {
- int code = sasl_server_new(BROKER_SASL_NAME,
- NULL, NULL, NULL, NULL, NULL, 0,
- &sasl_conn);
-
- if (SASL_OK != code) {
- QPID_LOG(info, "SASL: Connection creation failed: "
- << sasl_errdetail(sasl_conn));
-
- // TODO: Change this to an exception signaling
- // server error, when one is available
- throw CommandInvalidException("Unable to perform authentication");
- }
- }
-#endif
-}
-
-PreviewConnectionHandler::Handler::~Handler()
-{
-#if HAVE_SASL
- if (NULL != sasl_conn) {
- sasl_dispose(&sasl_conn);
- sasl_conn = NULL;
- }
-#endif
-}
-
-#if HAVE_SASL
-void PreviewConnectionHandler::Handler::processAuthenticationStep(int code, const char *challenge, unsigned int challenge_len)
-{
- if (SASL_OK == code) {
- const void *uid;
-
- code = sasl_getprop(sasl_conn,
- SASL_USERNAME,
- &uid);
- if (SASL_OK != code) {
- QPID_LOG(info, "SASL: Authentication succeeded, username unavailable");
- // TODO: Change this to an exception signaling
- // authentication failure, when one is available
- throw ConnectionForcedException("Authenticated username unavailable");
- }
-
- QPID_LOG(info, "SASL: Authentication succeeded for: " << (char *)uid);
-
- connection.setUserId((char *)uid);
-
- client.tune(framing::CHANNEL_MAX,
- connection.getFrameMax(),
- connection.getHeartbeat());
- } else if (SASL_CONTINUE == code) {
- string challenge_str(challenge, challenge_len);
-
- QPID_LOG(debug, "SASL: sending challenge to client");
-
- client.secure(challenge_str);
- } else {
- QPID_LOG(info, "SASL: Authentication failed: "
- << sasl_errdetail(sasl_conn));
-
- // TODO: Change to more specific exceptions, when they are
- // available
- switch (code) {
- case SASL_NOMECH:
- throw ConnectionForcedException("Unsupported mechanism");
- break;
- case SASL_TRYAGAIN:
- throw ConnectionForcedException("Transient failure, try again");
- break;
- default:
- throw ConnectionForcedException("Authentication failed");
- break;
- }
- }
-}
-#endif
-
-void PreviewConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProperties*/,
-#if HAVE_SASL
- const string& mechanism,
- const string& response,
-#else
- const string& /*mechanism*/,
- const string& /*response*/,
-#endif
- const string& /*locale*/)
-{
-#if HAVE_SASL
- if (connection.getBroker().getOptions().auth) {
- const char *challenge;
- unsigned int challenge_len;
-
- QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
- int code = sasl_server_start(sasl_conn,
- mechanism.c_str(),
- response.c_str(), response.length(),
- &challenge, &challenge_len);
-
- processAuthenticationStep(code, challenge, challenge_len);
- } else {
-#endif
- QPID_LOG(warning, "SASL: No Authentication Performed");
-
- // TODO: Figure out what should actually be set in this case
- connection.setUserId("anonymous");
-
- client.tune(framing::CHANNEL_MAX,
- connection.getFrameMax(),
- connection.getHeartbeat());
-#if HAVE_SASL
- }
-#endif
-}
-
-void PreviewConnectionHandler::Handler::secureOk(const string&
-#if HAVE_SASL
- response
-#endif
- ) {
-#if HAVE_SASL
- int code;
- const char *challenge;
- unsigned int challenge_len;
-
- code = sasl_server_step(sasl_conn,
- response.c_str(), response.length(),
- &challenge, &challenge_len);
-
- processAuthenticationStep(code, challenge, challenge_len);
-#endif
-}
-
-void PreviewConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
- uint32_t framemax, uint16_t heartbeat)
-{
- connection.setFrameMax(framemax);
- connection.setHeartbeat(heartbeat);
-}
-
-void PreviewConnectionHandler::Handler::open(const string& /*virtualHost*/,
- const string& /*capabilities*/, bool /*insist*/)
-{
- string knownhosts;
- client.openOk(knownhosts);
-}
-
-
-void PreviewConnectionHandler::Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/,
- uint16_t /*classId*/, uint16_t /*methodId*/)
-{
- client.closeOk();
- connection.getOutput().close();
-}
-
-void PreviewConnectionHandler::Handler::closeOk(){
- connection.getOutput().close();
-}
-
-
-void PreviewConnectionHandler::Handler::start(uint8_t /*versionMajor*/,
- uint8_t /*versionMinor*/,
- const FieldTable& /*serverProperties*/,
- const string& /*mechanisms*/,
- const string& /*locales*/)
-{
- string uid = "qpidd";
- string pwd = "qpidd";
- string response = ((char)0) + uid + ((char)0) + pwd;
- server.startOk(FieldTable(), PLAIN, response, en_US);
-}
-
-void PreviewConnectionHandler::Handler::secure(const string& /*challenge*/)
-{
- server.secureOk("");
-}
-
-void PreviewConnectionHandler::Handler::tune(uint16_t channelMax,
- uint32_t frameMax,
- uint16_t heartbeat)
-{
- connection.setFrameMax(frameMax);
- connection.setHeartbeat(heartbeat);
- server.tuneOk(channelMax, frameMax, heartbeat);
- server.open("/", "", true);
-}
-
-void PreviewConnectionHandler::Handler::openOk(const string& /*knownHosts*/)
-{
-}
-
-void PreviewConnectionHandler::Handler::redirect(const string& /*host*/, const string& /*knownHosts*/)
-{
-
-}
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.h b/cpp/src/qpid/broker/PreviewConnectionHandler.h
deleted file mode 100644
index b71068d81d..0000000000
--- a/cpp/src/qpid/broker/PreviewConnectionHandler.h
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _PreviewConnectionAdapter_
-#define _PreviewConnectionAdapter_
-
-#include "config.h"
-
-#include <memory>
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ClientOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ServerProxy.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/Exception.h"
-
-#if HAVE_SASL
-#include <sasl/sasl.h>
-#endif
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnection;
-
-// TODO aconway 2007-09-18: Rename to ConnectionHandler
-class PreviewConnectionHandler : public framing::FrameHandler
-{
- struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
- public framing::AMQP_ClientOperations::ConnectionHandler
- {
-#if HAVE_SASL
- sasl_conn_t *sasl_conn;
-#endif
- framing::AMQP_ClientProxy::Connection client;
- framing::AMQP_ServerProxy::Connection server;
- PreviewConnection& connection;
- bool serverMode;
-
- Handler(PreviewConnection& connection);
- ~Handler();
- void startOk(const qpid::framing::FieldTable& clientProperties,
- const std::string& mechanism, const std::string& response,
- const std::string& locale);
- void secureOk(const std::string& response);
- void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
- void open(const std::string& virtualHost,
- const std::string& capabilities, bool insist);
- void close(uint16_t replyCode, const std::string& replyText,
- uint16_t classId, uint16_t methodId);
- void closeOk();
-
-
- void start(uint8_t versionMajor,
- uint8_t versionMinor,
- const qpid::framing::FieldTable& serverProperties,
- const std::string& mechanisms,
- const std::string& locales);
-
- void secure(const std::string& challenge);
-
- void tune(uint16_t channelMax,
- uint32_t frameMax,
- uint16_t heartbeat);
-
- void openOk(const std::string& knownHosts);
-
- void redirect(const std::string& host, const std::string& knownHosts);
- private:
-#if HAVE_SASL
- void processAuthenticationStep(int code,
- const char *challenge,
- unsigned int challenge_len);
-#endif
- };
- std::auto_ptr<Handler> handler;
- public:
- PreviewConnectionHandler(PreviewConnection& connection, bool isClient);
- void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
- void handle(framing::AMQFrame& frame);
-};
-
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.cpp b/cpp/src/qpid/broker/PreviewSessionHandler.cpp
deleted file mode 100644
index 36092bb7f6..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionHandler.cpp
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "PreviewSessionHandler.h"
-#include "PreviewSessionState.h"
-#include "PreviewConnection.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/constants.h"
-#include "qpid/framing/ClientInvoker.h"
-#include "qpid/framing/ServerInvoker.h"
-#include "qpid/log/Statement.h"
-
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace broker {
-using namespace framing;
-using namespace std;
-using namespace qpid::sys;
-
-PreviewSessionHandler::PreviewSessionHandler(PreviewConnection& c, ChannelId ch)
- : InOutHandler(0, &out),
- connection(c), channel(ch, &c.getOutput()),
- proxy(out), // Via my own handleOut() for L2 data.
- peerSession(channel), // Direct to channel for L2 commands.
- ignoring(false) {}
-
-PreviewSessionHandler::~PreviewSessionHandler() {}
-
-namespace {
-ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
-MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
-} // namespace
-
-void PreviewSessionHandler::handleIn(AMQFrame& f) {
- // Note on channel states: a channel is open if session != 0. A
- // channel that is closed (session == 0) can be in the "ignoring"
- // state. This is a temporary state after we have sent a channel
- // exception, where extra frames might arrive that should be
- // ignored.
- //
- AMQMethodBody* m = f.getBody()->getMethod();
- try {
- if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
- session->in.handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
- } else if (!ignoring) {
- throw ChannelErrorException(
- QPID_MSG("Channel " << channel.get() << " is not open"));
- }
- } catch(const ChannelException& e) {
- ignoring=true; // Ignore trailing frames sent by client.
- session->detach();
- session.reset();
- peerSession.closed(e.code, e.what());
- }catch(const ConnectionException& e){
- connection.close(e.code, e.what(), classId(m), methodId(m));
- }catch(const std::exception& e){
- connection.close(
- framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
- }
-}
-
-void PreviewSessionHandler::handleOut(AMQFrame& f) {
- channel.handle(f); // Send it.
- if (session->sent(f))
- peerSession.solicitAck();
-}
-
-void PreviewSessionHandler::assertAttached(const char* method) const {
- if (!session.get())
- throw ChannelErrorException(
- QPID_MSG(method << " failed: No session for channel "
- << getChannel()));
-}
-
-void PreviewSessionHandler::assertClosed(const char* method) const {
- if (session.get())
- throw ChannelBusyException(
- QPID_MSG(method << " failed: channel " << channel.get()
- << " is already open."));
-}
-
-void PreviewSessionHandler::open(uint32_t detachedLifetime) {
- assertClosed("open");
- std::auto_ptr<PreviewSessionState> state(
- connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
- peerSession.attached(session->getId(), session->getTimeout());
-}
-
-void PreviewSessionHandler::resume(const Uuid& id) {
- assertClosed("resume");
- session = connection.broker.getPreviewSessionManager().resume(id);
- session->attach(*this);
- SequenceNumber seq = session->resuming();
- peerSession.attached(session->getId(), session->getTimeout());
- proxy.getSession().ack(seq, SequenceNumberSet());
-}
-
-void PreviewSessionHandler::flow(bool /*active*/) {
- assertAttached("flow");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flow");
-}
-
-void PreviewSessionHandler::flowOk(bool /*active*/) {
- assertAttached("flowOk");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flowOk");
-}
-
-void PreviewSessionHandler::close() {
- assertAttached("close");
- QPID_LOG(info, "Received session.close");
- ignoring=false;
- session->detach();
- session.reset();
- peerSession.closed(REPLY_SUCCESS, "ok");
- assert(&connection.getChannel(channel.get()) == this);
- connection.closeChannel(channel.get());
-}
-
-void PreviewSessionHandler::closed(uint16_t replyCode, const string& replyText) {
- QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
- ignoring=false;
- session->detach();
- session.reset();
-}
-
-void PreviewSessionHandler::localSuspend() {
- if (session.get() && session->isAttached()) {
- session->detach();
- connection.broker.getPreviewSessionManager().suspend(session);
- session.reset();
- }
-}
-
-void PreviewSessionHandler::suspend() {
- assertAttached("suspend");
- localSuspend();
- peerSession.detached();
- assert(&connection.getChannel(channel.get()) == this);
- connection.closeChannel(channel.get());
-}
-
-void PreviewSessionHandler::ack(uint32_t cumulativeSeenMark,
- const SequenceNumberSet& /*seenFrameSet*/)
-{
- assertAttached("ack");
- if (session->getState() == PreviewSessionState::RESUMING) {
- session->receivedAck(cumulativeSeenMark);
- framing::SessionState::Replay replay=session->replay();
- std::for_each(replay.begin(), replay.end(),
- boost::bind(&PreviewSessionHandler::handleOut, this, _1));
- }
- else
- session->receivedAck(cumulativeSeenMark);
-}
-
-void PreviewSessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- // TODO aconway 2007-10-02: may be removed from spec.
- assert(0); throw NotImplementedException("session.high-water-mark");
-}
-
-void PreviewSessionHandler::solicitAck() {
- assertAttached("solicit-ack");
- peerSession.ack(session->sendingAck(), SequenceNumberSet());
-}
-
-void PreviewSessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
-{
- std::auto_ptr<PreviewSessionState> state(
- connection.broker.getPreviewSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
-}
-
-void PreviewSessionHandler::detached()
-{
- connection.broker.getPreviewSessionManager().suspend(session);
- session.reset();
-}
-
-ConnectionState& PreviewSessionHandler::getConnection() { return connection; }
-const ConnectionState& PreviewSessionHandler::getConnection() const { return connection; }
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewSessionHandler.h b/cpp/src/qpid/broker/PreviewSessionHandler.h
deleted file mode 100644
index 4c517367d7..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionHandler.h
+++ /dev/null
@@ -1,111 +0,0 @@
-#ifndef QPID_BROKER_PREVIEWSESSIONHANDLER_H
-#define QPID_BROKER_PREVIEWSESSIONHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/AMQP_ClientOperations.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/ChannelHandler.h"
-#include "SessionContext.h"
-
-#include <boost/noncopyable.hpp>
-
-namespace qpid {
-namespace broker {
-
-class PreviewConnection;
-class PreviewSessionState;
-
-/**
- * A SessionHandler is associated with each active channel. It
- * receives incoming frames, handles session commands and manages the
- * association between the channel and a session.
- */
-class PreviewSessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
- public framing::AMQP_ClientOperations::SessionHandler,
- public framing::FrameHandler::InOutHandler,
- private boost::noncopyable
-{
- public:
- PreviewSessionHandler(PreviewConnection&, framing::ChannelId);
- ~PreviewSessionHandler();
-
- /** Returns 0 if not attached to a session */
- PreviewSessionState* getSession() { return session.get(); }
- const PreviewSessionState* getSession() const { return session.get(); }
-
- framing::ChannelId getChannel() const { return channel.get(); }
-
- ConnectionState& getConnection();
- const ConnectionState& getConnection() const;
-
- framing::AMQP_ClientProxy& getProxy() { return proxy; }
- const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
-
- // Called by closing connection.
- void localSuspend();
- void detach() { localSuspend(); }
-
- protected:
- void handleIn(framing::AMQFrame&);
- void handleOut(framing::AMQFrame&);
-
- private:
- /// Session methods
- void open(uint32_t detachedLifetime);
- void flow(bool active);
- void flowOk(bool active);
- void close();
- void closed(uint16_t replyCode, const std::string& replyText);
- void resume(const framing::Uuid& sessionId);
- void suspend();
- void ack(uint32_t cumulativeSeenMark,
- const framing::SequenceNumberSet& seenFrameSet);
- void highWaterMark(uint32_t lastSentMark);
- void solicitAck();
-
- //extra methods required for assuming client role
- void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
- void detached();
-
-
- void assertAttached(const char* method) const;
- void assertActive(const char* method) const;
- void assertClosed(const char* method) const;
-
-
- PreviewConnection& connection;
- framing::ChannelHandler channel;
- framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Session peerSession;
- bool ignoring;
- std::auto_ptr<PreviewSessionState> session;
-};
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!QPID_BROKER_SESSIONHANDLER_H*/
diff --git a/cpp/src/qpid/broker/PreviewSessionManager.cpp b/cpp/src/qpid/broker/PreviewSessionManager.cpp
deleted file mode 100644
index 97a7c87e34..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionManager.cpp
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "PreviewSessionManager.h"
-#include "PreviewSessionState.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-#include "qpid/log/Helpers.h"
-#include "qpid/memory.h"
-
-#include <boost/bind.hpp>
-#include <boost/range.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-#include <algorithm>
-#include <functional>
-#include <ostream>
-
-namespace qpid {
-namespace broker {
-
-using namespace sys;
-using namespace framing;
-
-PreviewSessionManager::PreviewSessionManager(uint32_t a) : ack(a) {}
-
-PreviewSessionManager::~PreviewSessionManager() {}
-
-// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
-std::auto_ptr<PreviewSessionState> PreviewSessionManager::open(
- PreviewSessionHandler& h, uint32_t timeout_)
-{
- Mutex::ScopedLock l(lock);
- std::auto_ptr<PreviewSessionState> session(
- new PreviewSessionState(this, &h, timeout_, ack));
- active.insert(session->getId());
- for_each(observers.begin(), observers.end(),
- boost::bind(&Observer::opened, _1,boost::ref(*session)));
- return session;
-}
-
-void PreviewSessionManager::suspend(std::auto_ptr<PreviewSessionState> session) {
- Mutex::ScopedLock l(lock);
- active.erase(session->getId());
- session->suspend();
- session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
- if (session->mgmtObject.get() != 0)
- session->mgmtObject->set_expireTime ((uint64_t) Duration (session->expiry));
- suspended.push_back(session.release()); // In expiry order
- eraseExpired();
-}
-
-std::auto_ptr<PreviewSessionState> PreviewSessionManager::resume(const Uuid& id)
-{
- Mutex::ScopedLock l(lock);
- eraseExpired();
- if (active.find(id) != active.end())
- throw SessionBusyException(
- QPID_MSG("Session already active: " << id));
- Suspended::iterator i = std::find_if(
- suspended.begin(), suspended.end(),
- boost::bind(std::equal_to<Uuid>(), id, boost::bind(&PreviewSessionState::getId, _1))
- );
- if (i == suspended.end())
- throw InvalidArgumentException(
- QPID_MSG("No suspended session with id=" << id));
- active.insert(id);
- std::auto_ptr<PreviewSessionState> state(suspended.release(i).release());
- return state;
-}
-
-void PreviewSessionManager::erase(const framing::Uuid& id)
-{
- Mutex::ScopedLock l(lock);
- active.erase(id);
-}
-
-void PreviewSessionManager::eraseExpired() {
- // Called with lock held.
- if (!suspended.empty()) {
- Suspended::iterator keep = std::lower_bound(
- suspended.begin(), suspended.end(), now(),
- boost::bind(std::less<AbsTime>(), boost::bind(&PreviewSessionState::expiry, _1), _2));
- if (suspended.begin() != keep) {
- QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
- suspended.erase(suspended.begin(), keep);
- }
- }
-}
-
-void PreviewSessionManager::add(const boost::intrusive_ptr<Observer>& o) {
- observers.push_back(o);
-}
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewSessionManager.h b/cpp/src/qpid/broker/PreviewSessionManager.h
deleted file mode 100644
index 9bc6bc5bbc..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionManager.h
+++ /dev/null
@@ -1,101 +0,0 @@
-#ifndef QPID_BROKER_PREVIEWSESSIONMANAGER_H
-#define QPID_BROKER_PREVIEWSESSIONMANAGER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/framing/Uuid.h>
-#include <qpid/sys/Time.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/RefCounted.h>
-
-#include <set>
-#include <vector>
-#include <memory>
-
-#include <boost/noncopyable.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-
-class PreviewSessionState;
-class PreviewSessionHandler;
-
-/**
- * Create and manage PreviewSessionState objects.
- */
-class PreviewSessionManager : private boost::noncopyable {
- public:
- /**
- * Observer notified of PreviewSessionManager events.
- */
- struct Observer : public RefCounted {
- virtual void opened(PreviewSessionState&) {}
- };
-
- PreviewSessionManager(uint32_t ack);
-
- ~PreviewSessionManager();
-
- /** Open a new active session, caller takes ownership */
- std::auto_ptr<PreviewSessionState> open(PreviewSessionHandler& c, uint32_t timeout_);
-
- /** Suspend a session, start it's timeout counter.
- * The factory takes ownership.
- */
- void suspend(std::auto_ptr<PreviewSessionState> session);
-
- /** Resume a suspended session.
- *@throw Exception if timed out or non-existant.
- */
- std::auto_ptr<PreviewSessionState> resume(const framing::Uuid&);
-
- /** Add an Observer. */
- void add(const boost::intrusive_ptr<Observer>&);
-
- private:
- typedef boost::ptr_vector<PreviewSessionState> Suspended;
- typedef std::set<framing::Uuid> Active;
- typedef std::vector<boost::intrusive_ptr<Observer> > Observers;
-
- void erase(const framing::Uuid&);
- void eraseExpired();
-
- sys::Mutex lock;
- Suspended suspended;
- Active active;
- uint32_t ack;
- Observers observers;
-
- friend class PreviewSessionState; // removes deleted sessions from active set.
-};
-
-
-
-}} // namespace qpid::broker
-
-
-
-
-
-#endif /*!QPID_BROKER_PREVIEWSESSIONMANAGER_H*/
diff --git a/cpp/src/qpid/broker/PreviewSessionState.cpp b/cpp/src/qpid/broker/PreviewSessionState.cpp
deleted file mode 100644
index 43c3b1509e..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionState.cpp
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "PreviewSessionState.h"
-#include "PreviewSessionManager.h"
-#include "PreviewSessionHandler.h"
-#include "ConnectionState.h"
-#include "Broker.h"
-#include "SemanticHandler.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace broker {
-
-using namespace framing;
-using sys::Mutex;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-
-PreviewSessionState::PreviewSessionState(
- PreviewSessionManager* f, PreviewSessionHandler* h, uint32_t timeout_, uint32_t ack)
- : framing::SessionState(ack, timeout_ > 0),
- factory(f), handler(h), id(true), timeout(timeout_),
- broker(h->getConnection().broker),
- version(h->getConnection().getVersion()),
- semanticHandler(new SemanticHandler(*this))
-{
- in.next = semanticHandler.get();
- out.next = &handler->out;
-
- getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
-
- Manageable* parent = broker.GetVhostObject ();
-
- if (parent != 0)
- {
- ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
-
- if (agent.get () != 0)
- {
- mgmtObject = management::Session::shared_ptr
- (new management::Session (this, parent, id.str ()));
- mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h->getChannel());
- mgmtObject->set_detachedLifespan (getTimeout());
- agent->addObject (mgmtObject);
- }
- }
-}
-
-PreviewSessionState::~PreviewSessionState() {
- // Remove ID from active session list.
- if (factory)
- factory->erase(getId());
- if (mgmtObject.get () != 0)
- mgmtObject->resourceDestroy ();
-}
-
-PreviewSessionHandler* PreviewSessionState::getHandler() {
- return handler;
-}
-
-AMQP_ClientProxy& PreviewSessionState::getProxy() {
- assert(isAttached());
- return getHandler()->getProxy();
-}
-
-ConnectionState& PreviewSessionState::getConnection() {
- assert(isAttached());
- return getHandler()->getConnection();
-}
-
-bool PreviewSessionState::isLocal(const ConnectionToken* t) const
-{
- return isAttached() && &(handler->getConnection()) == t;
-}
-
-void PreviewSessionState::detach() {
- getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
- Mutex::ScopedLock l(lock);
- handler = 0; out.next = 0;
- if (mgmtObject.get() != 0)
- {
- mgmtObject->set_attached (0);
- }
-}
-
-void PreviewSessionState::attach(PreviewSessionHandler& h) {
- {
- Mutex::ScopedLock l(lock);
- handler = &h;
- out.next = &handler->out;
- if (mgmtObject.get() != 0)
- {
- mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h.getChannel());
- }
- }
- h.getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
-}
-
-void PreviewSessionState::activateOutput()
-{
- Mutex::ScopedLock l(lock);
- if (isAttached()) {
- getConnection().outputTasks.activateOutput();
- }
-}
- //This class could be used as the callback for queue notifications
- //if not attached, it can simply ignore the callback, else pass it
- //on to the connection
-
-ManagementObject::shared_ptr PreviewSessionState::GetManagementObject (void) const
-{
- return dynamic_pointer_cast<ManagementObject> (mgmtObject);
-}
-
-Manageable::status_t PreviewSessionState::ManagementMethod (uint32_t methodId,
- Args& /*args*/)
-{
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
-
- switch (methodId)
- {
- case management::Session::METHOD_DETACH :
- if (handler != 0)
- {
- handler->detach();
- }
- status = Manageable::STATUS_OK;
- break;
-
- case management::Session::METHOD_CLOSE :
- /*
- if (handler != 0)
- {
- handler->getConnection().closeChannel(handler->getChannel());
- }
- status = Manageable::STATUS_OK;
- break;
- */
-
- case management::Session::METHOD_SOLICITACK :
- case management::Session::METHOD_RESETLIFESPAN :
- status = Manageable::STATUS_NOT_IMPLEMENTED;
- break;
- }
-
- return status;
-}
-
-
-}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewSessionState.h b/cpp/src/qpid/broker/PreviewSessionState.h
deleted file mode 100644
index 1aecb12e72..0000000000
--- a/cpp/src/qpid/broker/PreviewSessionState.h
+++ /dev/null
@@ -1,125 +0,0 @@
-#ifndef QPID_BROKER_PREVIEWSESSION_H
-#define QPID_BROKER_PREVIEWSESSION_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/Uuid.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/SessionState.h"
-#include "qpid/framing/ProtocolVersion.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Time.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/management/Session.h"
-#include "SessionContext.h"
-
-#include <boost/noncopyable.hpp>
-#include <boost/scoped_ptr.hpp>
-
-#include <set>
-#include <vector>
-#include <ostream>
-
-namespace qpid {
-
-namespace framing {
-class AMQP_ClientProxy;
-}
-
-namespace broker {
-
-class SemanticHandler;
-class PreviewSessionHandler;
-class PreviewSessionManager;
-class Broker;
-class ConnectionState;
-
-/**
- * Broker-side session state includes sessions handler chains, which may
- * themselves have state.
- */
-class PreviewSessionState : public framing::SessionState,
- public SessionContext,
- public framing::FrameHandler::Chains,
- public management::Manageable
-{
- public:
- ~PreviewSessionState();
- bool isAttached() const { return handler; }
-
- void detach();
- void attach(PreviewSessionHandler& handler);
-
-
- PreviewSessionHandler* getHandler();
-
- /** @pre isAttached() */
- framing::AMQP_ClientProxy& getProxy();
-
- /** @pre isAttached() */
- ConnectionState& getConnection();
- bool isLocal(const ConnectionToken* t) const;
-
- uint32_t getTimeout() const { return timeout; }
- Broker& getBroker() { return broker; }
- framing::ProtocolVersion getVersion() const { return version; }
-
- /** OutputControl **/
- void activateOutput();
-
- // Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObject (void) const;
- management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args);
-
- // Normally SessionManager creates sessions.
- PreviewSessionState(PreviewSessionManager*,
- PreviewSessionHandler* out,
- uint32_t timeout,
- uint32_t ackInterval);
-
-
- private:
- PreviewSessionManager* factory;
- PreviewSessionHandler* handler;
- framing::Uuid id;
- uint32_t timeout;
- sys::AbsTime expiry; // Used by SessionManager.
- Broker& broker;
- framing::ProtocolVersion version;
- sys::Mutex lock;
- boost::scoped_ptr<SemanticHandler> semanticHandler;
- management::Session::shared_ptr mgmtObject;
-
- friend class PreviewSessionManager;
-};
-
-
-inline std::ostream& operator<<(std::ostream& out, const PreviewSessionState& session) {
- return out << session.getId();
-}
-
-}} // namespace qpid::broker
-
-
-
-#endif /*!QPID_BROKER_SESSION_H*/
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 628d969c69..e799cde2b9 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -340,11 +340,11 @@ bool Queue::seek(QueuedMessage& msg, Consumer& c) {
void Queue::consume(Consumer& c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
- throw AccessRefusedException(
+ throw ResourceLockedException(
QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
} else if(requestExclusive) {
if(consumerCount) {
- throw AccessRefusedException(
+ throw ResourceLockedException(
QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
} else {
exclusive = c.getSession();
@@ -596,7 +596,6 @@ void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
queue->unbind(broker.getExchanges(), queue);
queue->destroy();
}
-
}
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp
index e2fcd493db..95e529f47e 100644
--- a/cpp/src/qpid/broker/QueueBindings.cpp
+++ b/cpp/src/qpid/broker/QueueBindings.cpp
@@ -20,8 +20,10 @@
*/
#include "QueueBindings.h"
#include "ExchangeRegistry.h"
+#include "qpid/framing/reply_exceptions.h"
using qpid::framing::FieldTable;
+using qpid::framing::NotFoundException;
using std::string;
using namespace qpid::broker;
@@ -35,7 +37,7 @@ void QueueBindings::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr queue)
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
try {
exchanges.get(i->exchange)->unbind(queue, i->key, &(i->args));
- } catch (ChannelException&) {
+ } catch (const NotFoundException&) {
}
}
}
diff --git a/cpp/src/qpid/broker/SaslAuthenticator.cpp b/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 9ca4069a12..56718502f1 100644
--- a/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -23,6 +23,7 @@
#include "Connection.h"
#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
#if HAVE_SASL
#include <sasl/sasl.h>
@@ -37,7 +38,7 @@ namespace broker {
class NullAuthenticator : public SaslAuthenticator
{
Connection& connection;
- framing::AMQP_ClientProxy::Connection010 client;
+ framing::AMQP_ClientProxy::Connection client;
public:
NullAuthenticator(Connection& connection);
~NullAuthenticator();
@@ -52,7 +53,7 @@ class CyrusAuthenticator : public SaslAuthenticator
{
sasl_conn_t *sasl_conn;
Connection& connection;
- framing::AMQP_ClientProxy::Connection010 client;
+ framing::AMQP_ClientProxy::Connection client;
void processAuthenticationStep(int code, const char *challenge, unsigned int challenge_len);
@@ -117,7 +118,7 @@ void CyrusAuthenticator::init()
// TODO: Change this to an exception signaling
// server error, when one is available
- throw CommandInvalidException("Unable to perform authentication");
+ throw ConnectionForcedException("Unable to perform authentication");
}
}
@@ -146,7 +147,7 @@ void CyrusAuthenticator::getMechanisms(Array& mechanisms)
// TODO: Change this to an exception signaling
// server error, when one is available
- throw CommandInvalidException("Mechanism listing failed");
+ throw ConnectionForcedException("Mechanism listing failed");
} else {
string mechanism;
unsigned int start;
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
deleted file mode 100644
index 411e0ce3c0..0000000000
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "SemanticHandler.h"
-#include "SemanticState.h"
-#include "SessionContext.h"
-#include "BrokerAdapter.h"
-#include "MessageDelivery.h"
-#include "qpid/framing/ExecutionXCompleteBody.h"
-#include "qpid/framing/ServerInvoker.h"
-#include "qpid/log/Statement.h"
-
-#include <boost/format.hpp>
-#include <boost/bind.hpp>
-
-using boost::intrusive_ptr;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-SemanticHandler::SemanticHandler(SessionContext& s) :
- state(*this,s), session(s),
- msgBuilder(&s.getConnection().getBroker().getStore(), s.getConnection().getBroker().getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2))
- {}
-
-void SemanticHandler::handle(framing::AMQFrame& frame)
-{
- //TODO: assembly for method and headers
-
- //have potentially three separate tracks at this point:
- //
- // (1) execution controls
- // (2) commands
- // (3) data i.e. content-bearing commands
- //
- //framesets on each can be interleaved. framesets on the latter
- //two share a command-id sequence. controls on the first track are
- //used to communicate details about that command-id sequence.
- //
- //need to decide what to do if a frame on the command track
- //arrives while a frameset on the data track is still
- //open. execute it (i.e. out-of order execution with respect to
- //the command id sequence) or queue it up?
-
- TrackId track = getTrack(frame);//will be replaced by field in 0-10 frame header
-
- switch(track) {
- case EXECUTION_CONTROL_TRACK:
- handleL3(frame.getMethod());
- break;
- case MODEL_COMMAND_TRACK:
- handleCommand(frame.getMethod());
- break;
- case MODEL_CONTENT_TRACK:
- handleContent(frame);
- break;
- }
-}
-
-void SemanticHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
-{
- //record:
- SequenceNumber mark(cumulative);
- if (outgoing.lwm < mark) {
- outgoing.lwm = mark;
- //ack messages:
- state.ackCumulative(mark.getValue());
- }
- range.processRanges(ackOp);
-}
-
-void SemanticHandler::sendCompletion()
-{
- SequenceNumber mark = incoming.getMark();
- SequenceNumberSet range = incoming.getRange();
- session.getProxy().getExecution().complete(mark.getValue(), range);
-}
-
-void SemanticHandler::flush()
-{
- incoming.flush();
- sendCompletion();
-}
-void SemanticHandler::sync()
-{
- incoming.sync();
- sendCompletion();
-}
-
-void SemanticHandler::noop()
-{
- incoming.noop();
-}
-
-void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
-{
- //never actually sent by client at present
-}
-
-void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
-{
- SequenceNumber id = incoming.next();
- BrokerAdapter adapter(state);
- Invoker::Result invoker = invoke(adapter, *method);
- incoming.complete(id);
-
- if (!invoker.wasHandled()) {
- throw NotImplementedException("Not implemented");
- } else if (invoker.hasResult()) {
- session.getProxy().getExecution().result(id.getValue(), invoker.getResult());
- }
- if (method->isSync()) {
- incoming.sync(id);
- sendCompletion();
- }
- //TODO: if window gets too large send unsolicited completion
-}
-
-void SemanticHandler::handleL3(framing::AMQMethodBody* method)
-{
- if (!invoke(*this, *method))
- throw NotImplementedException("Not implemented");
-}
-
-void SemanticHandler::handleContent(AMQFrame& frame)
-{
- intrusive_ptr<Message> msg(msgBuilder.getMessage());
- if (!msg) {//start of frameset will be indicated by frame flags
- msgBuilder.start(incoming.next());
- msg = msgBuilder.getMessage();
- }
- msgBuilder.handle(frame);
- if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
- msg->setPublisher(&session.getConnection());
- state.handle(msg);
- msgBuilder.end();
- incoming.track(msg);
- if (msg->getFrames().getMethod()->isSync()) {
- incoming.sync(msg->getCommandId());
- sendCompletion();
- }
- }
-}
-
-DeliveryId SemanticHandler::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
-{
- uint32_t maxFrameSize = session.getConnection().getFrameMax();
- MessageDelivery::deliver(msg, session.getProxy().getHandler(), ++outgoing.hwm, token, maxFrameSize);
- return outgoing.hwm;
-}
-
-SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
-{
- //will be replaced by field in 0-10 frame header
- uint8_t type = frame.getBody()->type();
- uint16_t classId;
- switch(type) {
- case METHOD_BODY:
- if (frame.castBody<AMQMethodBody>()->isContentBearing()) {
- return MODEL_CONTENT_TRACK;
- }
-
- classId = frame.castBody<AMQMethodBody>()->amqpClassId();
- switch (classId) {
- case ExecutionXCompleteBody::CLASS_ID:
- return EXECUTION_CONTROL_TRACK;
- }
-
- return MODEL_COMMAND_TRACK;
- case HEADER_BODY:
- case CONTENT_BODY:
- return MODEL_CONTENT_TRACK;
- }
- throw Exception("Could not determine track");
-}
-
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
deleted file mode 100644
index 893a0cbded..0000000000
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _SemanticHandler_
-#define _SemanticHandler_
-
-#include <memory>
-#include "BrokerAdapter.h"
-#include "DeliveryAdapter.h"
-#include "MessageBuilder.h"
-#include "IncomingExecutionContext.h"
-#include "HandlerImpl.h"
-
-#include "qpid/framing/amqp_types.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/SequenceNumber.h"
-
-#include <boost/function.hpp>
-
-namespace qpid {
-
-namespace framing {
-class AMQMethodBody;
-class AMQHeaderBody;
-class AMQContentBody;
-class AMQHeaderBody;
-}
-
-namespace broker {
-
-class SessionContext;
-
-class SemanticHandler : public DeliveryAdapter,
- public framing::FrameHandler,
- public framing::AMQP_ServerOperations::ExecutionHandler
-
-{
- typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
-
- SemanticState state;
- SessionContext& session;
- // TODO aconway 2007-09-20: Why are these on the handler rather than the
- // state?
- IncomingExecutionContext incoming;
- framing::Window outgoing;
- MessageBuilder msgBuilder;
- RangedOperation ackOp;
-
- enum TrackId {EXECUTION_CONTROL_TRACK, MODEL_COMMAND_TRACK, MODEL_CONTENT_TRACK};
- TrackId getTrack(const framing::AMQFrame& frame);
-
- void handleL3(framing::AMQMethodBody* method);
- void handleCommand(framing::AMQMethodBody* method);
- void handleContent(framing::AMQFrame& frame);
-
- void sendCompletion();
-
- //delivery adapter methods:
- DeliveryId deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
-
- framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
- //Connection& getConnection() { return session.getConnection(); }
- Broker& getBroker() { return session.getConnection().getBroker(); }
-
-public:
- SemanticHandler(SessionContext& session);
-
- //frame handler:
- void handle(framing::AMQFrame& frame);
-
- //execution class method handlers:
- void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
- void flush();
- void noop();
- void result(uint32_t command, const std::string& data);
- void sync();
-
-
- SemanticState& getSemanticState() { return state; }
-};
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index ab6b82a232..e73540891c 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -19,22 +19,19 @@
*
*/
-#include "SessionContext.h"
-#include "BrokerAdapter.h"
-#include "Queue.h"
+#include "SessionState.h"
#include "Connection.h"
#include "DeliverableMessage.h"
#include "DtxAck.h"
#include "DtxTimeout.h"
#include "Message.h"
-#include "SemanticHandler.h"
-#include "SessionHandler.h"
+#include "Queue.h"
+#include "SessionContext.h"
#include "TxAccept.h"
#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/MessageXTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -357,9 +354,7 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
//TODO: the following should be hidden behind message (using MessageAdapter or similar)
- if (msg->isA<MessageXTransferBody>()) {
- msg->getProperties<PreviewDeliveryProperties>()->setExchange(exchangeName);
- } else if (msg->isA<MessageTransferBody>()) {
+ if (msg->isA<MessageTransferBody>()) {
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index c92b9bb945..e284451d14 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -113,7 +113,7 @@ ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& nam
try {
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
- } catch (const ChannelException& e) {
+ } catch (const NotFoundException& e) {
return ExchangeQueryResult("", false, true, FieldTable());
}
}
@@ -163,7 +163,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
Exchange::shared_ptr exchange;
try {
exchange = getBroker().getExchanges().get(exchangeName);
- } catch (const ChannelException&) {}
+ } catch (const NotFoundException&) {}
Queue::shared_ptr queue;
if (!queueName.empty()) {
@@ -192,7 +192,11 @@ SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : Han
SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
{
- destroyExclusiveQueues();
+ try {
+ destroyExclusiveQueues();
+ } catch (std::exception& e) {
+ QPID_LOG(error, e.what());
+ }
}
void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
@@ -370,7 +374,7 @@ void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, u_
state.addByteCredit(destination, value);
} else {
//unknown
- throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
+ throw InvalidArgumentException(QPID_MSG("Invalid value for unit " << unit));
}
}
@@ -384,7 +388,7 @@ void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destinat
//window
state.setWindowMode(destination);
} else{
- throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));
+ throw InvalidArgumentException(QPID_MSG("Invalid value for mode " << mode));
}
}
@@ -419,19 +423,26 @@ framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const
return MessageAcquireResult(acquisitions);
}
+framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const std::string& /*destination*/,
+ const std::string& /*resumeId*/)
+{
+ throw NotImplementedException("resuming transfers not yet supported");
+}
+
+
void SessionAdapter::ExecutionHandlerImpl::sync()
{
//TODO
}
-void SessionAdapter::ExecutionHandlerImpl::result(uint32_t /*commandId*/, const string& /*value*/)
+void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/)
{
//TODO
}
void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
- uint32_t /*commandId*/,
+ const SequenceNumber& /*commandId*/,
uint8_t /*classCode*/,
uint8_t /*commandCode*/,
uint8_t /*fieldIndex*/,
@@ -470,7 +481,7 @@ void SessionAdapter::DtxHandlerImpl::select()
state.selectDtx();
}
-DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
+XaResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
bool fail,
bool suspend)
{
@@ -480,7 +491,7 @@ DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
if (suspend) {
throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
- return DtxEndResult(XA_RBROLLBACK);
+ return XaResult(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -488,14 +499,14 @@ DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
} else {
state.endDtx(convert(xid), false);
}
- return DtxEndResult(XA_OK);
+ return XaResult(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- return DtxEndResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
-DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
+XaResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
bool join,
bool resume)
{
@@ -508,41 +519,41 @@ DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
} else {
state.startDtx(convert(xid), getBroker().getDtxManager(), join);
}
- return DtxStartResult(XA_OK);
+ return XaResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return DtxStartResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
-DtxPrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
+XaResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
{
try {
bool ok = getBroker().getDtxManager().prepare(convert(xid));
- return DtxPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ return XaResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return DtxPrepareResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
-DtxCommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
+XaResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
bool onePhase)
{
try {
bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
- return DtxCommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ return XaResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return DtxCommitResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
-DtxRollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
+XaResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
{
try {
getBroker().getDtxManager().rollback(convert(xid));
- return DtxRollbackResult(XA_OK);
+ return XaResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return DtxRollbackResult(XA_RBTIMEOUT);
+ return XaResult(XA_RBTIMEOUT);
}
}
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index b5bf44ceba..4eaaf13f8d 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -54,35 +54,19 @@ class Queue;
public:
SessionAdapter(SemanticState& session);
-
framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
- Message010Handler* getMessage010Handler(){ return &messageImpl; }
- Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; }
- Queue010Handler* getQueue010Handler(){ return &queueImpl; }
- Execution010Handler* getExecution010Handler(){ return &executionImpl; }
- Tx010Handler* getTx010Handler(){ return &txImpl; }
- Dtx010Handler* getDtx010Handler(){ return &dtxImpl; }
-
- BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); }
- ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- BindingHandler* getBindingHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- QueueHandler* getQueueHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- TxHandler* getTxHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- MessageHandler* getMessageHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- DtxCoordinationHandler* getDtxCoordinationHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- DtxDemarcationHandler* getDtxDemarcationHandler(){ throw framing::NotImplementedException("Class not implemented"); }
- AccessHandler* getAccessHandler() { throw framing::NotImplementedException("Class not implemented"); }
- FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); }
- StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); }
- TunnelHandler* getTunnelHandler() { throw framing::NotImplementedException("Class not implemented"); }
- ExecutionHandler* getExecutionHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ MessageHandler* getMessageHandler(){ return &messageImpl; }
+ ExchangeHandler* getExchangeHandler(){ return &exchangeImpl; }
+ QueueHandler* getQueueHandler(){ return &queueImpl; }
+ ExecutionHandler* getExecutionHandler(){ return &executionImpl; }
+ TxHandler* getTxHandler(){ return &txImpl; }
+ DtxHandler* getDtxHandler(){ return &dtxImpl; }
+
ConnectionHandler* getConnectionHandler() { throw framing::NotImplementedException("Class not implemented"); }
SessionHandler* getSessionHandler() { throw framing::NotImplementedException("Class not implemented"); }
- Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); }
- Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); }
-
- void destroyExclusiveQueues() { queueImpl.destroyExclusiveQueues(); }
+ FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); }
private:
//common base for utility methods etc that are specific to this adapter
@@ -95,7 +79,7 @@ class Queue;
class ExchangeHandlerImpl :
- public Exchange010Handler,
+ public ExchangeHandler,
public HandlerHelper
{
public:
@@ -124,7 +108,7 @@ class Queue;
shared_ptr<Exchange> alternate);
};
- class QueueHandlerImpl : public Queue010Handler,
+ class QueueHandlerImpl : public QueueHandler,
public HandlerHelper, public OwnershipToken
{
Broker& broker;
@@ -149,7 +133,7 @@ class Queue;
};
class MessageHandlerImpl :
- public Message010Handler,
+ public MessageHandler,
public HandlerHelper
{
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
@@ -196,18 +180,21 @@ class Queue;
void flush(const string& destination);
void stop(const string& destination);
+
+ framing::MessageResumeResult resume(const std::string& destination,
+ const std::string& resumeId);
};
- class ExecutionHandlerImpl : public Execution010Handler, public HandlerHelper
+ class ExecutionHandlerImpl : public ExecutionHandler, public HandlerHelper
{
public:
ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void sync();
- void result(uint32_t commandId, const string& value);
+ void result(const framing::SequenceNumber& commandId, const string& value);
void exception(uint16_t errorCode,
- uint32_t commandId,
+ const framing::SequenceNumber& commandId,
uint8_t classCode,
uint8_t commandCode,
uint8_t fieldIndex,
@@ -216,7 +203,7 @@ class Queue;
};
- class TxHandlerImpl : public Tx010Handler, public HandlerHelper
+ class TxHandlerImpl : public TxHandler, public HandlerHelper
{
public:
TxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
@@ -226,7 +213,7 @@ class Queue;
void rollback();
};
- class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper, private framing::StructHelper
+ class DtxHandlerImpl : public DtxHandler, public HandlerHelper, private framing::StructHelper
{
std::string convert(const framing::Xid& xid);
@@ -235,26 +222,26 @@ class Queue;
void select();
- framing::DtxStartResult start(const framing::Xid& xid,
+ framing::XaResult start(const framing::Xid& xid,
bool join,
bool resume);
- framing::DtxEndResult end(const framing::Xid& xid,
+ framing::XaResult end(const framing::Xid& xid,
bool fail,
bool suspend);
- framing::DtxCommitResult commit(const framing::Xid& xid,
+ framing::XaResult commit(const framing::Xid& xid,
bool onePhase);
void forget(const framing::Xid& xid);
framing::DtxGetTimeoutResult getTimeout(const framing::Xid& xid);
- framing::DtxPrepareResult prepare(const framing::Xid& xid);
+ framing::XaResult prepare(const framing::Xid& xid);
framing::DtxRecoverResult recover();
- framing::DtxRollbackResult rollback(const framing::Xid& xid);
+ framing::XaResult rollback(const framing::Xid& xid);
void setTimeout(const framing::Xid& xid, uint32_t timeout);
};
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index d5caf789c0..f5fa22060f 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -42,7 +42,8 @@ SessionHandler::SessionHandler(Connection& c, ChannelId ch)
connection(c), channel(ch, &c.getOutput()),
proxy(out), // Via my own handleOut() for L2 data.
peerSession(channel), // Direct to channel for L2 commands.
- ignoring(false) {}
+ ignoring(false)
+{}
SessionHandler::~SessionHandler() {}
@@ -52,33 +53,30 @@ MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
void SessionHandler::handleIn(AMQFrame& f) {
- // Note on channel states: a channel is open if session != 0. A
- // channel that is closed (session == 0) can be in the "ignoring"
- // state. This is a temporary state after we have sent a channel
- // exception, where extra frames might arrive that should be
- // ignored.
- //
+ // Note on channel states: a channel is attached if session != 0
AMQMethodBody* m = f.getBody()->getMethod();
try {
- if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
+ if (ignoring && !(m && m->isA<SessionDetachedBody>())) {
+ return;
+ }
+ if (m && isValid(m) && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
+ //frame was a valid session control and has been handled
return;
} else if (session.get()) {
+ //we are attached and frame was not a session control so it is for upper layers
session->handle(f);
- } else if (!ignoring) {
- throw ConnectionException(501, QPID_MSG("Channel " << channel.get() << " is not attached"));
+ } else {
+ throw NotAttachedException(QPID_MSG("Channel " << channel.get() << " is not attached"));
}
+ }catch(const ChannelException& e){
+ QPID_LOG(error, "Session detached due to: " << e.what());
+ peerSession.detached(name, e.code);
+ handleDetach();
+ connection.closeChannel(channel.get());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
- }catch(const SessionException& e){
- //execution.exception will have been sent already
- ignoring = true;
- //peerSession.requestTimeout(0);
- session->setTimeout(0);
- peerSession.detach(name);
- localSuspend();
}catch(const std::exception& e){
- connection.close(
- framing::INTERNAL_ERROR, e.what(), classId(m), methodId(m));
+ connection.close(501, e.what(), classId(m), methodId(m));
}
}
@@ -95,7 +93,7 @@ void SessionHandler::handleOut(AMQFrame& f) {
void SessionHandler::assertAttached(const char* method) const {
if (!session.get()) {
std::cout << "SessionHandler::assertAttached() failed for " << method << std::endl;
- throw ChannelErrorException(
+ throw NotAttachedException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
}
@@ -103,33 +101,23 @@ void SessionHandler::assertAttached(const char* method) const {
void SessionHandler::assertClosed(const char* method) const {
if (session.get())
- throw ChannelBusyException(
+ throw SessionBusyException(
QPID_MSG(method << " failed: channel " << channel.get()
<< " is already open."));
}
-void SessionHandler::localSuspend() {
- if (session.get() && session->isAttached()) {
- session->detach();
- connection.broker.getSessionManager().suspend(session);
- session.reset();
- }
-}
-
-
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
//new methods:
void SessionHandler::attach(const std::string& _name, bool /*force*/)
{
- //TODO: need to revise session manager to support resume as well
- assertClosed("attach");
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, 0));
name = _name;//TODO: this should be used in conjunction with
//userid for connection as sessions identity
- session.reset(state.release());
+
+ //TODO: need to revise session manager to support resume as well
+ assertClosed("attach");
+ session.reset(new SessionState(0, this, 0, 0));
peerSession.attached(name);
peerSession.commandPoint(session->nextOut, 0);
}
@@ -138,31 +126,46 @@ void SessionHandler::attached(const std::string& _name)
{
name = _name;//TODO: this should be used in conjunction with
//userid for connection as sessions identity
- std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0));
- session.reset(state.release());
+ session.reset(new SessionState(0, this, 0, 0));
peerSession.commandPoint(session->nextOut, 0);
}
void SessionHandler::detach(const std::string& name)
{
assertAttached("detach");
- localSuspend();
- peerSession.detached(name, 0);
+ peerSession.detached(name, session::NORMAL);
+ handleDetach();
assert(&connection.getChannel(channel.get()) == this);
connection.closeChannel(channel.get());
}
void SessionHandler::detached(const std::string& name, uint8_t code)
{
- ignoring=false;
- session->detach();
- session.reset();
+ ignoring = false;
+ handleDetach();
if (code) {
//no error
} else {
//error occured
QPID_LOG(warning, "Received session.closed: "<< name << " " << code);
}
+ connection.closeChannel(channel.get());
+}
+
+void SessionHandler::handleDetach()
+{
+ if (session.get()) {
+ session->detach();
+ session.reset();
+ }
+}
+
+void SessionHandler::requestDetach()
+{
+ //TODO: request timeout when python can handle it
+ //peerSession.requestTimeout(0);
+ ignoring = true;
+ peerSession.detach(name);
}
void SessionHandler::requestTimeout(uint32_t t)
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index c299c465cf..47c534441a 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -46,7 +46,7 @@ class SessionState;
* receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
-class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
+class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
@@ -66,9 +66,8 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
- // Called by closing connection.
- void localSuspend();
- void detach() { localSuspend(); }
+ void requestDetach();
+ void handleDetach();
void sendCompletion();
protected:
@@ -93,9 +92,6 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
void flush(bool expected, bool confirmed, bool completed);
void gap(const framing::SequenceSet& commands);
- //hacks for old generator:
- void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); }
-
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
void assertClosed(const char* method) const;
@@ -105,7 +101,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
Connection& connection;
framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Session010 peerSession;
+ framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
std::string name;//TODO: this should be part of the session state and replace the id
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index b96d7b5e3f..50938de8ac 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -22,7 +22,6 @@
#include "Broker.h"
#include "ConnectionState.h"
#include "MessageDelivery.h"
-#include "SemanticHandler.h"
#include "SessionManager.h"
#include "SessionHandler.h"
#include "qpid/framing/AMQContentBody.h"
@@ -30,6 +29,7 @@
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -50,6 +50,7 @@ SessionState::SessionState(
factory(f), handler(h), id(true), timeout(timeout_),
broker(h->getConnection().broker),
version(h->getConnection().getVersion()),
+ ignoring(false),
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
@@ -154,7 +155,7 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
case management::Session::METHOD_DETACH :
if (handler != 0)
{
- handler->detach();
+ handler->requestDetach();
}
status = Manageable::STATUS_OK;
break;
@@ -188,7 +189,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
nextOut++;//execution result is now a command, so the counter must be incremented
- getProxy().getExecution010().result(id, invocation.getResult());
+ getProxy().getExecution().result(id, invocation.getResult());
}
if (method->isSync()) {
incomplete.process(enqueuedOp, true);
@@ -242,12 +243,13 @@ void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
completed.add(msg->getCommandId());
if (msg->requiresAccept()) {
nextOut++;//accept is a command, so the counter must be incremented
- getProxy().getMessage010().accept(SequenceSet(msg->getCommandId()));
+ getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
}
}
void SessionState::handle(AMQFrame& frame)
{
+ if (ignoring) return;
received(frame);
SequenceNumber commandId;
@@ -271,11 +273,13 @@ void SessionState::handle(AMQFrame& frame)
AMQMethodBody* m = frame.getMethod();
if (m) {
- getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
+ getProxy().getExecution().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
} else {
- getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
+ getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- throw e;
+ timeout = 0;
+ ignoring = true;
+ handler->requestDetach();
}
}
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 4fc2ae4cc5..2ec68260a1 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -130,6 +130,7 @@ class SessionState : public framing::SessionState,
Broker& broker;
framing::ProtocolVersion version;
sys::Mutex lock;
+ bool ignoring;
SemanticState semanticState;
SessionAdapter adapter;
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index f32b5e2614..3bcba8983c 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -32,6 +32,7 @@
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace std;
using namespace boost;
@@ -75,7 +76,7 @@ void Channel::open(const Session& s)
{
Mutex::ScopedLock l(stopLock);
if (isOpen())
- throw ChannelBusyException();
+ throw SessionBusyException();
active = true;
session = s;
if(isTransactional()) {
@@ -146,7 +147,7 @@ void Channel::consume(
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if (i != consumers.end())
- throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag ));
+ throw PreconditionFailedException(QPID_MSG("Consumer already exists with tag " << tag ));
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 83cc357ded..df27942008 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -25,6 +25,7 @@
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/ClientInvoker.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::client;
using namespace qpid::framing;
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index 2ce36d6991..d7ab97ce31 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -55,9 +55,9 @@ class ConnectionHandler : private StateManager,
public ConnectionProperties,
public ChainableFrameHandler,
public framing::InputHandler,
- private framing::AMQP_ClientOperations::Connection010Handler
+ private framing::AMQP_ClientOperations::ConnectionHandler
{
- typedef framing::AMQP_ClientOperations::Connection010Handler ConnectionOperations;
+ typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations;
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
std::set<int> ESTABLISHED;
@@ -70,7 +70,7 @@ class ConnectionHandler : private StateManager,
};
Adapter outHandler;
- framing::AMQP_ServerProxy::Connection010 proxy;
+ framing::AMQP_ServerProxy::Connection proxy;
uint16_t errorCode;
std::string errorText;
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index d1fd66ff26..ce95e43f58 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -32,6 +32,7 @@ using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
+using namespace qpid::framing::connection;//for connection error codes
ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
: connector(c), isClosed(false), isClosing(false)
@@ -39,7 +40,7 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this,
- REPLY_SUCCESS, std::string());
+ NORMAL, std::string());
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
@@ -57,7 +58,7 @@ void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
{
Mutex::ScopedLock l(lock);
boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
- if (s.lock()) throw ChannelBusyException();
+ if (s.lock()) throw SessionBusyException();
s = session;
}
@@ -74,7 +75,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
s = sessions[frame.getChannel()].lock();
}
if (!s)
- throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel()));
+ throw NotAttachedException(QPID_MSG("Invalid channel: " << frame.getChannel()));
s->in(frame);
}
@@ -113,7 +114,7 @@ void ConnectionImpl::close()
Mutex::ScopedUnlock u(lock);
handler.close();
}
- closed(REPLY_SUCCESS, "Closed by client");
+ closed(NORMAL, "Closed by client");
}
// Set closed flags and erase the sessions map, but keep the contents
@@ -149,7 +150,7 @@ void ConnectionImpl::shutdown()
handler.fail(CONN_CLOSED);
Mutex::ScopedUnlock u(lock);
std::for_each(save.begin(), save.end(),
- boost::bind(&SessionImpl::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED));
+ boost::bind(&SessionImpl::connectionBroke, _1, CONNECTION_FORCED, CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {
diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h
index 5d91f289e2..fc4175ef22 100644
--- a/cpp/src/qpid/client/Session.h
+++ b/cpp/src/qpid/client/Session.h
@@ -21,7 +21,7 @@
* under the License.
*
*/
-#include "qpid/client/Session_99_0.h"
+#include "qpid/client/Session_0_10.h"
namespace qpid {
namespace client {
@@ -31,7 +31,7 @@ namespace client {
*
* \ingroup clientapi
*/
-typedef Session_99_0 Session;
+typedef Session_0_10 Session;
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index 4f3869319c..571d54df0c 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -30,16 +30,18 @@
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
#include "qpid/framing/SequenceSet.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-namespace { const std::string OK="ok"; }
+namespace { const std::string EMPTY; }
namespace qpid {
namespace client {
using namespace qpid::framing;
+using namespace qpid::framing::session;//for detach codes
typedef sys::Monitor::ScopedLock Lock;
typedef sys::Monitor::ScopedUnlock UnLock;
@@ -47,8 +49,9 @@ typedef sys::Monitor::ScopedUnlock UnLock;
SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
uint16_t ch, uint64_t _maxFrameSize)
- : code(REPLY_SUCCESS),
- text(OK),
+ : error(OK),
+ code(NORMAL),
+ text(EMPTY),
state(INACTIVE),
syncMode(false),
detachedLifetime(0),
@@ -250,6 +253,7 @@ void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool
void SessionImpl::connectionClosed(uint16_t _code, const std::string& _text)
{
Lock l(state);
+ error = CONNECTION_CLOSE;
code = _code;
text = _text;
setState(DETACHED);
@@ -379,6 +383,7 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread
//TODO: proper 0-10 exception handling
QPID_LOG(error, "Session exception:" << e.what());
Lock l(state);
+ error = EXCEPTION;
code = e.code;
text = e.what();
}
@@ -443,6 +448,7 @@ void SessionImpl::detached(const std::string& _name, uint8_t _code)
//TODO: make sure this works with execution.exception - don't
//want to overwrite the code from that
QPID_LOG(error, "Session detached by peer: " << name << " " << code);
+ error = SESSION_DETACH;
code = _code;
text = "Session detached by peer";
}
@@ -545,14 +551,14 @@ void SessionImpl::gap(const framing::SequenceSet& /*commands*/)
void SessionImpl::sync() {}
-void SessionImpl::result(uint32_t commandId, const std::string& value)
+void SessionImpl::result(const framing::SequenceNumber& commandId, const std::string& value)
{
Lock l(state);
results.received(commandId, value);
}
void SessionImpl::exception(uint16_t errorCode,
- uint32_t commandId,
+ const framing::SequenceNumber& commandId,
uint8_t classCode,
uint8_t commandCode,
uint8_t /*fieldIndex*/,
@@ -563,6 +569,7 @@ void SessionImpl::exception(uint16_t errorCode,
<< " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
Lock l(state);
+ error = EXCEPTION;
code = errorCode;
text = description;
if (detachedLifetime) {
@@ -589,8 +596,11 @@ inline void SessionImpl::waitFor(State s) //call with lock held
void SessionImpl::check() const //call with lock held.
{
- if (code != REPLY_SUCCESS) {
- throwReplyException(code, text);
+ switch (error) {
+ case OK: break;
+ case CONNECTION_CLOSE: throw ConnectionException(code, text);
+ case SESSION_DETACH: throw ChannelException(code, text);
+ case EXCEPTION: throwExecutionException(code, text);
}
}
@@ -598,7 +608,7 @@ void SessionImpl::checkOpen() const //call with lock held.
{
check();
if (state != ATTACHED) {
- throwReplyException(0, "Session isn't attached");
+ throw NotAttachedException("Session isn't attached");
}
}
diff --git a/cpp/src/qpid/client/SessionImpl.h b/cpp/src/qpid/client/SessionImpl.h
index 86820dbb92..3b2e80fefd 100644
--- a/cpp/src/qpid/client/SessionImpl.h
+++ b/cpp/src/qpid/client/SessionImpl.h
@@ -54,8 +54,8 @@ class ConnectionImpl;
class SessionImpl : public framing::FrameHandler::InOutHandler,
public Execution,
- private framing::AMQP_ClientOperations::Session010Handler,
- private framing::AMQP_ClientOperations::Execution010Handler
+ private framing::AMQP_ClientOperations::SessionHandler,
+ private framing::AMQP_ClientOperations::ExecutionHandler
{
public:
SessionImpl(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
@@ -95,6 +95,12 @@ public:
void connectionBroke(uint16_t code, const std::string& text);
private:
+ enum ErrorType {
+ OK,
+ CONNECTION_CLOSE,
+ SESSION_DETACH,
+ EXCEPTION
+ };
enum State {
INACTIVE,
ATTACHING,
@@ -102,8 +108,8 @@ private:
DETACHING,
DETACHED
};
- typedef framing::AMQP_ClientOperations::Session010Handler SessionHandler;
- typedef framing::AMQP_ClientOperations::Execution010Handler ExecutionHandler;
+ typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
+ typedef framing::AMQP_ClientOperations::ExecutionHandler ExecutionHandler;
typedef sys::StateMonitor<State, DETACHED> StateMonitor;
typedef StateMonitor::Set States;
@@ -145,19 +151,16 @@ private:
// Note: Following methods are called by network thread in
// response to execution commands from the broker
void sync();
- void result(uint32_t commandId, const std::string& value);
+ void result(const framing::SequenceNumber& commandId, const std::string& value);
void exception(uint16_t errorCode,
- uint32_t commandId,
+ const framing::SequenceNumber& commandId,
uint8_t classCode,
uint8_t commandCode,
uint8_t fieldIndex,
const std::string& description,
const framing::FieldTable& errorInfo);
-
- //hack for old generator:
- void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); }
-
+ ErrorType error;
int code; // Error code
std::string text; // Error text
mutable StateMonitor state;
@@ -170,7 +173,7 @@ private:
shared_ptr<ConnectionImpl> connection;
framing::ChannelHandler channel;
- framing::AMQP_ServerProxy::Session010 proxy;
+ framing::AMQP_ServerProxy::Session proxy;
Results results;
Demux demux;
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 5152aa2e43..bca6c49c13 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -17,7 +17,7 @@
*/
#include "Cluster.h"
-#include "qpid/broker/PreviewSessionState.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -32,18 +32,18 @@ namespace cluster {
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-using broker::PreviewSessionState;
+using broker::SessionState;
namespace {
// Beginning of inbound chain: send to cluster.
struct ClusterSendHandler : public FrameHandler {
- PreviewSessionState& session;
+ SessionState& session;
Cluster& cluster;
bool busy;
Monitor lock;
- ClusterSendHandler(PreviewSessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+ ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
void handle(AMQFrame& f) {
Mutex::ScopedLock l(lock);
@@ -83,11 +83,11 @@ void insert(FrameHandler::Chain& c, FrameHandler* h) {
c.next = h;
}
-struct SessionObserver : public broker::PreviewSessionManager::Observer {
+struct SessionObserver : public broker::SessionManager::Observer {
Cluster& cluster;
SessionObserver(Cluster& c) : cluster(c) {}
- void opened(PreviewSessionState& s) {
+ void opened(SessionState& s) {
// FIXME aconway 2008-01-29: IList for memory management.
ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 1b0c1b1689..6cc8dd7f78 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -63,7 +63,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- boost::intrusive_ptr<broker::PreviewSessionManager::Observer> getObserver() { return observer; }
+ boost::intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -117,7 +117,7 @@ class Cluster : private sys::Runnable, private Cpg::Handler
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
- boost::intrusive_ptr<broker::PreviewSessionManager::Observer> observer;
+ boost::intrusive_ptr<broker::SessionManager::Observer> observer;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index eeb658600d..3ebb61feb5 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -73,7 +73,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t flags = buffer.getOctet();
uint8_t framing_version = (flags & 0xc0) >> 6;
if (framing_version != 0)
- throw SyntaxErrorException(QPID_MSG("Framing version unsupported"));
+ throw FramingErrorException(QPID_MSG("Framing version unsupported"));
bof = flags & 0x08;
eof = flags & 0x04;
bos = flags & 0x02;
@@ -81,7 +81,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t type = buffer.getOctet();
uint16_t frame_size = buffer.getShort();
if (frame_size < frameOverhead()-1)
- throw SyntaxErrorException(QPID_MSG("Frame size too small"));
+ throw FramingErrorException(QPID_MSG("Frame size too small"));
uint8_t reserved1 = buffer.getOctet();
uint8_t field1 = buffer.getOctet();
subchannel = field1 & 0x0f;
@@ -92,7 +92,7 @@ bool AMQFrame::decode(Buffer& buffer)
// TODO: should we check reserved2 against zero as well? - the
// spec isn't clear
if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0)
- throw SyntaxErrorException(QPID_MSG("Reserved bits not zero"));
+ throw FramingErrorException(QPID_MSG("Reserved bits not zero"));
// TODO: should no longer care about body size and only pass up
// B,E,b,e flags
@@ -105,7 +105,7 @@ bool AMQFrame::decode(Buffer& buffer)
body->decode(type,buffer, body_size);
uint8_t end = buffer.getOctet();
if (end != 0xCE)
- throw SyntaxErrorException(QPID_MSG("Frame end not found"));
+ throw FramingErrorException(QPID_MSG("Frame end not found"));
return true;
}
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h
index 2064468785..c69a768291 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -26,8 +26,6 @@
#include "Buffer.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageProperties.h"
-#include "qpid/framing/PreviewDeliveryProperties.h"
-#include "qpid/framing/PreviewMessageProperties.h"
#include <iostream>
#include <boost/optional.hpp>
@@ -77,10 +75,7 @@ class AMQHeaderBody : public AMQBody
};
// Could use boost::mpl::fold to construct a larger set.
- typedef PropSet< PropSet< PropSet<PropSet<Empty, DeliveryProperties>,
- MessageProperties>,
- PreviewDeliveryProperties>,
- PreviewMessageProperties> Properties;
+ typedef PropSet<PropSet<Empty, DeliveryProperties>, MessageProperties> Properties;
Properties properties;
diff --git a/cpp/src/qpid/framing/Array.cpp b/cpp/src/qpid/framing/Array.cpp
index 71281c7a52..f0b6331ff3 100644
--- a/cpp/src/qpid/framing/Array.cpp
+++ b/cpp/src/qpid/framing/Array.cpp
@@ -77,7 +77,7 @@ void Array::decode(Buffer& buffer){
uint32_t size = buffer.getLong();//size added only when array is a top-level type
uint32_t available = buffer.available();
if (available < size) {
- throw SyntaxErrorException(QPID_MSG("Not enough data for array, expected "
+ throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected "
<< size << " bytes but only " << available << " available"));
}
if (size) {
@@ -88,7 +88,7 @@ void Array::decode(Buffer& buffer){
dummy.setType(typeOctet);
available = buffer.available();
if (available < count * dummy.getData().size()) {
- throw SyntaxErrorException(QPID_MSG("Not enough data for array, expected "
+ throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected "
<< count << " items of " << dummy.getData().size()
<< " bytes each but only " << available << " bytes available"));
}
@@ -117,7 +117,7 @@ bool Array::operator==(const Array& x) const {
void Array::add(ValuePtr value)
{
if (typeOctet != value->getType()) {
- throw SyntaxErrorException(QPID_MSG("Wrong type of value, expected " << typeOctet));
+ throw IllegalArgumentException(QPID_MSG("Wrong type of value, expected " << typeOctet));
}
values.push_back(value);
}
diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp
index fb84be7cd6..ffbcf33a95 100644
--- a/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/cpp/src/qpid/framing/BodyHandler.cpp
@@ -48,7 +48,7 @@ void BodyHandler::handleBody(AMQBody* body) {
handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body));
break;
default:
- throw SyntaxErrorException(
+ throw FramingErrorException(
QPID_MSG("Invalid frame type " << body->type()));
}
}
diff --git a/cpp/src/qpid/framing/BodyHolder.cpp b/cpp/src/qpid/framing/BodyHolder.cpp
index de971b5b28..1b2f74de2c 100644
--- a/cpp/src/qpid/framing/BodyHolder.cpp
+++ b/cpp/src/qpid/framing/BodyHolder.cpp
@@ -59,7 +59,7 @@ void BodyHolder::decode(uint8_t type, Buffer& buffer, uint32_t size) {
case CONTENT_BODY: *this=in_place<AMQContentBody>(); break;
case HEARTBEAT_BODY: *this=in_place<AMQHeartbeatBody>(); break;
default:
- throw SyntaxErrorException(QPID_MSG("Invalid frame type " << type));
+ throw IllegalArgumentException(QPID_MSG("Invalid frame type " << type));
}
get()->decode(buffer, size);
}
diff --git a/cpp/src/qpid/framing/Buffer.cpp b/cpp/src/qpid/framing/Buffer.cpp
index 69168d462a..19c94ffd58 100644
--- a/cpp/src/qpid/framing/Buffer.cpp
+++ b/cpp/src/qpid/framing/Buffer.cpp
@@ -19,7 +19,6 @@
*
*/
#include "Buffer.h"
-#include "FramingContent.h"
#include "FieldTable.h"
#include <string.h>
#include <boost/format.hpp>
diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp
index ac2a0f286d..903c7ed100 100644
--- a/cpp/src/qpid/framing/FieldTable.cpp
+++ b/cpp/src/qpid/framing/FieldTable.cpp
@@ -133,7 +133,7 @@ void FieldTable::decode(Buffer& buffer){
uint32_t len = buffer.getLong();
uint32_t available = buffer.available();
if (available < len)
- throw SyntaxErrorException(QPID_MSG("Not enough data for field table."));
+ throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
uint32_t leftover = available - len;
while(buffer.available() > leftover){
std::string name;
diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp
index 8171a94ef2..681f20a793 100644
--- a/cpp/src/qpid/framing/FieldValue.cpp
+++ b/cpp/src/qpid/framing/FieldValue.cpp
@@ -79,7 +79,7 @@ void FieldValue::setType(uint8_t type)
data.reset(new FixedWidthValue<0>());
break;
default:
- throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet));
+ throw IllegalArgumentException(QPID_MSG("Unknown field table value type: " << (int)typeOctet));
}
}
diff --git a/cpp/src/qpid/framing/FramingContent.cpp b/cpp/src/qpid/framing/FramingContent.cpp
deleted file mode 100644
index cd134b0e89..0000000000
--- a/cpp/src/qpid/framing/FramingContent.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "Buffer.h"
-#include "FramingContent.h"
-#include "qpid/Exception.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace framing {
-
-Content::Content() : discriminator(0) {}
-
-Content::Content(uint8_t _discriminator, const string& _value): discriminator(_discriminator), value(_value) {
- validate();
-}
-
-void Content::validate() {
- if (discriminator == REFERENCE) {
- if(value.empty()) {
- throw InvalidArgumentException(
- QPID_MSG("Reference cannot be empty"));
- }
- }else if (discriminator != INLINE) {
- throw SyntaxErrorException(
- QPID_MSG("Invalid discriminator: " << discriminator));
- }
-}
-
-Content::~Content() {}
-
-void Content::encode(Buffer& buffer) const {
- buffer.putOctet(discriminator);
- buffer.putLongString(value);
-}
-
-void Content::decode(Buffer& buffer) {
- discriminator = buffer.getOctet();
- buffer.getLongString(value);
- validate();
-}
-
-size_t Content::size() const {
- return 1/*discriminator*/ + 4/*for recording size of long string*/ + value.size();
-}
-
-std::ostream& operator<<(std::ostream& out, const Content& content) {
- if (content.discriminator == REFERENCE) {
- out << "{REF:" << content.value << "}";
- } else if (content.discriminator == INLINE) {
- out << "{INLINE:" << content.value.size() << " bytes}";
- }
- return out;
-}
-
-}} // namespace framing::qpid
diff --git a/cpp/src/qpid/framing/FramingContent.h b/cpp/src/qpid/framing/FramingContent.h
deleted file mode 100644
index 36f5d641b2..0000000000
--- a/cpp/src/qpid/framing/FramingContent.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _framing_FramingContent_h
-#define _framing_FramingContent_h
-
-#include <ostream>
-
-namespace qpid {
-namespace framing {
-
-class Buffer;
-
-enum discriminator_types { INLINE = 0, REFERENCE = 1 };
-
-/**
- * A representation of the AMQP Content data type (used for message
- * bodies) which can hold inline data or a reference.
- */
-class Content
-{
- uint8_t discriminator;
- string value;
-
- void validate();
-
- public:
- Content();
- Content(uint8_t _discriminator, const string& _value);
- ~Content();
-
- void encode(Buffer& buffer) const;
- void decode(Buffer& buffer);
- size_t size() const;
- bool isInline() const { return discriminator == INLINE; }
- bool isReference() const { return discriminator == REFERENCE; }
- const string& getValue() const { return value; }
- void setValue(const string& newValue) { value = newValue; }
-
- friend std::ostream& operator<<(std::ostream&, const Content&);
-};
-
-}} // namespace qpid::framing
-
-
-#endif /*!_framing_FramingContent_h*/
diff --git a/cpp/src/qpid/framing/ModelMethod.h b/cpp/src/qpid/framing/ModelMethod.h
index 07600aadca..8e4361e761 100644
--- a/cpp/src/qpid/framing/ModelMethod.h
+++ b/cpp/src/qpid/framing/ModelMethod.h
@@ -22,7 +22,7 @@
*
*/
#include "AMQMethodBody.h"
-#include "qpid/framing/ExecutionHeader.h"
+#include "qpid/framing/Header.h"
namespace qpid {
namespace framing {
@@ -30,7 +30,7 @@ namespace framing {
class ModelMethod : public AMQMethodBody
{
- mutable ExecutionHeader header;
+ mutable Header header;
public:
virtual ~ModelMethod() {}
virtual void encodeHeader(Buffer& buffer) const { header.encode(buffer); }
@@ -38,8 +38,8 @@ public:
virtual uint32_t headerSize() const { return header.size(); }
virtual bool isSync() const { return header.getSync(); }
virtual void setSync(bool on) const { header.setSync(on); }
- ExecutionHeader& getHeader() { return header; }
- const ExecutionHeader& getHeader() const { return header; }
+ Header& getHeader() { return header; }
+ const Header& getHeader() const { return header; }
};
diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp
index 1b62d296c6..cba00c860a 100644
--- a/cpp/src/qpid/framing/SequenceNumber.cpp
+++ b/cpp/src/qpid/framing/SequenceNumber.cpp
@@ -20,8 +20,10 @@
*/
#include "SequenceNumber.h"
+#include "Buffer.h"
using qpid::framing::SequenceNumber;
+using qpid::framing::Buffer;
SequenceNumber::SequenceNumber() : value(0 - 1) {}
@@ -77,6 +79,20 @@ bool SequenceNumber::operator>=(const SequenceNumber& other) const
return *this == other || *this > other;
}
+void SequenceNumber::encode(Buffer& buffer) const
+{
+ buffer.putLong(value);
+}
+
+void SequenceNumber::decode(Buffer& buffer)
+{
+ value = buffer.getLong();
+}
+
+uint32_t SequenceNumber::size() const {
+ return 4;
+}
+
namespace qpid {
namespace framing {
diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h
index 0ed591b804..d659bec5c1 100644
--- a/cpp/src/qpid/framing/SequenceNumber.h
+++ b/cpp/src/qpid/framing/SequenceNumber.h
@@ -26,6 +26,8 @@
namespace qpid {
namespace framing {
+class Buffer;
+
/**
* 4-byte sequence number that 'wraps around'.
*/
@@ -51,6 +53,10 @@ class SequenceNumber
friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+ uint32_t size() const;
+
template <class S> void serialize(S& s) { s(value); }
};
diff --git a/cpp/src/qpid/framing/SequenceSet.cpp b/cpp/src/qpid/framing/SequenceSet.cpp
index 2683b0025d..cdf890b7f8 100644
--- a/cpp/src/qpid/framing/SequenceSet.cpp
+++ b/cpp/src/qpid/framing/SequenceSet.cpp
@@ -49,7 +49,7 @@ void SequenceSet::decode(Buffer& buffer)
uint16_t size = buffer.getShort();
uint16_t count = size / RANGE_SIZE;//number of ranges
if (size % RANGE_SIZE)
- throw FrameErrorException(QPID_MSG("Invalid size for sequence set: " << size));
+ throw IllegalArgumentException(QPID_MSG("Invalid size for sequence set: " << size));
for (uint16_t i = 0; i < count; i++) {
add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong()));
diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp
index 2918c48ce3..b58d9fce96 100644
--- a/cpp/src/qpid/framing/Uuid.cpp
+++ b/cpp/src/qpid/framing/Uuid.cpp
@@ -34,7 +34,7 @@ void Uuid::encode(Buffer& buf) const {
void Uuid::decode(Buffer& buf) {
if (buf.available() < size())
- throw SyntaxErrorException(QPID_MSG("Not enough data for UUID."));
+ throw IllegalArgumentException(QPID_MSG("Not enough data for UUID."));
buf.getRawData(c_array(), size());
}
diff --git a/cpp/src/qpid/framing/amqp_types_full.h b/cpp/src/qpid/framing/amqp_types_full.h
index 1ab8777896..d0aaf28cb4 100644
--- a/cpp/src/qpid/framing/amqp_types_full.h
+++ b/cpp/src/qpid/framing/amqp_types_full.h
@@ -31,9 +31,7 @@
#include "amqp_types.h"
#include "Array.h"
-#include "FramingContent.h"
#include "FieldTable.h"
-#include "SequenceNumberSet.h"
#include "SequenceSet.h"
#include "Uuid.h"
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 94e2c025d6..0b69f76a76 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -28,6 +28,7 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid_test_plugin.h"
#include <iostream>
#include "MessageUtils.h"
@@ -166,7 +167,7 @@ class ExchangeTest : public CppUnit::TestCase
exchanges.destroy("my-exchange");
try {
exchanges.get("my-exchange");
- } catch (const ChannelException&) {}
+ } catch (const NotFoundException&) {}
std::pair<Exchange::shared_ptr, bool> response = exchanges.declare("my-exchange", "direct", false, FieldTable());
CPPUNIT_ASSERT_EQUAL(string("direct"), response.first->getType());
diff --git a/cpp/src/tests/exception_test.cpp b/cpp/src/tests/exception_test.cpp
index 7c68973d4d..f75269c959 100644
--- a/cpp/src/tests/exception_test.cpp
+++ b/cpp/src/tests/exception_test.cpp
@@ -91,7 +91,7 @@ QPID_AUTO_TEST_CASE(DisconnectedListen) {
Thread t(fix.subs);
fix.connection.proxy.close();
t.join();
- BOOST_CHECK_THROW(fix.session.close(), InternalErrorException);
+ BOOST_CHECK_THROW(fix.session.close(), ConnectionException);
}
QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
diff --git a/cpp/src/tests/interop_runner.cpp b/cpp/src/tests/interop_runner.cpp
index 1d77408eff..51dd2cb924 100644
--- a/cpp/src/tests/interop_runner.cpp
+++ b/cpp/src/tests/interop_runner.cpp
@@ -158,7 +158,7 @@ void Listener::sendResponse(Message& response, Message& request)
void Listener::sendResponse(Message& response, ReplyTo replyTo)
{
- string exchange = replyTo.getExchangeName();
+ string exchange = replyTo.getExchange();
string routingKey = replyTo.getRoutingKey();
channel.publish(response, exchange, routingKey);
}
diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests
index ce6b1f3810..e4b70f5ff5 100755
--- a/cpp/src/tests/python_tests
+++ b/cpp/src/tests/python_tests
@@ -14,7 +14,6 @@ run() {
if test -d ${PYTHON_DIR} ; then
cd ${PYTHON_DIR}
run 0-10-errata cpp_failing_0-10.txt
- if test -z "$QPID_NO_PREVIEW" ; then run ../specs/amqp.0-10-preview.xml cpp_failing_0-10_preview.txt; fi
else
echo Warning: python tests not found.
fi