summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/examples/messaging/client.cpp2
-rw-r--r--cpp/examples/messaging/drain.cpp2
-rw-r--r--cpp/examples/messaging/map_receiver.cpp2
-rw-r--r--cpp/examples/messaging/map_sender.cpp2
-rw-r--r--cpp/examples/messaging/queue_receiver.cpp2
-rw-r--r--cpp/examples/messaging/queue_sender.cpp2
-rw-r--r--cpp/examples/messaging/server.cpp2
-rw-r--r--cpp/examples/messaging/spout.cpp2
-rw-r--r--cpp/examples/messaging/topic_receiver.cpp2
-rw-r--r--cpp/examples/messaging/topic_sender.cpp2
-rw-r--r--cpp/include/qpid/client/SessionBase_0_10.h3
-rw-r--r--cpp/include/qpid/messaging/Address.h19
-rw-r--r--cpp/include/qpid/messaging/Connection.h14
-rw-r--r--cpp/include/qpid/messaging/Receiver.h4
-rw-r--r--cpp/include/qpid/messaging/Session.h10
-rw-r--r--cpp/include/qpid/messaging/exceptions.h150
-rw-r--r--cpp/include/qpid/types/Exception.h44
-rw-r--r--cpp/include/qpid/types/Variant.h4
-rw-r--r--cpp/src/CMakeLists.txt2
-rw-r--r--cpp/src/Makefile.am4
-rw-r--r--cpp/src/qpid/client/SessionBase_0_10.cpp7
-rw-r--r--cpp/src/qpid/client/amqp0_10/AddressResolution.cpp102
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp52
-rw-r--r--cpp/src/qpid/client/amqp0_10/ConnectionImpl.h7
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp6
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp43
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h17
-rw-r--r--cpp/src/qpid/messaging/Address.cpp4
-rw-r--r--cpp/src/qpid/messaging/Connection.cpp9
-rw-r--r--cpp/src/qpid/messaging/ConnectionImpl.h5
-rw-r--r--cpp/src/qpid/messaging/Message.cpp4
-rw-r--r--cpp/src/qpid/messaging/Session.cpp11
-rw-r--r--cpp/src/qpid/messaging/SessionImpl.h1
-rw-r--r--cpp/src/qpid/messaging/exceptions.cpp58
-rw-r--r--cpp/src/qpid/types/Exception.cpp30
-rw-r--r--cpp/src/qpid/types/Variant.cpp8
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp14
-rw-r--r--cpp/src/tests/qpid_receive.cpp2
-rw-r--r--cpp/src/tests/qpid_send.cpp2
-rw-r--r--cpp/src/tests/qpid_stream.cpp2
40 files changed, 494 insertions, 164 deletions
diff --git a/cpp/examples/messaging/client.cpp b/cpp/examples/messaging/client.cpp
index c1a8d74237..a3855807c8 100644
--- a/cpp/examples/messaging/client.cpp
+++ b/cpp/examples/messaging/client.cpp
@@ -41,7 +41,7 @@ int main(int argc, char** argv) {
Connection connection(url);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Sender sender = session.createSender("service_queue");
diff --git a/cpp/examples/messaging/drain.cpp b/cpp/examples/messaging/drain.cpp
index 41b9503649..ba613ec364 100644
--- a/cpp/examples/messaging/drain.cpp
+++ b/cpp/examples/messaging/drain.cpp
@@ -95,7 +95,7 @@ int main(int argc, char** argv)
if (options.parse(argc, argv)) {
Connection connection(options.url, options.connectionOptions);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver(options.address);
Duration timeout = options.getTimeout();
diff --git a/cpp/examples/messaging/map_receiver.cpp b/cpp/examples/messaging/map_receiver.cpp
index 55c543b90b..6afc4c9ec9 100644
--- a/cpp/examples/messaging/map_receiver.cpp
+++ b/cpp/examples/messaging/map_receiver.cpp
@@ -40,7 +40,7 @@ int main(int argc, char** argv) {
Connection connection(url);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver("message_queue");
Variant::Map content;
diff --git a/cpp/examples/messaging/map_sender.cpp b/cpp/examples/messaging/map_sender.cpp
index 2e63c88aa4..eeea9a53b4 100644
--- a/cpp/examples/messaging/map_sender.cpp
+++ b/cpp/examples/messaging/map_sender.cpp
@@ -39,7 +39,7 @@ int main(int argc, char** argv) {
const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
Connection connection(url);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Sender sender = session.createSender("message_queue");
diff --git a/cpp/examples/messaging/queue_receiver.cpp b/cpp/examples/messaging/queue_receiver.cpp
index 43dffd8baf..377ce11993 100644
--- a/cpp/examples/messaging/queue_receiver.cpp
+++ b/cpp/examples/messaging/queue_receiver.cpp
@@ -33,7 +33,7 @@ int main(int argc, char** argv) {
Connection connection(url);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver("message_queue");
while (true) {
diff --git a/cpp/examples/messaging/queue_sender.cpp b/cpp/examples/messaging/queue_sender.cpp
index fa355dbf88..0592c30b60 100644
--- a/cpp/examples/messaging/queue_sender.cpp
+++ b/cpp/examples/messaging/queue_sender.cpp
@@ -36,7 +36,7 @@ int main(int argc, char** argv) {
Connection connection(url);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Sender sender = session.createSender("message_queue");
diff --git a/cpp/examples/messaging/server.cpp b/cpp/examples/messaging/server.cpp
index 33d28a75c4..ae1fee4b50 100644
--- a/cpp/examples/messaging/server.cpp
+++ b/cpp/examples/messaging/server.cpp
@@ -42,7 +42,7 @@ int main(int argc, char** argv) {
Connection connection(url);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver("service_queue; {create: always}");
diff --git a/cpp/examples/messaging/spout.cpp b/cpp/examples/messaging/spout.cpp
index 9ed8b642c8..2e9e91bfba 100644
--- a/cpp/examples/messaging/spout.cpp
+++ b/cpp/examples/messaging/spout.cpp
@@ -158,7 +158,7 @@ int main(int argc, char** argv)
if (options.parse(argc, argv)) {
Connection connection(options.url, options.connectionOptions);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Sender sender = session.createSender(options.address);
diff --git a/cpp/examples/messaging/topic_receiver.cpp b/cpp/examples/messaging/topic_receiver.cpp
index 408920f5aa..96f40a539c 100644
--- a/cpp/examples/messaging/topic_receiver.cpp
+++ b/cpp/examples/messaging/topic_receiver.cpp
@@ -35,7 +35,7 @@ int main(int argc, char** argv) {
Connection connection(url);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver("news_service; {filter:[control, " + pattern + "]}");
while (true) {
diff --git a/cpp/examples/messaging/topic_sender.cpp b/cpp/examples/messaging/topic_sender.cpp
index 9d4cd582cf..a95c0951f7 100644
--- a/cpp/examples/messaging/topic_sender.cpp
+++ b/cpp/examples/messaging/topic_sender.cpp
@@ -53,7 +53,7 @@ int main(int argc, char** argv) {
Connection connection(url);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
Sender sender = session.createSender("news_service");
diff --git a/cpp/include/qpid/client/SessionBase_0_10.h b/cpp/include/qpid/client/SessionBase_0_10.h
index 6b7ed97df4..3b5c84e74b 100644
--- a/cpp/include/qpid/client/SessionBase_0_10.h
+++ b/cpp/include/qpid/client/SessionBase_0_10.h
@@ -100,9 +100,6 @@ class SessionBase_0_10 {
QPID_CLIENT_EXTERN bool isValid() const;
QPID_CLIENT_EXTERN Connection getConnection();
-
- /** Send sync request without actually blocking for it**/
- QPID_CLIENT_EXTERN void sendSyncRequest();
protected:
boost::shared_ptr<SessionImpl> impl;
friend class SessionBase_0_10Access;
diff --git a/cpp/include/qpid/messaging/Address.h b/cpp/include/qpid/messaging/Address.h
index 34a186c9ce..3722db94e8 100644
--- a/cpp/include/qpid/messaging/Address.h
+++ b/cpp/include/qpid/messaging/Address.h
@@ -22,7 +22,7 @@
*
*/
#include <string>
-#include "qpid/Exception.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/types/Variant.h"
#include "qpid/messaging/ImportExport.h"
#include <ostream>
@@ -30,23 +30,6 @@
namespace qpid {
namespace messaging {
-/**
- * Thrown when a syntactically correct address cannot be resolved or
- * used.
- */
-struct InvalidAddress : public qpid::Exception
-{
- InvalidAddress(const std::string& msg);
-};
-
-/**
- * Thrown when an address string with inalid sytanx is used.
- */
-struct MalformedAddress : public qpid::Exception
-{
- MalformedAddress(const std::string& msg);
-};
-
class AddressImpl;
/**
diff --git a/cpp/include/qpid/messaging/Connection.h b/cpp/include/qpid/messaging/Connection.h
index e58abc1986..23711034d6 100644
--- a/cpp/include/qpid/messaging/Connection.h
+++ b/cpp/include/qpid/messaging/Connection.h
@@ -24,6 +24,7 @@
#include <string>
#include "qpid/messaging/ImportExport.h"
#include "qpid/messaging/Handle.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/types/Variant.h"
namespace qpid {
@@ -33,11 +34,6 @@ template <class> class PrivateImplRef;
class ConnectionImpl;
class Session;
-struct InvalidOptionString : public qpid::Exception
-{
- InvalidOptionString(const std::string& msg);
-};
-
class Connection : public qpid::messaging::Handle<ConnectionImpl>
{
public:
@@ -51,6 +47,7 @@ class Connection : public qpid::messaging::Handle<ConnectionImpl>
* heartbeat
* tcp-nodelay
* sasl-mechanism
+ * sasl-service
* sasl-min-ssf
* sasl-max-ssf
* transport
@@ -78,13 +75,12 @@ class Connection : public qpid::messaging::Handle<ConnectionImpl>
*
* @exception InvalidOptionString if the string does not match the correct syntax
*/
- QPID_CLIENT_EXTERN Connection(const std::string& url, const std::string& options);
+ QPID_CLIENT_EXTERN Connection(const std::string& url, const std::string& options) throw(InvalidOptionString);
QPID_CLIENT_EXTERN ~Connection();
QPID_CLIENT_EXTERN Connection& operator=(const Connection&);
QPID_CLIENT_EXTERN void setOption(const std::string& name, const qpid::types::Variant& value);
- QPID_CLIENT_EXTERN void connect();
- QPID_CLIENT_EXTERN bool isConnected();
- QPID_CLIENT_EXTERN void detach();
+ QPID_CLIENT_EXTERN void open();
+ QPID_CLIENT_EXTERN bool isOpen();
/**
* Closes a connection and all sessions associated with it. An
* opened connection must be closed before the last handle is
diff --git a/cpp/include/qpid/messaging/Receiver.h b/cpp/include/qpid/messaging/Receiver.h
index d89813acfc..6926d3401a 100644
--- a/cpp/include/qpid/messaging/Receiver.h
+++ b/cpp/include/qpid/messaging/Receiver.h
@@ -21,7 +21,7 @@
* under the License.
*
*/
-#include "qpid/Exception.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/ImportExport.h"
#include "qpid/messaging/Handle.h"
#include "qpid/messaging/Duration.h"
@@ -41,8 +41,6 @@ class Session;
class Receiver : public qpid::messaging::Handle<ReceiverImpl>
{
public:
- struct NoMessageAvailable : qpid::Exception {};
-
QPID_CLIENT_EXTERN Receiver(ReceiverImpl* impl = 0);
QPID_CLIENT_EXTERN Receiver(const Receiver&);
QPID_CLIENT_EXTERN ~Receiver();
diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h
index 95f9832576..b3bc527329 100644
--- a/cpp/include/qpid/messaging/Session.h
+++ b/cpp/include/qpid/messaging/Session.h
@@ -21,7 +21,7 @@
* under the License.
*
*/
-#include "qpid/Exception.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/ImportExport.h"
#include "qpid/messaging/Handle.h"
@@ -40,11 +40,6 @@ class Receiver;
class SessionImpl;
class Subscription;
-struct KeyError : qpid::Exception
-{
- QPID_CLIENT_EXTERN KeyError(const std::string&);
-};
-
/**
* A session represents a distinct 'conversation' which can involve
* sending and receiving messages to and from different addresses.
@@ -159,6 +154,9 @@ class Session : public qpid::messaging::Handle<SessionImpl>
*/
QPID_CLIENT_EXTERN Connection getConnection() const;
+ QPID_CLIENT_EXTERN bool hasError();
+ QPID_CLIENT_EXTERN void checkError();
+
private:
friend class qpid::messaging::PrivateImplRef<Session>;
};
diff --git a/cpp/include/qpid/messaging/exceptions.h b/cpp/include/qpid/messaging/exceptions.h
new file mode 100644
index 0000000000..af7959fe13
--- /dev/null
+++ b/cpp/include/qpid/messaging/exceptions.h
@@ -0,0 +1,150 @@
+#ifndef QPID_MESSAGING_EXCEPTIONS_H
+#define QPID_MESSAGING_EXCEPTIONS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/types/Exception.h"
+#include "qpid/types/Variant.h"
+#include "qpid/messaging/ImportExport.h"
+
+namespace qpid {
+namespace messaging {
+
+struct MessagingException : public qpid::types::Exception
+{
+ QPID_CLIENT_EXTERN MessagingException(const std::string& msg);
+ QPID_CLIENT_EXTERN virtual ~MessagingException() throw();
+
+ qpid::types::Variant::Map detail;
+ //TODO: override what() to include detail if present
+};
+
+struct InvalidOptionString : public MessagingException
+{
+ QPID_CLIENT_EXTERN InvalidOptionString(const std::string& msg);
+};
+
+struct KeyError : MessagingException
+{
+ QPID_CLIENT_EXTERN KeyError(const std::string&);
+};
+
+struct LinkError : MessagingException
+{
+ QPID_CLIENT_EXTERN LinkError(const std::string&);
+};
+
+struct AddressError : LinkError
+{
+ QPID_CLIENT_EXTERN AddressError(const std::string&);
+};
+
+/**
+ * Thrown when a syntactically correct address cannot be resolved or
+ * used.
+ */
+struct ResolutionError : public AddressError
+{
+ QPID_CLIENT_EXTERN ResolutionError(const std::string& msg);
+};
+
+struct AssertionFailed : public ResolutionError
+{
+ QPID_CLIENT_EXTERN AssertionFailed(const std::string& msg);
+};
+
+struct NotFound : public ResolutionError
+{
+ QPID_CLIENT_EXTERN NotFound(const std::string& msg);
+};
+
+/**
+ * Thrown when an address string with inalid sytanx is used.
+ */
+struct MalformedAddress : public AddressError
+{
+ QPID_CLIENT_EXTERN MalformedAddress(const std::string& msg);
+};
+
+struct ReceiverError : LinkError
+{
+ QPID_CLIENT_EXTERN ReceiverError(const std::string&);
+};
+
+struct FetchError : ReceiverError
+{
+ QPID_CLIENT_EXTERN FetchError(const std::string&);
+};
+
+struct NoMessageAvailable : FetchError
+{
+ QPID_CLIENT_EXTERN NoMessageAvailable();
+};
+
+struct SenderError : LinkError
+{
+ QPID_CLIENT_EXTERN SenderError(const std::string&);
+};
+
+struct SendError : SenderError
+{
+ QPID_CLIENT_EXTERN SendError(const std::string&);
+};
+
+struct TargetCapacityExceeded : SendError
+{
+ QPID_CLIENT_EXTERN TargetCapacityExceeded(const std::string&);
+};
+
+struct SessionError : MessagingException
+{
+ QPID_CLIENT_EXTERN SessionError(const std::string&);
+};
+
+struct TransactionError : SessionError
+{
+ QPID_CLIENT_EXTERN TransactionError(const std::string&);
+};
+
+struct TransactionAborted : TransactionError
+{
+ QPID_CLIENT_EXTERN TransactionAborted(const std::string&);
+};
+
+struct UnauthorizedAccess : SessionError
+{
+ QPID_CLIENT_EXTERN UnauthorizedAccess(const std::string&);
+};
+
+struct ConnectionError : MessagingException
+{
+ QPID_CLIENT_EXTERN ConnectionError(const std::string&);
+};
+
+struct TransportFailure : MessagingException
+{
+ QPID_CLIENT_EXTERN TransportFailure(const std::string&);
+};
+
+}} // namespace qpid::messaging
+
+#endif /*!QPID_MESSAGING_EXCEPTIONS_H*/
diff --git a/cpp/include/qpid/types/Exception.h b/cpp/include/qpid/types/Exception.h
new file mode 100644
index 0000000000..a8b7d128af
--- /dev/null
+++ b/cpp/include/qpid/types/Exception.h
@@ -0,0 +1,44 @@
+#ifndef QPID_TYPES_EXCEPTION_H
+#define QPID_TYPES_EXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include "qpid/CommonImportExport.h"
+
+namespace qpid {
+namespace types {
+
+class Exception : public std::exception
+{
+ public:
+ QPID_COMMON_EXTERN explicit Exception(const std::string& message=std::string()) throw();
+ QPID_COMMON_EXTERN virtual ~Exception() throw();
+ QPID_COMMON_EXTERN virtual const char* what() const throw();
+
+ private:
+ const std::string message;
+};
+
+}} // namespace qpid::types
+
+#endif /*!QPID_TYPES_EXCEPTION_H*/
diff --git a/cpp/include/qpid/types/Variant.h b/cpp/include/qpid/types/Variant.h
index 91e37242d0..059550bc9c 100644
--- a/cpp/include/qpid/types/Variant.h
+++ b/cpp/include/qpid/types/Variant.h
@@ -26,7 +26,7 @@
#include <ostream>
#include <string>
#include "Uuid.h"
-#include "qpid/Exception.h"
+#include "qpid/types/Exception.h"
#include "qpid/sys/IntegerTypes.h"
#include "qpid/CommonImportExport.h"
@@ -36,7 +36,7 @@ namespace types {
/**
* Thrown when an illegal conversion of a variant is attempted.
*/
-struct InvalidConversion : public qpid::Exception
+struct InvalidConversion : public Exception
{
InvalidConversion(const std::string& msg);
};
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 164b02d822..2edbf96205 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -584,6 +584,7 @@ set (qpidcommon_SOURCES
qpid/SessionId.cpp
qpid/StringUtils.cpp
qpid/Url.cpp
+ qpid/types/Exception.cpp
qpid/types/Uuid.cpp
qpid/types/Variant.cpp
qpid/amqp_0_10/Codecs.cpp
@@ -698,6 +699,7 @@ set (qpidclient_SOURCES
qpid/messaging/Connection.cpp
qpid/messaging/ConnectionImpl.h
qpid/messaging/Duration.cpp
+ qpid/messaging/exceptions.cpp
qpid/messaging/Message.cpp
qpid/messaging/MessageImpl.h
qpid/messaging/MessageImpl.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 427148dfbf..6692701502 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -426,6 +426,7 @@ libqpidcommon_la_SOURCES += \
qpid/memory.h \
qpid/pointer_to_other.h \
qpid/ptr_map.h \
+ qpid/types/Exception.cpp \
qpid/types/Uuid.cpp \
qpid/types/Variant.cpp \
qpid/amqp_0_10/Codecs.cpp \
@@ -710,6 +711,7 @@ libqpidclient_la_SOURCES = \
qpid/messaging/AddressParser.cpp \
qpid/messaging/Connection.cpp \
qpid/messaging/Duration.cpp \
+ qpid/messaging/exceptions.cpp \
qpid/messaging/Message.cpp \
qpid/messaging/MessageImpl.h \
qpid/messaging/MessageImpl.cpp \
@@ -818,12 +820,14 @@ nobase_include_HEADERS += \
../include/qpid/messaging/Address.h \
../include/qpid/messaging/Connection.h \
../include/qpid/messaging/Duration.h \
+ ../include/qpid/messaging/exceptions.h \
../include/qpid/messaging/Handle.h \
../include/qpid/messaging/ImportExport.h \
../include/qpid/messaging/Message.h \
../include/qpid/messaging/Receiver.h \
../include/qpid/messaging/Sender.h \
../include/qpid/messaging/Session.h \
+ ../include/qpid/types/Exception.h \
../include/qpid/types/Uuid.h \
../include/qpid/types/Variant.h \
../include/qpid/client/amqp0_10/FailoverUpdates.h
diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp
index 6aa13bb579..e114b7aacc 100644
--- a/cpp/src/qpid/client/SessionBase_0_10.cpp
+++ b/cpp/src/qpid/client/SessionBase_0_10.cpp
@@ -65,13 +65,6 @@ void SessionBase_0_10::sendCompletion()
impl->sendCompletion();
}
-void SessionBase_0_10::sendSyncRequest()
-{
- ExecutionSyncBody b;
- b.setSync(true);
- impl->send(b);
-}
-
uint16_t SessionBase_0_10::getChannel() const { return impl->getChannel(); }
void SessionBase_0_10::suspend() { impl->suspend(); }
diff --git a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
index f64a46ba01..43b581861f 100644
--- a/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
+++ b/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
@@ -26,7 +26,7 @@
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
#include "qpid/types/Variant.h"
-#include "qpid/Exception.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ExchangeBoundResult.h"
@@ -45,7 +45,10 @@ namespace amqp0_10 {
using qpid::Exception;
using qpid::messaging::Address;
-using qpid::messaging::InvalidAddress;
+using qpid::messaging::MalformedAddress;
+using qpid::messaging::ResolutionError;
+using qpid::messaging::NotFound;
+using qpid::messaging::AssertionFailed;
using qpid::framing::ExchangeBoundResult;
using qpid::framing::ExchangeQueryResult;
using qpid::framing::FieldTable;
@@ -360,7 +363,7 @@ bool AddressResolution::is_reliable(const Address& address)
std::string checkAddressType(qpid::client::Session session, const Address& address)
{
if (address.getName().empty()) {
- throw InvalidAddress("Name cannot be null");
+ throw MalformedAddress("Name cannot be null");
}
std::string type = (Opt(address)/NODE/TYPE).str();
if (type.empty()) {
@@ -376,7 +379,7 @@ std::string checkAddressType(qpid::client::Session session, const Address& addre
type = TOPIC_ADDRESS;
} else {
//both a queue and exchange exist for that name
- throw InvalidAddress("Ambiguous address, please specify queue or topic as node type");
+ throw ResolutionError("Ambiguous address, please specify queue or topic as node type");
}
}
return type;
@@ -396,7 +399,7 @@ std::auto_ptr<MessageSource> AddressResolution::resolveSource(qpid::client::Sess
QPID_LOG(debug, "treating source address as queue: " << address);
return source;
} else {
- throw InvalidAddress("Unrecognised type: " + type);
+ throw ResolutionError("Unrecognised type: " + type);
}
}
@@ -414,7 +417,7 @@ std::auto_ptr<MessageSink> AddressResolution::resolveSink(qpid::client::Session
QPID_LOG(debug, "treating target address as queue: " << address);
return sink;
} else {
- throw InvalidAddress("Unrecognised type: " + type);
+ throw ResolutionError("Unrecognised type: " + type);
}
}
@@ -424,7 +427,7 @@ bool isBrowse(const Address& address)
if (!mode.isVoid()) {
std::string value = mode.asString();
if (value == BROWSE) return true;
- else if (value != CONSUME) throw InvalidAddress("Invalid mode");
+ else if (value != CONSUME) throw ResolutionError("Invalid mode");
}
return false;
}
@@ -516,7 +519,7 @@ void Subscription::bindAll()
b.arguments.setString("x-match", "all");
bindings.push_back(b);
} else { //E.g. direct and xml
- throw qpid::Exception(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType));
+ throw ResolutionError(QPID_MSG("Cannot create binding to match all messages for exchange of type " << actualType));
}
}
@@ -662,23 +665,26 @@ void Queue::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
if (enabled(createPolicy, mode)) {
QPID_LOG(debug, "Auto-creating queue '" << name << "'");
try {
- sync(session).queueDeclare(arg::queue=name,
- arg::durable=durable,
- arg::autoDelete=autoDelete,
- arg::exclusive=exclusive,
- arg::alternateExchange=alternateExchange,
- arg::arguments=arguments);
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Could not create queue %1%; %2%") % name % e.what()).str());
+ session.queueDeclare(arg::queue=name,
+ arg::durable=durable,
+ arg::autoDelete=autoDelete,
+ arg::exclusive=exclusive,
+ arg::alternateExchange=alternateExchange,
+ arg::arguments=arguments);
+ nodeBindings.bind(session);
+ session.sync();
+ } catch (const qpid::framing::ResourceLockedException& e) {
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotAllowedException& e) {
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {//may be thrown when creating bindings
+ throw ResolutionError((boost::format("Creation failed for queue %1%; %2%") % name % e.what()).str());
}
- nodeBindings.bind(session);
} else {
try {
sync(session).queueDeclare(arg::queue=name, arg::passive=true);
} catch (const qpid::framing::NotFoundException& /*e*/) {
- throw InvalidAddress((boost::format("Queue %1% does not exist") % name).str());
- } catch (const std::exception& e) {
- throw InvalidAddress(e.what());
+ throw NotFound((boost::format("Queue %1% does not exist") % name).str());
}
}
}
@@ -700,27 +706,27 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
if (enabled(assertPolicy, mode)) {
QueueQueryResult result = sync(session).queueQuery(name);
if (result.getQueue() != name) {
- throw InvalidAddress((boost::format("Queue not found: %1%") % name).str());
+ throw NotFound((boost::format("Queue not found: %1%") % name).str());
} else {
if (durable && !result.getDurable()) {
- throw InvalidAddress((boost::format("Queue not durable: %1%") % name).str());
+ throw AssertionFailed((boost::format("Queue not durable: %1%") % name).str());
}
if (autoDelete && !result.getAutoDelete()) {
- throw InvalidAddress((boost::format("Queue not set to auto-delete: %1%") % name).str());
+ throw AssertionFailed((boost::format("Queue not set to auto-delete: %1%") % name).str());
}
if (exclusive && !result.getExclusive()) {
- throw InvalidAddress((boost::format("Queue not exclusive: %1%") % name).str());
+ throw AssertionFailed((boost::format("Queue not exclusive: %1%") % name).str());
}
if (!alternateExchange.empty() && result.getAlternateExchange() != alternateExchange) {
- throw InvalidAddress((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
+ throw AssertionFailed((boost::format("Alternate exchange does not match for %1%, expected %2%, got %3%")
% name % alternateExchange % result.getAlternateExchange()).str());
}
for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
- throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
} else if (*i->second != *v) {
- throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}
}
@@ -746,23 +752,24 @@ void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
try {
std::string type = specifiedType;
if (type.empty()) type = TOPIC_EXCHANGE;
- sync(session).exchangeDeclare(arg::exchange=name,
+ session.exchangeDeclare(arg::exchange=name,
arg::type=type,
arg::durable=durable,
arg::autoDelete=autoDelete,
arg::alternateExchange=alternateExchange,
arg::arguments=arguments);
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Could not create exchange %1%; %2%") % name % e.what()).str());
+ nodeBindings.bind(session);
+ session.sync();
+ } catch (const qpid::framing::NotAllowedException& e) {
+ throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str());
+ } catch (const qpid::framing::NotFoundException& e) {//can be caused when creating bindings
+ throw ResolutionError((boost::format("Create failed for exchange %1%; %2%") % name % e.what()).str());
}
- nodeBindings.bind(session);
} else {
try {
sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
} catch (const qpid::framing::NotFoundException& /*e*/) {
- throw InvalidAddress((boost::format("Exchange %1% does not exist") % name).str());
- } catch (const std::exception& e) {
- throw InvalidAddress(e.what());
+ throw NotFound((boost::format("Exchange %1% does not exist") % name).str());
}
}
}
@@ -784,14 +791,14 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
if (enabled(assertPolicy, mode)) {
ExchangeQueryResult result = sync(session).exchangeQuery(name);
if (result.getNotFound()) {
- throw InvalidAddress((boost::format("Exchange not found: %1%") % name).str());
+ throw NotFound((boost::format("Exchange not found: %1%") % name).str());
} else {
if (specifiedType.size() && result.getType() != specifiedType) {
- throw InvalidAddress((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
+ throw AssertionFailed((boost::format("Exchange %1% is of incorrect type, expected %2% but got %3%")
% name % specifiedType % result.getType()).str());
}
if (durable && !result.getDurable()) {
- throw InvalidAddress((boost::format("Exchange not durable: %1%") % name).str());
+ throw AssertionFailed((boost::format("Exchange not durable: %1%") % name).str());
}
//Note: Can't check auto-delete or alternate-exchange via
//exchange-query-result as these are not returned
@@ -799,9 +806,9 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode)
for (FieldTable::ValueMap::const_iterator i = arguments.begin(); i != arguments.end(); ++i) {
FieldTable::ValuePtr v = result.getArguments().get(i->first);
if (!v) {
- throw InvalidAddress((boost::format("Option %1% not set for %2%") % i->first % name).str());
+ throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
} else if (i->second != v) {
- throw InvalidAddress((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
+ throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
% i->first % name % *(i->second) % *v).str());
}
}
@@ -844,16 +851,11 @@ void Bindings::setDefaultQueue(const std::string& queue)
void Bindings::bind(qpid::client::AsyncSession& session)
{
- try {
- for (Bindings::const_iterator i = begin(); i != end(); ++i) {
- session.exchangeBind(arg::queue=i->queue,
- arg::exchange=i->exchange,
- arg::bindingKey=i->key,
- arg::arguments=i->arguments);
- }
- session.sync();
- } catch (const qpid::Exception& e) {
- throw InvalidAddress((boost::format("Could not create node bindings: %1%") % e.what()).str());
+ for (Bindings::const_iterator i = begin(); i != end(); ++i) {
+ session.exchangeBind(arg::queue=i->queue,
+ arg::exchange=i->exchange,
+ arg::bindingKey=i->key,
+ arg::arguments=i->arguments);
}
}
@@ -873,7 +875,7 @@ void Bindings::check(qpid::client::AsyncSession& session)
arg::exchange=i->exchange,
arg::bindingKey=i->key);
if (result.getQueueNotMatched() || result.getKeyNotMatched()) {
- throw InvalidAddress((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
+ throw AssertionFailed((boost::format("No such binding [exchange=%1%, queue=%2%, key=%3%]")
% i->exchange % i->queue % i->key).str());
}
}
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
index 30b75ff4ff..7e5018fc5f 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
@@ -21,10 +21,12 @@
#include "ConnectionImpl.h"
#include "SessionImpl.h"
#include "SimpleUrlParser.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Session.h"
#include "qpid/messaging/PrivateImplRef.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/Url.h"
#include <boost/intrusive_ptr.hpp>
#include <vector>
@@ -97,7 +99,7 @@ void convert(const Variant::Map& from, ConnectionSettings& to)
ConnectionImpl::ConnectionImpl(const std::string& url, const Variant::Map& options) :
reconnect(true), timeout(-1), limit(-1),
minReconnectInterval(3), maxReconnectInterval(60),
- retries(0)
+ retries(0), reconnectOnLimitExceeded(true)
{
QPID_LOG(debug, "Created connection with " << options);
setOptions(options);
@@ -117,7 +119,8 @@ void ConnectionImpl::setOptions(const Variant::Map& options)
setIfFound(options, "reconnect-interval-min", minReconnectInterval);
setIfFound(options, "reconnect-interval-max", maxReconnectInterval);
}
- setIfFound(options, "reconnect-urls", urls);
+ setIfFound(options, "reconnect-urls", urls);
+ setIfFound(options, "x-reconnect-on-limit-exceeded", reconnectOnLimitExceeded);
}
void ConnectionImpl::setOption(const std::string& name, const Variant& value)
@@ -147,7 +150,7 @@ void ConnectionImpl::detach()
connection.close();
}
-bool ConnectionImpl::isConnected()
+bool ConnectionImpl::isOpen()
{
qpid::sys::Mutex::ScopedLock l(lock);
return connection.isOpen();
@@ -192,13 +195,13 @@ qpid::messaging::Session ConnectionImpl::newSession(bool transactional, const st
}
try {
getImplPtr(impl)->setSession(connection.newSession(name));
- } catch (const TransportFailure&) {
- connect();
+ } catch (const qpid::TransportFailure&) {
+ open();
}
return impl;
}
-void ConnectionImpl::connect()
+void ConnectionImpl::open()
{
qpid::sys::AbsTime start = qpid::sys::now();
qpid::sys::ScopedLock<qpid::sys::Semaphore> l(semaphore);
@@ -217,9 +220,15 @@ bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
void ConnectionImpl::connect(const qpid::sys::AbsTime& started)
{
for (int64_t i = minReconnectInterval; !tryConnect(); i = std::min(i * 2, maxReconnectInterval)) {
- if (!reconnect) throw TransportFailure("Failed to connect (reconnect disabled)");
- if (limit >= 0 && retries++ >= limit) throw TransportFailure("Failed to connect within reconnect limit");
- if (expired(started, timeout)) throw TransportFailure("Failed to connect within reconnect timeout");
+ if (!reconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ if (limit >= 0 && retries++ >= limit) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit");
+ }
+ if (expired(started, timeout)) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
+ }
else qpid::sys::sleep(i);
}
retries = 0;
@@ -246,7 +255,7 @@ bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
}
QPID_LOG(info, "Connected to " << *i);
return true;
- } catch (const Exception& e) {
+ } catch (const qpid::Exception& e) {
//TODO: need to fix timeout on
//qpid::client::Connection::open() so that it throws
//TransportFailure rather than a ConnectionException
@@ -264,8 +273,27 @@ bool ConnectionImpl::resetSessions()
getImplPtr(i->second)->setSession(connection.newSession(i->first));
}
return true;
- } catch (const TransportFailure&) {
- QPID_LOG(debug, "Connection failed while re-inialising sessions");
+ } catch (const qpid::TransportFailure&) {
+ QPID_LOG(debug, "Connection failed while re-initialising sessions");
+ return false;
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ if (reconnectOnLimitExceeded) {
+ QPID_LOG(debug, "Detaching and reconnecting due to: " << e.what());
+ detach();
+ return false;
+ } else {
+ throw qpid::messaging::TargetCapacityExceeded(e.what());
+ }
+ }
+}
+
+bool ConnectionImpl::backoff()
+{
+ if (reconnectOnLimitExceeded) {
+ detach();
+ open();
+ return true;
+ } else {
return false;
}
}
diff --git a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
index 9d992c1375..b6fd33cc49 100644
--- a/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
@@ -23,7 +23,6 @@
*/
#include "qpid/messaging/ConnectionImpl.h"
#include "qpid/types/Variant.h"
-#include "qpid/Url.h"
#include "qpid/client/Connection.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/sys/Mutex.h"
@@ -40,14 +39,15 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
{
public:
ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options);
- void connect();
- bool isConnected();
+ void open();
+ bool isOpen();
void close();
qpid::messaging::Session newSession(bool transactional, const std::string& name);
qpid::messaging::Session getSession(const std::string& name) const;
void closed(SessionImpl&);
void detach();
void setOption(const std::string& name, const qpid::types::Variant& value);
+ bool backoff();
private:
typedef std::map<std::string, qpid::messaging::Session> Sessions;
@@ -63,6 +63,7 @@ class ConnectionImpl : public qpid::messaging::ConnectionImpl
int64_t minReconnectInterval;
int64_t maxReconnectInterval;
int32_t retries;
+ bool reconnectOnLimitExceeded;
void setOptions(const qpid::types::Variant::Map& options);
void connect(const qpid::sys::AbsTime& started);
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index c3367f8ab4..343b5cad37 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -22,6 +22,7 @@
#include "AddressResolution.h"
#include "MessageSource.h"
#include "SessionImpl.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Session.h"
@@ -29,6 +30,7 @@ namespace qpid {
namespace client {
namespace amqp0_10 {
+using qpid::messaging::NoMessageAvailable;
using qpid::messaging::Receiver;
using qpid::messaging::Duration;
@@ -44,14 +46,14 @@ void ReceiverImpl::received(qpid::messaging::Message&)
qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
- if (!get(result, timeout)) throw Receiver::NoMessageAvailable();
+ if (!get(result, timeout)) throw NoMessageAvailable();
return result;
}
qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
- if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable();
+ if (!fetch(result, timeout)) throw NoMessageAvailable();
return result;
}
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 969ad93da9..33a3e226ff 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -24,6 +24,8 @@
#include "qpid/client/amqp0_10/SenderImpl.h"
#include "qpid/client/amqp0_10/MessageSource.h"
#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/messaging/PrivateImplRef.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
@@ -34,12 +36,15 @@
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Session.h"
-#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
using qpid::messaging::KeyError;
+using qpid::messaging::NoMessageAvailable;
+using qpid::messaging::MessagingException;
+using qpid::messaging::TransactionAborted;
+using qpid::messaging::SessionError;
using qpid::messaging::MessageImplAccess;
using qpid::messaging::Sender;
using qpid::messaging::Receiver;
@@ -50,6 +55,11 @@ namespace amqp0_10 {
SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
+void SessionImpl::checkError()
+{
+ qpid::client::SessionBase_0_10Access s(session);
+ s.get()->assertOpen();
+}
void SessionImpl::sync(bool block)
{
@@ -60,7 +70,7 @@ void SessionImpl::sync(bool block)
void SessionImpl::commit()
{
if (!execute<Commit>()) {
- throw Exception();//TODO: what type?
+ throw TransactionAborted("Transaction aborted due to transport failure");
}
}
@@ -141,6 +151,7 @@ void SessionImpl::setSession(qpid::client::Session s)
for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
}
+ session.sync();
}
struct SessionImpl::CreateReceiver : Command
@@ -219,7 +230,7 @@ SessionImpl& SessionImpl::convert(qpid::messaging::Session& s)
{
boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s);
if (!impl) {
- throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl"));
+ throw SessionError(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl"));
}
return *impl;
}
@@ -297,7 +308,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
if (incoming.getNextDestination(destination, adjust(timeout))) {
Receivers::const_iterator i = receivers.find(destination);
if (i == receivers.end()) {
- throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination));
+ throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination));
} else {
receiver = i->second;
}
@@ -307,6 +318,17 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
}
} catch (TransportFailure&) {
reconnect();
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ if (backoff()) return false;
+ else throw qpid::messaging::TargetCapacityExceeded(e.what());
+ } catch (const qpid::framing::UnauthorizedAccessException& e) {
+ throw qpid::messaging::UnauthorizedAccess(e.what());
+ } catch (const qpid::SessionException& e) {
+ throw qpid::messaging::SessionError(e.what());
+ } catch (const qpid::ConnectionException& e) {
+ throw qpid::messaging::ConnectionError(e.what());
+ } catch (const qpid::ChannelException& e) {
+ throw qpid::messaging::MessagingException(e.what());
}
}
}
@@ -314,8 +336,8 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout)
{
qpid::messaging::Receiver receiver;
- if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable();
- if (!receiver) throw qpid::Exception("Bad receiver returned!");
+ if (!nextReceiver(receiver, timeout)) throw NoMessageAvailable();
+ if (!receiver) throw SessionError("Bad receiver returned!");
return receiver;
}
@@ -377,7 +399,7 @@ uint32_t SessionImpl::pendingAckImpl(const std::string* destination)
void SessionImpl::syncImpl(bool block)
{
if (block) session.sync();
- else session.sendSyncRequest();
+ else session.flush();
}
void SessionImpl::commitImpl()
@@ -435,7 +457,12 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->connect();
+ connection->open();
+}
+
+bool SessionImpl::backoff()
+{
+ return connection->backoff();
}
qpid::messaging::Connection SessionImpl::getConnection() const
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 8b098e65d6..e1229055f7 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -23,11 +23,13 @@
*/
#include "qpid/messaging/SessionImpl.h"
#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/client/Session.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/client/amqp0_10/IncomingMessages.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/intrusive_ptr.hpp>
namespace qpid {
@@ -73,6 +75,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
qpid::messaging::Receiver nextReceiver(qpid::messaging::Duration timeout);
qpid::messaging::Connection getConnection() const;
+ void checkError();
bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
@@ -93,9 +96,20 @@ class SessionImpl : public qpid::messaging::SessionImpl
qpid::sys::Mutex::ScopedLock l(lock);
f();
return true;
- } catch (TransportFailure&) {
+ } catch (const qpid::TransportFailure&) {
reconnect();
return false;
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ if (backoff()) return false;
+ else throw qpid::messaging::TargetCapacityExceeded(e.what());
+ } catch (const qpid::framing::UnauthorizedAccessException& e) {
+ throw qpid::messaging::UnauthorizedAccess(e.what());
+ } catch (const qpid::SessionException& e) {
+ throw qpid::messaging::SessionError(e.what());
+ } catch (const qpid::ConnectionException& e) {
+ throw qpid::messaging::ConnectionError(e.what());
+ } catch (const qpid::ChannelException& e) {
+ throw qpid::messaging::MessagingException(e.what());
}
}
@@ -118,6 +132,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout);
bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
void reconnect();
+ bool backoff();
void commitImpl();
void rollbackImpl();
diff --git a/cpp/src/qpid/messaging/Address.cpp b/cpp/src/qpid/messaging/Address.cpp
index 0c522888e7..a516959edb 100644
--- a/cpp/src/qpid/messaging/Address.cpp
+++ b/cpp/src/qpid/messaging/Address.cpp
@@ -148,8 +148,4 @@ std::ostream& operator<<(std::ostream& out, const Address& address)
return out;
}
-InvalidAddress::InvalidAddress(const std::string& msg) : Exception(msg) {}
-
-MalformedAddress::MalformedAddress(const std::string& msg) : Exception(msg) {}
-
}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/Connection.cpp b/cpp/src/qpid/messaging/Connection.cpp
index 81a72bb876..14e713d425 100644
--- a/cpp/src/qpid/messaging/Connection.cpp
+++ b/cpp/src/qpid/messaging/Connection.cpp
@@ -39,7 +39,7 @@ Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::cop
Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); }
Connection::~Connection() { PI::dtor(*this); }
-Connection::Connection(const std::string& url, const std::string& o)
+Connection::Connection(const std::string& url, const std::string& o) throw(InvalidOptionString)
{
Variant::Map options;
AddressParser parser(o);
@@ -54,9 +54,8 @@ Connection::Connection(const std::string& url, const Variant::Map& options)
PI::ctor(*this, new qpid::client::amqp0_10::ConnectionImpl(url, options));
}
-void Connection::connect() { impl->connect(); }
-bool Connection::isConnected() { return impl->isConnected(); }
-void Connection::detach() { impl->detach(); }
+void Connection::open() { impl->open(); }
+bool Connection::isOpen() { return impl->isOpen(); }
void Connection::close() { impl->close(); }
Session Connection::createSession(const std::string& name) { return impl->newSession(false, name); }
Session Connection::createTransactionalSession(const std::string& name)
@@ -69,6 +68,4 @@ void Connection::setOption(const std::string& name, const Variant& value)
impl->setOption(name, value);
}
-InvalidOptionString::InvalidOptionString(const std::string& msg) : Exception(msg) {}
-
}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/ConnectionImpl.h b/cpp/src/qpid/messaging/ConnectionImpl.h
index 148105476d..23ab5272d0 100644
--- a/cpp/src/qpid/messaging/ConnectionImpl.h
+++ b/cpp/src/qpid/messaging/ConnectionImpl.h
@@ -38,9 +38,8 @@ class ConnectionImpl : public virtual qpid::RefCounted
{
public:
virtual ~ConnectionImpl() {}
- virtual void connect() = 0;
- virtual bool isConnected() = 0;
- virtual void detach() = 0;
+ virtual void open() = 0;
+ virtual bool isOpen() = 0;
virtual void close() = 0;
virtual Session newSession(bool transactional, const std::string& name) = 0;
virtual Session getSession(const std::string& name) const = 0;
diff --git a/cpp/src/qpid/messaging/Message.cpp b/cpp/src/qpid/messaging/Message.cpp
index 753d3cec1b..1a04abe15b 100644
--- a/cpp/src/qpid/messaging/Message.cpp
+++ b/cpp/src/qpid/messaging/Message.cpp
@@ -111,13 +111,13 @@ template <class C> struct MessageCodec
checkEncoding(requested) || checkEncoding(message.getContentType());
}
- template <class T> static void decode(const Message& message, T& object, const std::string& encoding)
+ static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding)
{
checkEncoding(message, encoding);
C::decode(message.getContent(), object);
}
- template <class T> static void encode(const T& map, Message& message, const std::string& encoding)
+ static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding)
{
checkEncoding(message, encoding);
std::string content;
diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp
index eb5e3766b8..bd9048e893 100644
--- a/cpp/src/qpid/messaging/Session.cpp
+++ b/cpp/src/qpid/messaging/Session.cpp
@@ -94,6 +94,15 @@ Connection Session::getConnection() const
return impl->getConnection();
}
-KeyError::KeyError(const std::string& msg) : Exception(msg) {}
+void Session::checkError() { impl->checkError(); }
+bool Session::hasError()
+{
+ try {
+ checkError();
+ return false;
+ } catch (const std::exception&) {
+ return true;
+ }
+}
}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h
index 7acead5b04..e9a200b22e 100644
--- a/cpp/src/qpid/messaging/SessionImpl.h
+++ b/cpp/src/qpid/messaging/SessionImpl.h
@@ -54,6 +54,7 @@ class SessionImpl : public virtual qpid::RefCounted
virtual Sender getSender(const std::string& name) const = 0;
virtual Receiver getReceiver(const std::string& name) const = 0;
virtual Connection getConnection() const = 0;
+ virtual void checkError() = 0;
private:
};
}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/messaging/exceptions.cpp b/cpp/src/qpid/messaging/exceptions.cpp
new file mode 100644
index 0000000000..5d2683fffe
--- /dev/null
+++ b/cpp/src/qpid/messaging/exceptions.cpp
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/messaging/exceptions.h"
+
+namespace qpid {
+namespace messaging {
+
+MessagingException::MessagingException(const std::string& msg) : qpid::types::Exception(msg) {}
+MessagingException::~MessagingException() throw() {}
+
+InvalidOptionString::InvalidOptionString(const std::string& msg) : MessagingException(msg) {}
+KeyError::KeyError(const std::string& msg) : MessagingException(msg) {}
+
+
+LinkError::LinkError(const std::string& msg) : MessagingException(msg) {}
+
+AddressError::AddressError(const std::string& msg) : LinkError(msg) {}
+ResolutionError::ResolutionError(const std::string& msg) : AddressError(msg) {}
+MalformedAddress::MalformedAddress(const std::string& msg) : AddressError(msg) {}
+AssertionFailed::AssertionFailed(const std::string& msg) : ResolutionError(msg) {}
+NotFound::NotFound(const std::string& msg) : ResolutionError(msg) {}
+
+ReceiverError::ReceiverError(const std::string& msg) : LinkError(msg) {}
+FetchError::FetchError(const std::string& msg) : ReceiverError(msg) {}
+NoMessageAvailable::NoMessageAvailable() : FetchError("No message to fetch") {}
+
+SenderError::SenderError(const std::string& msg) : LinkError(msg) {}
+SendError::SendError(const std::string& msg) : SenderError(msg) {}
+TargetCapacityExceeded::TargetCapacityExceeded(const std::string& msg) : SendError(msg) {}
+
+SessionError::SessionError(const std::string& msg) : MessagingException(msg) {}
+TransactionError::TransactionError(const std::string& msg) : SessionError(msg) {}
+TransactionAborted::TransactionAborted(const std::string& msg) : TransactionError(msg) {}
+UnauthorizedAccess::UnauthorizedAccess(const std::string& msg) : SessionError(msg) {}
+
+ConnectionError::ConnectionError(const std::string& msg) : MessagingException(msg) {}
+
+TransportFailure::TransportFailure(const std::string& msg) : MessagingException(msg) {}
+
+}} // namespace qpid::messaging
diff --git a/cpp/src/qpid/types/Exception.cpp b/cpp/src/qpid/types/Exception.cpp
new file mode 100644
index 0000000000..71390e6abd
--- /dev/null
+++ b/cpp/src/qpid/types/Exception.cpp
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Exception.h"
+
+namespace qpid {
+namespace types {
+
+Exception::Exception(const std::string& msg) throw() : message(msg) {}
+Exception::~Exception() throw() {}
+const char* Exception::what() const throw() { return message.c_str(); }
+
+}} // namespace qpid::types
diff --git a/cpp/src/qpid/types/Variant.cpp b/cpp/src/qpid/types/Variant.cpp
index 904a596e82..2126c67fec 100644
--- a/cpp/src/qpid/types/Variant.cpp
+++ b/cpp/src/qpid/types/Variant.cpp
@@ -29,13 +29,13 @@
namespace qpid {
namespace types {
-InvalidConversion::InvalidConversion(const std::string& msg) : Exception(msg) {}
-
-
namespace {
-std::string EMPTY;
+const std::string EMPTY;
+const std::string PREFIX("invalid conversion: ");
}
+InvalidConversion::InvalidConversion(const std::string& msg) : Exception(PREFIX + msg) {}
+
class VariantImpl
{
public:
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index bdd5422690..aa0c65d319 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -116,7 +116,7 @@ struct MessagingFixture : public BrokerFixture
static Connection open(uint16_t port)
{
Connection connection((boost::format("amqp:tcp:localhost:%1%") % (port)).str());
- connection.connect();
+ connection.open();
return connection;
}
@@ -266,20 +266,20 @@ QPID_AUTO_TEST_CASE(testSenderError)
{
MessagingFixture fix;
ScopedSuppressLogging sl;
- BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress"), qpid::messaging::NotFound);
fix.session = fix.connection.createSession();
BOOST_CHECK_THROW(fix.session.createSender("NonExistentAddress; {create:receiver}"),
- qpid::messaging::InvalidAddress);
+ qpid::messaging::NotFound);
}
QPID_AUTO_TEST_CASE(testReceiverError)
{
MessagingFixture fix;
ScopedSuppressLogging sl;
- BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::InvalidAddress);
+ BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress"), qpid::messaging::NotFound);
fix.session = fix.connection.createSession();
BOOST_CHECK_THROW(fix.session.createReceiver("NonExistentAddress; {create:sender}"),
- qpid::messaging::InvalidAddress);
+ qpid::messaging::NotFound);
}
QPID_AUTO_TEST_CASE(testSimpleTopic)
@@ -766,10 +766,10 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
- BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::InvalidAddress);
+ BOOST_CHECK_THROW(fix.session.createReceiver(a2), qpid::messaging::AssertionFailed);
std::string a3 = "q; {assert:sender, node:{x-declare:{arguments:{qpid.max-count:99}}}}";
- BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::InvalidAddress);
+ BOOST_CHECK_THROW(fix.session.createSender(a3), qpid::messaging::AssertionFailed);
Receiver r3 = fix.session.createReceiver(a3);
r3.close();
diff --git a/cpp/src/tests/qpid_receive.cpp b/cpp/src/tests/qpid_receive.cpp
index 77e9cd180a..46f3db6718 100644
--- a/cpp/src/tests/qpid_receive.cpp
+++ b/cpp/src/tests/qpid_receive.cpp
@@ -161,7 +161,7 @@ int main(int argc, char ** argv)
if (opts.parse(argc, argv)) {
Connection connection(opts.url, opts.connectionOptions);
try {
- connection.connect();
+ connection.open();
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession();
Receiver receiver = session.createReceiver(opts.address);
diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp
index e51c5a93d2..914f910224 100644
--- a/cpp/src/tests/qpid_send.cpp
+++ b/cpp/src/tests/qpid_send.cpp
@@ -218,7 +218,7 @@ int main(int argc, char ** argv)
if (opts.parse(argc, argv)) {
Connection connection(opts.url, opts.connectionOptions);
try {
- connection.connect();
+ connection.open();
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession();
Sender sender = session.createSender(opts.address);
diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp
index b3fe493922..4305ee8c49 100644
--- a/cpp/src/tests/qpid_stream.cpp
+++ b/cpp/src/tests/qpid_stream.cpp
@@ -90,7 +90,7 @@ struct Client : qpid::sys::Runnable
{
Connection connection(opts.url);
try {
- connection.connect();
+ connection.open();
Session session = connection.createSession();
doWork(session);
session.close();