summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-31 16:56:04 +0000
committerAlan Conway <aconway@apache.org>2008-10-31 16:56:04 +0000
commit4cbbb2a935e2eda7aa8d79fd6d5764cbbfb2010a (patch)
tree0033cd459d4418c44f28953bb211b58135c88eae
parent79323867dae1ad7b0d00e4055e506749236d9bf1 (diff)
downloadqpid-python-4cbbb2a935e2eda7aa8d79fd6d5764cbbfb2010a.tar.gz
Cluster returns connection exception for un-supported AMQP features.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@709474 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp29
-rw-r--r--cpp/src/qpid/cluster/Connection.h1
-rw-r--r--cpp/src/tests/cluster_test.cpp14
3 files changed, 40 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index d8d41027cc..604df9dde6 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -24,8 +24,10 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
@@ -98,14 +100,33 @@ void Connection::received(framing::AMQFrame& f) {
}
}
+bool Connection::checkUnsupported(const AMQBody& body) {
+ std::string message;
+ if (body.getMethod()) {
+ switch (body.getMethod()->amqpClassId()) {
+ case TX_CLASS_ID: message = "TX transactions are not currently supported by cluster."; break;
+ case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
+ }
+ }
+ else if (body.type() == HEADER_BODY) {
+ const DeliveryProperties* dp = static_cast<const AMQHeaderBody&>(body).get<DeliveryProperties>();
+ if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster.";
+ }
+ if (!message.empty())
+ connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0);
+ return !message.empty();
+}
+
// Delivered from cluster.
void Connection::delivered(framing::AMQFrame& f) {
QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
assert(!catchUp);
- // Handle connection controls, deliver other frames to connection.
- currentChannel = f.getChannel();
- if (!framing::invoke(*this, *f.getBody()).wasHandled())
- connection.received(f);
+ currentChannel = f.getChannel();
+ if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol.
+ && !checkUnsupported(*f.getBody())) // Unsupported operation.
+ {
+ connection.received(f); // Pass to broker connection.
+ }
}
void Connection::closed() {
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index b4f8128632..9f75d3dae3 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -121,6 +121,7 @@ class Connection :
private:
bool catcUp;
+ bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 63bd31ea1b..5f18d0ff90 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -220,6 +220,20 @@ class Sender {
uint16_t channel;
};
+QPID_AUTO_TEST_CASE(testUnsupported) {
+ ScopedSuppressLogging sl;
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ BOOST_CHECK_THROW(c0.session.txSelect(), Exception);
+ BOOST_CHECK(!c0.connection.isOpen());
+ Client c1(cluster[0], "c1");
+ BOOST_CHECK_THROW(c1.session.dtxCommit(), Exception);
+ Client c2(cluster[0], "c2");
+ Message m;
+ m.getDeliveryProperties().setTtl(1);
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);
+}
+
QPID_AUTO_TEST_CASE(testUnacked) {
// Verify replication of unacknowledged messages.
ClusterFixture cluster(1);