diff options
40 files changed, 494 insertions, 164 deletions
diff --git a/cpp/examples/messaging/client.cpp b/cpp/examples/messaging/client.cpp index c1a8d74237..a3855807c8 100644 --- a/cpp/examples/messaging/client.cpp +++ b/cpp/examples/messaging/client.cpp @@ -41,7 +41,7 @@ int main(int argc, char** argv) { Connection connection(url); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Sender sender = session.createSender("service_queue"); diff --git a/cpp/examples/messaging/drain.cpp b/cpp/examples/messaging/drain.cpp index 41b9503649..ba613ec364 100644 --- a/cpp/examples/messaging/drain.cpp +++ b/cpp/examples/messaging/drain.cpp @@ -95,7 +95,7 @@ int main(int argc, char** argv) if (options.parse(argc, argv)) { Connection connection(options.url, options.connectionOptions); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver(options.address); Duration timeout = options.getTimeout(); diff --git a/cpp/examples/messaging/map_receiver.cpp b/cpp/examples/messaging/map_receiver.cpp index 55c543b90b..6afc4c9ec9 100644 --- a/cpp/examples/messaging/map_receiver.cpp +++ b/cpp/examples/messaging/map_receiver.cpp @@ -40,7 +40,7 @@ int main(int argc, char** argv) { Connection connection(url); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver("message_queue"); Variant::Map content; diff --git a/cpp/examples/messaging/map_sender.cpp b/cpp/examples/messaging/map_sender.cpp index 2e63c88aa4..eeea9a53b4 100644 --- a/cpp/examples/messaging/map_sender.cpp +++ b/cpp/examples/messaging/map_sender.cpp @@ -39,7 +39,7 @@ int main(int argc, char** argv) { const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672"; Connection connection(url); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Sender sender = session.createSender("message_queue"); diff --git a/cpp/examples/messaging/queue_receiver.cpp b/cpp/examples/messaging/queue_receiver.cpp index 43dffd8baf..377ce11993 100644 --- a/cpp/examples/messaging/queue_receiver.cpp +++ b/cpp/examples/messaging/queue_receiver.cpp @@ -33,7 +33,7 @@ int main(int argc, char** argv) { Connection connection(url); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver("message_queue"); while (true) { diff --git a/cpp/examples/messaging/queue_sender.cpp b/cpp/examples/messaging/queue_sender.cpp index fa355dbf88..0592c30b60 100644 --- a/cpp/examples/messaging/queue_sender.cpp +++ b/cpp/examples/messaging/queue_sender.cpp @@ -36,7 +36,7 @@ int main(int argc, char** argv) { Connection connection(url); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Sender sender = session.createSender("message_queue"); diff --git a/cpp/examples/messaging/server.cpp b/cpp/examples/messaging/server.cpp index 33d28a75c4..ae1fee4b50 100644 --- a/cpp/examples/messaging/server.cpp +++ b/cpp/examples/messaging/server.cpp @@ -42,7 +42,7 @@ int main(int argc, char** argv) { Connection connection(url); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver("service_queue; {create: always}"); diff --git a/cpp/examples/messaging/spout.cpp b/cpp/examples/messaging/spout.cpp index 9ed8b642c8..2e9e91bfba 100644 --- a/cpp/examples/messaging/spout.cpp +++ b/cpp/examples/messaging/spout.cpp @@ -158,7 +158,7 @@ int main(int argc, char** argv) if (options.parse(argc, argv)) { Connection connection(options.url, options.connectionOptions); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Sender sender = session.createSender(options.address); diff --git a/cpp/examples/messaging/topic_receiver.cpp b/cpp/examples/messaging/topic_receiver.cpp index 408920f5aa..96f40a539c 100644 --- a/cpp/examples/messaging/topic_receiver.cpp +++ b/cpp/examples/messaging/topic_receiver.cpp @@ -35,7 +35,7 @@ int main(int argc, char** argv) { Connection connection(url); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver("news_service; {filter:[control, " + pattern + "]}"); while (true) { diff --git a/cpp/examples/messaging/topic_sender.cpp b/cpp/examples/messaging/topic_sender.cpp index 9d4cd582cf..a95c0951f7 100644 --- a/cpp/examples/messaging/topic_sender.cpp +++ b/cpp/examples/messaging/topic_sender.cpp @@ -53,7 +53,7 @@ int main(int argc, char** argv) { Connection connection(url); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); Sender sender = session.createSender("news_service"); diff --git a/cpp/include/qpid/client/SessionBase_0_10.h b/cpp/include/qpid/client/SessionBase_0_10.h index 6b7ed97df4..3b5c84e74b 100644 --- a/cpp/include/qpid/client/SessionBase_0_10.h +++ b/cpp/include/qpid/client/SessionBase_0_10.h @@ -100,9 +100,6 @@ class SessionBase_0_10 { QPID_CLIENT_EXTERN bool isValid() const; QPID_CLIENT_EXTERN Connection getConnection(); - - /** Send sync request without actually blocking for it**/ - QPID_CLIENT_EXTERN void sendSyncRequest(); protected: boost::shared_ptr<SessionImpl> impl; friend class SessionBase_0_10Access; diff --git a/cpp/include/qpid/messaging/Address.h b/cpp/include/qpid/messaging/Address.h index 34a186c9ce..3722db94e8 100644 --- a/cpp/include/qpid/messaging/Address.h +++ b/cpp/include/qpid/messaging/Address.h @@ -22,7 +22,7 @@ * */ #include <string> -#include "qpid/Exception.h" +#include "qpid/messaging/exceptions.h" #include "qpid/types/Variant.h" #include "qpid/messaging/ImportExport.h" #include <ostream> @@ -30,23 +30,6 @@ namespace qpid { namespace messaging { -/** - * Thrown when a syntactically correct address cannot be resolved or - * used. - */ -struct InvalidAddress : public qpid::Exception -{ - InvalidAddress(const std::string& msg); -}; - -/** - * Thrown when an address string with inalid sytanx is used. - */ -struct MalformedAddress : public qpid::Exception -{ - MalformedAddress(const std::string& msg); -}; - class AddressImpl; /** diff --git a/cpp/include/qpid/messaging/Connection.h b/cpp/include/qpid/messaging/Connection.h index e58abc1986..23711034d6 100644 --- a/cpp/include/qpid/messaging/Connection.h +++ b/cpp/include/qpid/messaging/Connection.h @@ -24,6 +24,7 @@ #include <string> #include "qpid/messaging/ImportExport.h" #include "qpid/messaging/Handle.h" +#include "qpid/messaging/exceptions.h" #include "qpid/types/Variant.h" namespace qpid { @@ -33,11 +34,6 @@ template <class> class PrivateImplRef; class ConnectionImpl; class Session; -struct InvalidOptionString : public qpid::Exception -{ - InvalidOptionString(const std::string& msg); -}; - class Connection : public qpid::messaging::Handle<ConnectionImpl> { public: @@ -51,6 +47,7 @@ class Connection : public qpid::messaging::Handle<ConnectionImpl> * heartbeat * tcp-nodelay * sasl-mechanism + * sasl-service * sasl-min-ssf * sasl-max-ssf * transport @@ -78,13 +75,12 @@ class Connection : public qpid::messaging::Handle<ConnectionImpl> * * @exception InvalidOptionString if the string does not match the correct syntax */ - QPID_CLIENT_EXTERN Connection(const std::string& url, const std::string& options); + QPID_CLIENT_EXTERN Connection(const std::string& url, const std::string& options) throw(InvalidOptionString); QPID_CLIENT_EXTERN ~Connection(); QPID_CLIENT_EXTERN Connection& operator=(const Connection&); QPID_CLIENT_EXTERN void setOption(const std::string& name, const qpid::types::Variant& value); - QPID_CLIENT_EXTERN void connect(); - QPID_CLIENT_EXTERN bool isConnected(); - QPID_CLIENT_EXTERN void detach(); + QPID_CLIENT_EXTERN void open(); + QPID_CLIENT_EXTERN bool isOpen(); /** * Closes a connection and all sessions associated with it. An * opened connection must be closed before the last handle is diff --git a/cpp/include/qpid/messaging/Receiver.h b/cpp/include/qpid/messaging/Receiver.h index d89813acfc..6926d3401a 100644 --- a/cpp/include/qpid/messaging/Receiver.h +++ b/cpp/include/qpid/messaging/Receiver.h @@ -21,7 +21,7 @@ * under the License. * */ -#include "qpid/Exception.h" +#include "qpid/messaging/exceptions.h" #include "qpid/messaging/ImportExport.h" #include "qpid/messaging/Handle.h" #include "qpid/messaging/Duration.h" @@ -41,8 +41,6 @@ class Session; class Receiver : public qpid::messaging::Handle<ReceiverImpl> { public: - struct NoMessageAvailable : qpid::Exception {}; - QPID_CLIENT_EXTERN Receiver(ReceiverImpl* impl = 0); QPID_CLIENT_EXTERN Receiver(const Receiver&); QPID_CLIENT_EXTERN ~Receiver(); diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h index 95f9832576..b3bc527329 100644 --- a/cpp/include/qpid/messaging/Session.h +++ b/cpp/include/qpid/messaging/Session.h @@ -21,7 +21,7 @@ * under the License. * */ -#include "qpid/Exception.h" +#include "qpid/messaging/exceptions.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/ImportExport.h" #include "qpid/messaging/Handle.h" @@ -40,11 +40,6 @@ class Receiver; class SessionImpl; class Subscription; -struct KeyError : qpid::Exception -{ - QPID_CLIENT_EXTERN KeyError(const std::string&); -}; - /** * A session represents a distinct 'conversation' which can involve * sending and receiving messages to and from different addresses. @@ -159,6 +154,9 @@ class Session : public qpid::messaging::Handle<SessionImpl> */ QPID_CLIENT_EXTERN Connection getConnection() const; + QPID_CLIENT_EXTERN bool hasError(); + QPID_CLIENT_EXTERN void checkError(); + private: friend class qpid::messaging::PrivateImplRef<Session>; }; diff --git a/cpp/include/qpid/messaging/exceptions.h b/cpp/include/qpid/messaging/exceptions.h new file mode 100644 index 0000000000..af7959fe13 --- /dev/null +++ b/cpp/include/qpid/messaging/exceptions.h @@ -0,0 +1,150 @@ +#ifndef QPID_MESSAGING_EXCEPTIONS_H +#define QPID_MESSAGING_EXCEPTIONS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/types/Exception.h" +#include "qpid/types/Variant.h" +#include "qpid/messaging/ImportExport.h" + +namespace qpid { +namespace messaging { + +struct MessagingException : public qpid::types::Exception +{ + QPID_CLIENT_EXTERN MessagingException(const std::string& msg); + QPID_CLIENT_EXTERN virtual ~MessagingException() throw(); + + qpid::types::Variant::Map detail; + //TODO: override what() to include detail if present +}; + +struct InvalidOptionString : public MessagingException +{ + QPID_CLIENT_EXTERN InvalidOptionString(const std::string& msg); +}; + +struct KeyError : MessagingException +{ + QPID_CLIENT_EXTERN KeyError(const std::string&); +}; + +struct LinkError : MessagingException +{ + QPID_CLIENT_EXTERN LinkError(const std::string&); +}; + +struct AddressError : LinkError +{ + QPID_CLIENT_EXTERN AddressError(const std::string&); +}; + +/** + * Thrown when a syntactically correct address cannot be resolved or + * used. + */ +struct ResolutionError : public AddressError +{ + QPID_CLIENT_EXTERN ResolutionError(const std::string& msg); +}; + +struct AssertionFailed : public ResolutionError +{ + QPID_CLIENT_EXTERN AssertionFailed(const std::string& msg); +}; + +struct NotFound : public ResolutionError +{ + QPID_CLIENT_EXTERN NotFound(const std::string& msg); +}; + +/** + * Thrown when an address string with inalid sytanx is used. + */ +struct MalformedAddress : public AddressError +{ + QPID_CLIENT_EXTERN MalformedAddress(const std::string& msg); +}; + +struct ReceiverError : LinkError +{ + QPID_CLIENT_EXTERN ReceiverError(const std::string&); +}; + +struct FetchError : ReceiverError +{ + QPID_CLIENT_EXTERN FetchError(const std::string&); +}; + +struct NoMessageAvailable : FetchError +{ + QPID_CLIENT_EXTERN NoMessageAvailable(); +}; + +struct SenderError : LinkError +{ + QPID_CLIENT_EXTERN SenderError(const std::string&); +}; + +struct SendError : SenderError +{ + QPID_CLIENT_EXTERN SendError(const std::string&); +}; + +struct TargetCapacityExceeded : SendError +{ + QPID_CLIENT_EXTERN TargetCapacityExceeded(const std::string&); +}; + +struct SessionError : MessagingException +{ + QPID_CLIENT_EXTERN SessionError(const std::string&); +}; + +struct TransactionError : SessionError +{ + QPID_CLIENT_EXTERN TransactionError(const std::string&); +}; + +struct TransactionAborted : TransactionError +{ + QPID_CLIENT_EXTERN TransactionAborted(const std::string&); +}; + +struct UnauthorizedAccess : SessionError +{ + QPID_CLIENT_EXTERN UnauthorizedAccess(const std::string&); +}; + +struct ConnectionError : MessagingException +{ + QPID_CLIENT_EXTERN ConnectionError(const std::string&); +}; + +struct TransportFailure : MessagingException +{ + QPID_CLIENT_EXTERN TransportFailure(const std::string&); +}; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_EXCEPTIONS_H*/ diff --git a/cpp/include/qpid/types/Exception.h b/cpp/include/qpid/types/Exception.h new file mode 100644 index 0000000000..a8b7d128af --- /dev/null +++ b/cpp/include/qpid/types/Exception.h @@ -0,0 +1,44 @@ +#ifndef QPID_TYPES_EXCEPTION_H +#define QPID_TYPES_EXCEPTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "qpid/CommonImportExport.h" + +namespace qpid { +namespace types { + +class Exception : public std::exception +{ + public: + QPID_COMMON_EXTERN explicit Exception(const std::string& message=std::string()) throw(); + QPID_COMMON_EXTERN virtual ~Exception() throw(); + QPID_COMMON_EXTERN virtual const char* what() const throw(); + + private: + const std::string message; +}; + +}} // namespace qpid::types + +#endif /*!QPID_TYPES_EXCEPTION_H*/ diff --git a/cpp/include/qpid/types/Variant.h b/cpp/include/qpid/types/Variant.h index 91e37242d0..059550bc9c 100644 --- a/cpp/include/qpid/types/Variant.h +++ b/cpp/include/qpid/types/Variant.h @@ -26,7 +26,7 @@ #include <ostream> #include <string> #include "Uuid.h" -#include "qpid/Exception.h" +#include "qpid/types/Exception.h" #include "qpid/sys/IntegerTypes.h" #include "qpid/CommonImportExport.h" @@ -36,7 +36,7 @@ namespace types { /** * Thrown when an illegal conversion of a variant is attempted. */ -struct InvalidConversion : public qpid::Exception +struct InvalidConversion : public Exception { InvalidConversion(const std::string& msg); }; diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 164b02d822..2edbf96205 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -584,6 +584,7 @@ set (qpidcommon_SOURCES qpid/SessionId.cpp qpid/StringUtils.cpp qpid/Url.cpp + qpid/types/Exception.cpp qpid/types/Uuid.cpp qpid/types/Variant.cpp qpid/amqp_0_10/Codecs.cpp @@ -698,6 +699,7 @@ set (qpidclient_SOURCES qpid/messaging/Connection.cpp qpid/messaging/ConnectionImpl.h qpid/messaging/Duration.cpp + qpid/messaging/exceptions.cpp qpid/messaging/Message.cpp qpid/messaging/MessageImpl.h qpid/messaging/MessageImpl.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 427148dfbf..6692701502 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -426,6 +426,7 @@ libqpidcommon_la_SOURCES += \ qpid/memory.h \ qpid/pointer_to_other.h \ qpid/ptr_map.h \ + qpid/types/Exception.cpp \ qpid/types/Uuid.cpp \ qpid/types/Variant.cpp \ qpid/amqp_0_10/Codecs.cpp \ @@ -710,6 +711,7 @@ libqpidclient_la_SOURCES = \ qpid/messaging/AddressParser.cpp \ qpid/messaging/Connection.cpp \ qpid/messaging/Duration.cpp \ + qpid/messaging/exceptions.cpp \ qpid/messaging/Message.cpp \ qpid/messaging/MessageImpl.h \ qpid/messaging/MessageImpl.cpp \ @@ -818,12 +820,14 @@ nobase_include_HEADERS += \ ../include/qpid/messaging/Address.h \ ../include/qpid/messaging/Connection.h \ ../include/qpid/messaging/Duration.h \ + ../include/qpid/messaging/exceptions.h \ ../include/qpid/messaging/Handle.h \ ../include/qpid/messaging/ImportExport.h \ ../include/qpid/messaging/Message.h \ ../include/qpid/messaging/Receiver.h \ ../include/qpid/messaging/Sender.h \ ../include/qpid/messaging/Session.h \ + ../include/qpid/types/Exception.h \ ../include/qpid/types/Uuid.h \ ../include/qpid/types/Variant.h \ ../include/qpid/client/amqp0_10/FailoverUpdates.h diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp index 6aa13bb579..e114b7aacc 100644 --- a/cpp/src/qpid/client/SessionBase_0_10.cpp +++ b/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -65,13 +65,6 @@ void SessionBase_0_10::sendCompletion() impl->sendCompletion(); } -void SessionBase_0_10::sendSyncRequest() -{ - ExecutionSyncBody b; - b.setSync(true); - impl->send(b); -} - uint16_t SessionBase_0_10::getChannel() const { return impl->getChannel(); } void SessionBase_0_10::suspend() { impl->suspend(); } diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index f64a46ba01..43b581861f 100644 --- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -26,7 +26,7 @@ #include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/types/Variant.h" -#include "qpid/Exception.h" +#include "qpid/messaging/exceptions.h" #include "qpid/log/Statement.h" #include "qpid/framing/enum.h" #include "qpid/framing/ExchangeBoundResult.h" @@ -45,7 +45,10 @@ namespace amqp0_10 { using qpid::Exception; using qpid::messaging::Address; -using qpid::messaging::InvalidAddress; +using qpid::messaging::MalformedAddress; +using qpid::messaging::ResolutionError; +using qpid::messaging::NotFound; +using qpid::messaging::AssertionFailed; using qpid::framing::ExchangeBoundResult; using qpid::framing::ExchangeQueryResult; using qpid::framing::FieldTable; @@ -360,7 +363,7 @@ bool AddressResolution::is_reliable(const Address& address) std::string checkAddressType(qpid::client::Session session, const Address& address) { if (address.getName().empty()) { - throw InvalidAddress("Name cannot be null"); + throw MalformedAddress("Name cannot be null"); } std::string type = (Opt(address)/NODE/TYPE).str(); if (type.empty()) { @@ -376,7 +379,7 @@ std::string checkAddressType(qpid::client::Session session, const Address& addre type = TOPIC_ADDRESS; } else { //both a queue and exchange exist for that name - throw InvalidAddress("Ambiguous address, please specify queue or topic as node type"); + throw ResolutionError("Ambiguous address, please specify queue or topic as node type"); } } return type; @@ -396,7 +399,7 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess QPID_LOG(debug, "treating source address as queue: " << address); return source; } else { - throw InvalidAddress("Unrecognised type: " + type); + throw ResolutionError("Unrecognised type: " + type); } } @@ -414,7 +417,7 @@ std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session QPID_LOG(debug, "treating target address as queue: " << address); return sink; } else { - throw InvalidAddress("Unrecognised type: " + type); + throw ResolutionError("Unrecognised type: " + type); } } @@ -424,7 +427,7 @@ bool isBrowse(const Address& address) if (!mode.isVoid()) { std::string value = mode.asString(); if (value == BROWSE) return true; - else if (value != CONSUME) throw InvalidAddress("Invalid mode"); + else if (value != CONSUME) throw ResolutionError("Invalid mode"); } return false; } @@ -516,7 +519,7 @@ void Subscription::bindAll() b.arguments.setString("x-match", "all"); bindings.push_back(b); } else { //E.g. direct and xml - throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType)); + throw ResolutionError(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType)); } } @@ -662,23 +665,26 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) if (enabled(createPolicy, mode)) { QPID_LOG(debug, "Auto-creating queue '" << name << "'"); try { - sync(session).queueDeclare(arg::queue=name, - arg::durable=durable, - arg::autoDelete=autoDelete, - arg::exclusive=exclusive, - arg::alternateExchange=alternateExchange, - arg::arguments=arguments); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str()); + session.queueDeclare(arg::queue=name, + arg::durable=durable, + arg::autoDelete=autoDelete, + arg::exclusive=exclusive, + arg::alternateExchange=alternateExchange, + arg::arguments=arguments); + nodeBindings.bind(session); + session.sync(); + } catch (const qpid::framing::ResourceLockedException& e) { + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotAllowedException& e) { + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) {//may be thrown when creating bindings + throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str()); } - nodeBindings.bind(session); } else { try { sync(session).queueDeclare(arg::queue=name, arg::passive=true); } catch (const qpid::framing::NotFoundException& /*e*/) { - throw InvalidAddress((boost::format("Queue %1% does not exist") % name).str()); - } catch (const std::exception& e) { - throw InvalidAddress(e.what()); + throw NotFound((boost::format("Queue %1% does not exist") % name).str()); } } } @@ -700,27 +706,27 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) if (enabled(assertPolicy, mode)) { QueueQueryResult result = sync(session).queueQuery(name); if (result.getQueue() != name) { - throw InvalidAddress((boost::format("Queue not found: %1%") % name).str()); + throw NotFound((boost::format("Queue not found: %1%") % name).str()); } else { if (durable && !result.getDurable()) { - throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str()); + throw AssertionFailed((boost::format("Queue not durable: %1%") % name).str()); } if (autoDelete && !result.getAutoDelete()) { - throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str()); + throw AssertionFailed((boost::format("Queue not set to auto-delete: %1%") % name).str()); } if (exclusive && !result.getExclusive()) { - throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str()); + throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str()); } if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) { - throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") + throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%") % name % alternateExchange % result.getAlternateExchange()).str()); } for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { - throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); } else if (*i->second != *v) { - throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } } @@ -746,23 +752,24 @@ void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode) try { std::string type = specifiedType; if (type.empty()) type = TOPIC_EXCHANGE; - sync(session).exchangeDeclare(arg::exchange=name, + session.exchangeDeclare(arg::exchange=name, arg::type=type, arg::durable=durable, arg::autoDelete=autoDelete, arg::alternateExchange=alternateExchange, arg::arguments=arguments); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str()); + nodeBindings.bind(session); + session.sync(); + } catch (const qpid::framing::NotAllowedException& e) { + throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str()); + } catch (const qpid::framing::NotFoundException& e) {//can be caused when creating bindings + throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str()); } - nodeBindings.bind(session); } else { try { sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true); } catch (const qpid::framing::NotFoundException& /*e*/) { - throw InvalidAddress((boost::format("Exchange %1% does not exist") % name).str()); - } catch (const std::exception& e) { - throw InvalidAddress(e.what()); + throw NotFound((boost::format("Exchange %1% does not exist") % name).str()); } } } @@ -784,14 +791,14 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) if (enabled(assertPolicy, mode)) { ExchangeQueryResult result = sync(session).exchangeQuery(name); if (result.getNotFound()) { - throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str()); + throw NotFound((boost::format("Exchange not found: %1%") % name).str()); } else { if (specifiedType.size() && result.getType() != specifiedType) { - throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") + throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%") % name % specifiedType % result.getType()).str()); } if (durable && !result.getDurable()) { - throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str()); + throw AssertionFailed((boost::format("Exchange not durable: %1%") % name).str()); } //Note: Can't check auto-delete or alternate-exchange via //exchange-query-result as these are not returned @@ -799,9 +806,9 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) { FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { - throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str()); + throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); } else if (i->second != v) { - throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") + throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } } @@ -844,16 +851,11 @@ void Bindings::setDefaultQueue(const std::string& queue) void Bindings::bind(qpid::client::AsyncSession& session) { - try { - for (Bindings::const_iterator i = begin(); i != end(); ++i) { - session.exchangeBind(arg::queue=i->queue, - arg::exchange=i->exchange, - arg::bindingKey=i->key, - arg::arguments=i->arguments); - } - session.sync(); - } catch (const qpid::Exception& e) { - throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str()); + for (Bindings::const_iterator i = begin(); i != end(); ++i) { + session.exchangeBind(arg::queue=i->queue, + arg::exchange=i->exchange, + arg::bindingKey=i->key, + arg::arguments=i->arguments); } } @@ -873,7 +875,7 @@ void Bindings::check(qpid::client::AsyncSession& session) arg::exchange=i->exchange, arg::bindingKey=i->key); if (result.getQueueNotMatched() || result.getKeyNotMatched()) { - throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") + throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]") % i->exchange % i->queue % i->key).str()); } } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp index 30b75ff4ff..7e5018fc5f 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp @@ -21,10 +21,12 @@ #include "ConnectionImpl.h" #include "SessionImpl.h" #include "SimpleUrlParser.h" +#include "qpid/messaging/exceptions.h" #include "qpid/messaging/Session.h" #include "qpid/messaging/PrivateImplRef.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" +#include "qpid/Url.h" #include <boost/intrusive_ptr.hpp> #include <vector> @@ -97,7 +99,7 @@ void convert(const Variant::Map& from, ConnectionSettings& to) ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) : reconnect(true), timeout(-1), limit(-1), minReconnectInterval(3), maxReconnectInterval(60), - retries(0) + retries(0), reconnectOnLimitExceeded(true) { QPID_LOG(debug, "Created connection with " << options); setOptions(options); @@ -117,7 +119,8 @@ void ConnectionImpl::setOptions(const Variant::Map& options) setIfFound(options, "reconnect-interval-min", minReconnectInterval); setIfFound(options, "reconnect-interval-max", maxReconnectInterval); } - setIfFound(options, "reconnect-urls", urls); + setIfFound(options, "reconnect-urls", urls); + setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded); } void ConnectionImpl::setOption(const std::string& name, const Variant& value) @@ -147,7 +150,7 @@ void ConnectionImpl::detach() connection.close(); } -bool ConnectionImpl::isConnected() +bool ConnectionImpl::isOpen() { qpid::sys::Mutex::ScopedLock l(lock); return connection.isOpen(); @@ -192,13 +195,13 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st } try { getImplPtr(impl)->setSession(connection.newSession(name)); - } catch (const TransportFailure&) { - connect(); + } catch (const qpid::TransportFailure&) { + open(); } return impl; } -void ConnectionImpl::connect() +void ConnectionImpl::open() { qpid::sys::AbsTime start = qpid::sys::now(); qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore); @@ -217,9 +220,15 @@ bool expired(const qpid::sys::AbsTime& start, int64_t timeout) void ConnectionImpl::connect(const qpid::sys::AbsTime& started) { for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) { - if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)"); - if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit"); - if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout"); + if (!reconnect) { + throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); + } + if (limit >= 0 && retries++ >= limit) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit"); + } + if (expired(started, timeout)) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); + } else qpid::sys::sleep(i); } retries = 0; @@ -246,7 +255,7 @@ bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls) } QPID_LOG(info, "Connected to " << *i); return true; - } catch (const Exception& e) { + } catch (const qpid::Exception& e) { //TODO: need to fix timeout on //qpid::client::Connection::open() so that it throws //TransportFailure rather than a ConnectionException @@ -264,8 +273,27 @@ bool ConnectionImpl::resetSessions() getImplPtr(i->second)->setSession(connection.newSession(i->first)); } return true; - } catch (const TransportFailure&) { - QPID_LOG(debug, "Connection failed while re-inialising sessions"); + } catch (const qpid::TransportFailure&) { + QPID_LOG(debug, "Connection failed while re-initialising sessions"); + return false; + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (reconnectOnLimitExceeded) { + QPID_LOG(debug, "Detaching and reconnecting due to: " << e.what()); + detach(); + return false; + } else { + throw qpid::messaging::TargetCapacityExceeded(e.what()); + } + } +} + +bool ConnectionImpl::backoff() +{ + if (reconnectOnLimitExceeded) { + detach(); + open(); + return true; + } else { return false; } } diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h index 9d992c1375..b6fd33cc49 100644 --- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h @@ -23,7 +23,6 @@ */ #include "qpid/messaging/ConnectionImpl.h" #include "qpid/types/Variant.h" -#include "qpid/Url.h" #include "qpid/client/Connection.h" #include "qpid/client/ConnectionSettings.h" #include "qpid/sys/Mutex.h" @@ -40,14 +39,15 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl { public: ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options); - void connect(); - bool isConnected(); + void open(); + bool isOpen(); void close(); qpid::messaging::Session newSession(bool transactional, const std::string& name); qpid::messaging::Session getSession(const std::string& name) const; void closed(SessionImpl&); void detach(); void setOption(const std::string& name, const qpid::types::Variant& value); + bool backoff(); private: typedef std::map<std::string, qpid::messaging::Session> Sessions; @@ -63,6 +63,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl int64_t minReconnectInterval; int64_t maxReconnectInterval; int32_t retries; + bool reconnectOnLimitExceeded; void setOptions(const qpid::types::Variant::Map& options); void connect(const qpid::sys::AbsTime& started); diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index c3367f8ab4..343b5cad37 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -22,6 +22,7 @@ #include "AddressResolution.h" #include "MessageSource.h" #include "SessionImpl.h" +#include "qpid/messaging/exceptions.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" @@ -29,6 +30,7 @@ namespace qpid { namespace client { namespace amqp0_10 { +using qpid::messaging::NoMessageAvailable; using qpid::messaging::Receiver; using qpid::messaging::Duration; @@ -44,14 +46,14 @@ void ReceiverImpl::received(qpid::messaging::Message&) qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) { qpid::messaging::Message result; - if (!get(result, timeout)) throw Receiver::NoMessageAvailable(); + if (!get(result, timeout)) throw NoMessageAvailable(); return result; } qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) { qpid::messaging::Message result; - if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable(); + if (!fetch(result, timeout)) throw NoMessageAvailable(); return result; } diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 969ad93da9..33a3e226ff 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -24,6 +24,8 @@ #include "qpid/client/amqp0_10/SenderImpl.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/SessionImpl.h" #include "qpid/messaging/PrivateImplRef.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" @@ -34,12 +36,15 @@ #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" -#include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> #include <boost/function.hpp> #include <boost/intrusive_ptr.hpp> using qpid::messaging::KeyError; +using qpid::messaging::NoMessageAvailable; +using qpid::messaging::MessagingException; +using qpid::messaging::TransactionAborted; +using qpid::messaging::SessionError; using qpid::messaging::MessageImplAccess; using qpid::messaging::Sender; using qpid::messaging::Receiver; @@ -50,6 +55,11 @@ namespace amqp0_10 { SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} +void SessionImpl::checkError() +{ + qpid::client::SessionBase_0_10Access s(session); + s.get()->assertOpen(); +} void SessionImpl::sync(bool block) { @@ -60,7 +70,7 @@ void SessionImpl::sync(bool block) void SessionImpl::commit() { if (!execute<Commit>()) { - throw Exception();//TODO: what type? + throw TransactionAborted("Transaction aborted due to transport failure"); } } @@ -141,6 +151,7 @@ void SessionImpl::setSession(qpid::client::Session s) for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); } + session.sync(); } struct SessionImpl::CreateReceiver : Command @@ -219,7 +230,7 @@ SessionImpl& SessionImpl::convert(qpid::messaging::Session& s) { boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s); if (!impl) { - throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); + throw SessionError(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); } return *impl; } @@ -297,7 +308,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag if (incoming.getNextDestination(destination, adjust(timeout))) { Receivers::const_iterator i = receivers.find(destination); if (i == receivers.end()) { - throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination)); + throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination)); } else { receiver = i->second; } @@ -307,6 +318,17 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag } } catch (TransportFailure&) { reconnect(); + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (backoff()) return false; + else throw qpid::messaging::TargetCapacityExceeded(e.what()); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + throw qpid::messaging::UnauthorizedAccess(e.what()); + } catch (const qpid::SessionException& e) { + throw qpid::messaging::SessionError(e.what()); + } catch (const qpid::ConnectionException& e) { + throw qpid::messaging::ConnectionError(e.what()); + } catch (const qpid::ChannelException& e) { + throw qpid::messaging::MessagingException(e.what()); } } } @@ -314,8 +336,8 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout) { qpid::messaging::Receiver receiver; - if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable(); - if (!receiver) throw qpid::Exception("Bad receiver returned!"); + if (!nextReceiver(receiver, timeout)) throw NoMessageAvailable(); + if (!receiver) throw SessionError("Bad receiver returned!"); return receiver; } @@ -377,7 +399,7 @@ uint32_t SessionImpl::pendingAckImpl(const std::string* destination) void SessionImpl::syncImpl(bool block) { if (block) session.sync(); - else session.sendSyncRequest(); + else session.flush(); } void SessionImpl::commitImpl() @@ -435,7 +457,12 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->connect(); + connection->open(); +} + +bool SessionImpl::backoff() +{ + return connection->backoff(); } qpid::messaging::Connection SessionImpl::getConnection() const diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 8b098e65d6..e1229055f7 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -23,11 +23,13 @@ */ #include "qpid/messaging/SessionImpl.h" #include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" #include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/amqp0_10/AddressResolution.h" #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/sys/Mutex.h" +#include "qpid/framing/reply_exceptions.h" #include <boost/intrusive_ptr.hpp> namespace qpid { @@ -73,6 +75,7 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::messaging::Receiver nextReceiver(qpid::messaging::Duration timeout); qpid::messaging::Connection getConnection() const; + void checkError(); bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); @@ -93,9 +96,20 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::sys::Mutex::ScopedLock l(lock); f(); return true; - } catch (TransportFailure&) { + } catch (const qpid::TransportFailure&) { reconnect(); return false; + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (backoff()) return false; + else throw qpid::messaging::TargetCapacityExceeded(e.what()); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + throw qpid::messaging::UnauthorizedAccess(e.what()); + } catch (const qpid::SessionException& e) { + throw qpid::messaging::SessionError(e.what()); + } catch (const qpid::ConnectionException& e) { + throw qpid::messaging::ConnectionError(e.what()); + } catch (const qpid::ChannelException& e) { + throw qpid::messaging::MessagingException(e.what()); } } @@ -118,6 +132,7 @@ class SessionImpl : public qpid::messaging::SessionImpl bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout); bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); + bool backoff(); void commitImpl(); void rollbackImpl(); diff --git a/cpp/src/qpid/messaging/Address.cpp b/cpp/src/qpid/messaging/Address.cpp index 0c522888e7..a516959edb 100644 --- a/cpp/src/qpid/messaging/Address.cpp +++ b/cpp/src/qpid/messaging/Address.cpp @@ -148,8 +148,4 @@ std::ostream& operator<<(std::ostream& out, const Address& address) return out; } -InvalidAddress::InvalidAddress(const std::string& msg) : Exception(msg) {} - -MalformedAddress::MalformedAddress(const std::string& msg) : Exception(msg) {} - }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp index 81a72bb876..14e713d425 100644 --- a/cpp/src/qpid/messaging/Connection.cpp +++ b/cpp/src/qpid/messaging/Connection.cpp @@ -39,7 +39,7 @@ Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::cop Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); } Connection::~Connection() { PI::dtor(*this); } -Connection::Connection(const std::string& url, const std::string& o) +Connection::Connection(const std::string& url, const std::string& o) throw(InvalidOptionString) { Variant::Map options; AddressParser parser(o); @@ -54,9 +54,8 @@ Connection::Connection(const std::string& url, const Variant::Map& options) PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options)); } -void Connection::connect() { impl->connect(); } -bool Connection::isConnected() { return impl->isConnected(); } -void Connection::detach() { impl->detach(); } +void Connection::open() { impl->open(); } +bool Connection::isOpen() { return impl->isOpen(); } void Connection::close() { impl->close(); } Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); } Session Connection::createTransactionalSession(const std::string& name) @@ -69,6 +68,4 @@ void Connection::setOption(const std::string& name, const Variant& value) impl->setOption(name, value); } -InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {} - }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/ConnectionImpl.h b/cpp/src/qpid/messaging/ConnectionImpl.h index 148105476d..23ab5272d0 100644 --- a/cpp/src/qpid/messaging/ConnectionImpl.h +++ b/cpp/src/qpid/messaging/ConnectionImpl.h @@ -38,9 +38,8 @@ class ConnectionImpl : public virtual qpid::RefCounted { public: virtual ~ConnectionImpl() {} - virtual void connect() = 0; - virtual bool isConnected() = 0; - virtual void detach() = 0; + virtual void open() = 0; + virtual bool isOpen() = 0; virtual void close() = 0; virtual Session newSession(bool transactional, const std::string& name) = 0; virtual Session getSession(const std::string& name) const = 0; diff --git a/cpp/src/qpid/messaging/Message.cpp b/cpp/src/qpid/messaging/Message.cpp index 753d3cec1b..1a04abe15b 100644 --- a/cpp/src/qpid/messaging/Message.cpp +++ b/cpp/src/qpid/messaging/Message.cpp @@ -111,13 +111,13 @@ template <class C> struct MessageCodec checkEncoding(requested) || checkEncoding(message.getContentType()); } - template <class T> static void decode(const Message& message, T& object, const std::string& encoding) + static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding) { checkEncoding(message, encoding); C::decode(message.getContent(), object); } - template <class T> static void encode(const T& map, Message& message, const std::string& encoding) + static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding) { checkEncoding(message, encoding); std::string content; diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp index eb5e3766b8..bd9048e893 100644 --- a/cpp/src/qpid/messaging/Session.cpp +++ b/cpp/src/qpid/messaging/Session.cpp @@ -94,6 +94,15 @@ Connection Session::getConnection() const return impl->getConnection(); } -KeyError::KeyError(const std::string& msg) : Exception(msg) {} +void Session::checkError() { impl->checkError(); } +bool Session::hasError() +{ + try { + checkError(); + return false; + } catch (const std::exception&) { + return true; + } +} }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h index 7acead5b04..e9a200b22e 100644 --- a/cpp/src/qpid/messaging/SessionImpl.h +++ b/cpp/src/qpid/messaging/SessionImpl.h @@ -54,6 +54,7 @@ class SessionImpl : public virtual qpid::RefCounted virtual Sender getSender(const std::string& name) const = 0; virtual Receiver getReceiver(const std::string& name) const = 0; virtual Connection getConnection() const = 0; + virtual void checkError() = 0; private: }; }} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/exceptions.cpp b/cpp/src/qpid/messaging/exceptions.cpp new file mode 100644 index 0000000000..5d2683fffe --- /dev/null +++ b/cpp/src/qpid/messaging/exceptions.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/exceptions.h" + +namespace qpid { +namespace messaging { + +MessagingException::MessagingException(const std::string& msg) : qpid::types::Exception(msg) {} +MessagingException::~MessagingException() throw() {} + +InvalidOptionString::InvalidOptionString(const std::string& msg) : MessagingException(msg) {} +KeyError::KeyError(const std::string& msg) : MessagingException(msg) {} + + +LinkError::LinkError(const std::string& msg) : MessagingException(msg) {} + +AddressError::AddressError(const std::string& msg) : LinkError(msg) {} +ResolutionError::ResolutionError(const std::string& msg) : AddressError(msg) {} +MalformedAddress::MalformedAddress(const std::string& msg) : AddressError(msg) {} +AssertionFailed::AssertionFailed(const std::string& msg) : ResolutionError(msg) {} +NotFound::NotFound(const std::string& msg) : ResolutionError(msg) {} + +ReceiverError::ReceiverError(const std::string& msg) : LinkError(msg) {} +FetchError::FetchError(const std::string& msg) : ReceiverError(msg) {} +NoMessageAvailable::NoMessageAvailable() : FetchError("No message to fetch") {} + +SenderError::SenderError(const std::string& msg) : LinkError(msg) {} +SendError::SendError(const std::string& msg) : SenderError(msg) {} +TargetCapacityExceeded::TargetCapacityExceeded(const std::string& msg) : SendError(msg) {} + +SessionError::SessionError(const std::string& msg) : MessagingException(msg) {} +TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {} +TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {} +UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {} + +ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {} + +TransportFailure::TransportFailure(const std::string& msg) : MessagingException(msg) {} + +}} // namespace qpid::messaging diff --git a/cpp/src/qpid/types/Exception.cpp b/cpp/src/qpid/types/Exception.cpp new file mode 100644 index 0000000000..71390e6abd --- /dev/null +++ b/cpp/src/qpid/types/Exception.cpp @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/types/Exception.h" + +namespace qpid { +namespace types { + +Exception::Exception(const std::string& msg) throw() : message(msg) {} +Exception::~Exception() throw() {} +const char* Exception::what() const throw() { return message.c_str(); } + +}} // namespace qpid::types diff --git a/cpp/src/qpid/types/Variant.cpp b/cpp/src/qpid/types/Variant.cpp index 904a596e82..2126c67fec 100644 --- a/cpp/src/qpid/types/Variant.cpp +++ b/cpp/src/qpid/types/Variant.cpp @@ -29,13 +29,13 @@ namespace qpid { namespace types { -InvalidConversion::InvalidConversion(const std::string& msg) : Exception(msg) {} - - namespace { -std::string EMPTY; +const std::string EMPTY; +const std::string PREFIX("invalid conversion: "); } +InvalidConversion::InvalidConversion(const std::string& msg) : Exception(PREFIX + msg) {} + class VariantImpl { public: diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index bdd5422690..aa0c65d319 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -116,7 +116,7 @@ struct MessagingFixture : public BrokerFixture static Connection open(uint16_t port) { Connection connection((boost::format("amqp:tcp:localhost:%1%") % (port)).str()); - connection.connect(); + connection.open(); return connection; } @@ -266,20 +266,20 @@ QPID_AUTO_TEST_CASE(testSenderError) { MessagingFixture fix; ScopedSuppressLogging sl; - BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress); + BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::NotFound); fix.session = fix.connection.createSession(); BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress; {create:receiver}"), - qpid::messaging::InvalidAddress); + qpid::messaging::NotFound); } QPID_AUTO_TEST_CASE(testReceiverError) { MessagingFixture fix; ScopedSuppressLogging sl; - BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress); + BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::NotFound); fix.session = fix.connection.createSession(); BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress; {create:sender}"), - qpid::messaging::InvalidAddress); + qpid::messaging::NotFound); } QPID_AUTO_TEST_CASE(testSimpleTopic) @@ -766,10 +766,10 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue) std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}"; Sender s2 = fix.session.createSender(a2); s2.close(); - BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress); + BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed); std::string a3 = "q; {assert:sender, node:{x-declare:{arguments:{qpid.max-count:99}}}}"; - BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress); + BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed); Receiver r3 = fix.session.createReceiver(a3); r3.close(); diff --git a/cpp/src/tests/qpid_receive.cpp b/cpp/src/tests/qpid_receive.cpp index 77e9cd180a..46f3db6718 100644 --- a/cpp/src/tests/qpid_receive.cpp +++ b/cpp/src/tests/qpid_receive.cpp @@ -161,7 +161,7 @@ int main(int argc, char ** argv) if (opts.parse(argc, argv)) { Connection connection(opts.url, opts.connectionOptions); try { - connection.connect(); + connection.open(); std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); Receiver receiver = session.createReceiver(opts.address); diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index e51c5a93d2..914f910224 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -218,7 +218,7 @@ int main(int argc, char ** argv) if (opts.parse(argc, argv)) { Connection connection(opts.url, opts.connectionOptions); try { - connection.connect(); + connection.open(); std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); Sender sender = session.createSender(opts.address); diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index b3fe493922..4305ee8c49 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -90,7 +90,7 @@ struct Client : qpid::sys::Runnable { Connection connection(opts.url); try { - connection.connect(); + connection.open(); Session session = connection.createSession(); doWork(session); session.close(); |