diff options
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/sys/BlockingQueue.h | 4 | ||||
-rw-r--r-- | cpp/src/tests/XmlClientSessionTest.cpp | 95 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 20 |
5 files changed, 75 insertions, 65 deletions
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index fe7793c432..e674a738c2 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -394,7 +394,7 @@ void TCPConnector::run(){ aio->queueForDeletion(); socket.close(); } catch (const std::exception& e) { - QPID_LOG(error, e.what()); + QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); handleClosed(); } } diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index 32d0001040..af805a3808 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -72,7 +72,7 @@ void Dispatcher::run() boost::state_saver<bool> reset(running); // Reset to false on exit. running = true; try { - while (true) { + while (!queue->isClosed()) { Mutex::ScopedUnlock u(lock); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { @@ -92,18 +92,19 @@ void Dispatcher::run() } } } + session.sync(); // Make sure all our acks are received before returning. } - catch (const ClosedException& e) { - QPID_LOG(debug, "Dispatch thread exiting, session closed: " << session.getId()); - try { - session.sync(); // Make sure all our acks are received before returning. - } - catch(...) {} + catch (const ClosedException&) { + QPID_LOG(debug, QPID_MSG(session.getId() << ": closed by peer")); } catch (const std::exception& e) { - QPID_LOG(error, "Exception in client dispatch thread: " << e.what()); - if ( failoverHandler ) + if ( failoverHandler ) { + QPID_LOG(debug, QPID_MSG(session.getId() << " failover: " << e.what())); failoverHandler(); + } + else { + QPID_LOG(error, session.getId() << " error: " << e.what()); + } } } diff --git a/cpp/src/qpid/sys/BlockingQueue.h b/cpp/src/qpid/sys/BlockingQueue.h index c6c6291b97..d7e6449d7a 100644 --- a/cpp/src/qpid/sys/BlockingQueue.h +++ b/cpp/src/qpid/sys/BlockingQueue.h @@ -73,7 +73,9 @@ public: return result; } - /** Push a value onto the queue */ + /** Push a value onto the queue. + * Note it is not an error to push onto a closed queue. + */ void push(const T& t) { Mutex::ScopedLock l(waitable); queue.push(t); diff --git a/cpp/src/tests/XmlClientSessionTest.cpp b/cpp/src/tests/XmlClientSessionTest.cpp index df515a6adb..534ecf70f2 100644 --- a/cpp/src/tests/XmlClientSessionTest.cpp +++ b/cpp/src/tests/XmlClientSessionTest.cpp @@ -18,7 +18,9 @@ * under the License. * */ + #include "unit_test.h" +#include "test_tools.h" #include "BrokerFixture.h" #include "qpid/sys/Shlib.h" #include "qpid/sys/Monitor.h" @@ -124,31 +126,31 @@ struct ClientSessionFixture : public ProxySessionFixture QPID_AUTO_TEST_CASE(testXmlBinding) { - ClientSessionFixture f; + ClientSessionFixture f; - SubscriptionManager subscriptions(f.session); - SubscribedLocalQueue localQueue(subscriptions); + SubscriptionManager subscriptions(f.session); + SubscribedLocalQueue localQueue(subscriptions); - f.session.exchangeDeclare(qpid::client::arg::exchange="xml", qpid::client::arg::type="xml"); - f.session.queueDeclare(qpid::client::arg::queue="odd_blue"); - subscriptions.subscribe(localQueue, "odd_blue"); + f.session.exchangeDeclare(qpid::client::arg::exchange="xml", qpid::client::arg::type="xml"); + f.session.queueDeclare(qpid::client::arg::queue="odd_blue"); + subscriptions.subscribe(localQueue, "odd_blue"); - FieldTable binding; - binding.setString("xquery", "declare variable $color external;" - "(./message/id mod 2 = 1) and ($color = 'blue')"); - f.session.exchangeBind(qpid::client::arg::exchange="xml", qpid::client::arg::queue="odd_blue", qpid::client::arg::bindingKey="query_name", qpid::client::arg::arguments=binding); + FieldTable binding; + binding.setString("xquery", "declare variable $color external;" + "(./message/id mod 2 = 1) and ($color = 'blue')"); + f.session.exchangeBind(qpid::client::arg::exchange="xml", qpid::client::arg::queue="odd_blue", qpid::client::arg::bindingKey="query_name", qpid::client::arg::arguments=binding); - Message message; - message.getDeliveryProperties().setRoutingKey("query_name"); + Message message; + message.getDeliveryProperties().setRoutingKey("query_name"); - message.getHeaders().setString("color", "blue"); - string m = "<message><id>1</id></message>"; - message.setData(m); + message.getHeaders().setString("color", "blue"); + string m = "<message><id>1</id></message>"; + message.setData(m); - f.session.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="xml"); + f.session.messageTransfer(qpid::client::arg::content=message, qpid::client::arg::destination="xml"); - Message m2 = localQueue.get(); - BOOST_CHECK_EQUAL(m, m2.getData()); + Message m2 = localQueue.get(); + BOOST_CHECK_EQUAL(m, m2.getData()); } /** @@ -186,48 +188,49 @@ QPID_AUTO_TEST_CASE(testXMLBindMultipleQueues) { // raise an exception, the content is not required to be XML. QPID_AUTO_TEST_CASE(testXMLSendBadXML) { - ClientSessionFixture f; + ClientSessionFixture f; - f.session.exchangeDeclare(arg::exchange="xml", arg::type="xml"); - f.session.queueDeclare(arg::queue="blue", arg::exclusive=true, arg::autoDelete=true)\ - ; - f.session.queueDeclare(arg::queue="red", arg::exclusive=true, arg::autoDelete=true); + f.session.exchangeDeclare(arg::exchange="xml", arg::type="xml"); + f.session.queueDeclare(arg::queue="blue", arg::exclusive=true, arg::autoDelete=true)\ + ; + f.session.queueDeclare(arg::queue="red", arg::exclusive=true, arg::autoDelete=true); - FieldTable blue; - blue.setString("xquery", "./colour = 'blue'"); - f.session.exchangeBind(arg::exchange="xml", arg::queue="blue", arg::bindingKey="by-c\ + FieldTable blue; + blue.setString("xquery", "./colour = 'blue'"); + f.session.exchangeBind(arg::exchange="xml", arg::queue="blue", arg::bindingKey="by-c\ olour", arg::arguments=blue); - FieldTable red; - red.setString("xquery", "./colour = 'red'"); - f.session.exchangeBind(arg::exchange="xml", arg::queue="red", arg::bindingKey="by-co\ + FieldTable red; + red.setString("xquery", "./colour = 'red'"); + f.session.exchangeBind(arg::exchange="xml", arg::queue="red", arg::bindingKey="by-co\ lour", arg::arguments=red); - Message sent1("<>colour>blue</colour>", "by-colour"); - f.session.messageTransfer(arg::content=sent1, arg::destination="xml"); + Message sent1("<>colour>blue</colour>", "by-colour"); + f.session.messageTransfer(arg::content=sent1, arg::destination="xml"); - BOOST_CHECK_EQUAL(1, 1); + BOOST_CHECK_EQUAL(1, 1); } //### Test: Bad XQuery does not kill the server, but does raise an exception QPID_AUTO_TEST_CASE(testXMLBadXQuery) { - ClientSessionFixture f; - - f.session.exchangeDeclare(arg::exchange="xml", arg::type="xml"); - f.session.queueDeclare(arg::queue="blue", arg::exclusive=true, arg::autoDelete=true)\ - ; + ClientSessionFixture f; - try { - FieldTable blue; - blue.setString("xquery", "./colour $=! 'blue'"); - f.session.exchangeBind(arg::exchange="xml", arg::queue="blue", arg::bindingKey="by-c\ + f.session.exchangeDeclare(arg::exchange="xml", arg::type="xml"); + f.session.queueDeclare(arg::queue="blue", arg::exclusive=true, arg::autoDelete=true)\ + ; + + try { + ScopedSuppressLogging sl; // Supress logging of error messages for expected error. + FieldTable blue; + blue.setString("xquery", "./colour $=! 'blue'"); + f.session.exchangeBind(arg::exchange="xml", arg::queue="blue", arg::bindingKey="by-c\ olour", arg::arguments=blue); - } - catch (const InternalErrorException& e) { - return; - } - BOOST_ERROR("A bad XQuery must raise an exception when used in an XML Binding."); + } + catch (const InternalErrorException& e) { + return; + } + BOOST_ERROR("A bad XQuery must raise an exception when used in an XML Binding."); } diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 9266aa1b67..68920d1324 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -92,11 +92,19 @@ struct ClusterFixture : public vector<uint16_t> { void add0(bool force); void setup(); + /** Kill a forked broker with sig, or shutdown broker0 if n==0. */ void kill(size_t n, int sig=SIGINT) { if (n) forkedBrokers[n-1].kill(sig); else broker0->broker->shutdown(); } + /** Kill a broker and suppress errors from connection. */ + void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT) { + ScopedSuppressLogging sl; + kill(n,sig); + try { c.close(); } catch(...) {} + } + void waitFor(size_t n) { for (size_t retry = 1000; retry && getGlobalCluster().getUrls().size() != n; --retry) ::usleep(1000); @@ -218,7 +226,8 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { BOOST_CHECK_EQUAL(kb2,kb0); BOOST_CHECK_EQUAL(kb2,kb1); - cluster.kill(1,9); + cluster.killWithSilencer(1,c1.connection,9); + cluster.waitFor(2); kb0 = knownBrokerPorts(c0.connection, 2); kb2 = knownBrokerPorts(c2.connection, 2); BOOST_CHECK_EQUAL(kb0.size(), 2u); @@ -257,13 +266,8 @@ QPID_AUTO_TEST_CASE(DumpConsumers) { BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u); // Kill the subscribing member, ensure further messages are not removed. - { - ScopedSuppressLogging sl; - cluster.kill(1,9); - cluster.waitFor(2); - try { c1.connection.close(); } - catch (...) {} - } + cluster.killWithSilencer(1,c1.connection,9); + cluster.waitFor(2); for (int i = 0; i < 10; ++i) { c0.session.messageTransfer(arg::content=Message("bbb", "q")); BOOST_REQUIRE(c0.subs.get(m, "q", TIME_SEC)); |