diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-03-19 21:05:01 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2012-03-19 21:05:01 +0000 |
commit | b4294229869c5f7b0fd5da3c5fe9e6b6b6fd6c6c (patch) | |
tree | 116e58401dbd116aa003db31764eb5e656200d03 | |
parent | 841377c0309773dad4db14af13002fff5cc6d236 (diff) | |
download | qpid-python-b4294229869c5f7b0fd5da3c5fe9e6b6b6fd6c6c.tar.gz |
QPID-3890: resync this branch to latest trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3890@1302655 13f79535-47bb-0310-9956-ffa450edef68
57 files changed, 1518 insertions, 2246 deletions
diff --git a/qpid/cpp/include/qpid/framing/FieldTable.h b/qpid/cpp/include/qpid/framing/FieldTable.h index 293fb2eed7..1986a72d10 100644 --- a/qpid/cpp/include/qpid/framing/FieldTable.h +++ b/qpid/cpp/include/qpid/framing/FieldTable.h @@ -1,3 +1,6 @@ +#ifndef _FieldTable_ +#define _FieldTable_ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,16 +21,17 @@ * under the License. * */ -#include <iostream> -#include <vector> + +#include "qpid/framing/amqp_types.h" +#include "qpid/sys/Mutex.h" + #include <boost/shared_ptr.hpp> #include <boost/shared_array.hpp> + +#include <iosfwd> #include <map> -#include "qpid/framing/amqp_types.h" -#include "qpid/CommonImportExport.h" -#ifndef _FieldTable_ -#define _FieldTable_ +#include "qpid/CommonImportExport.h" namespace qpid { /** @@ -114,11 +118,13 @@ class FieldTable private: void realDecode() const; - void flushRawCache() const; + void flushRawCache(); + mutable qpid::sys::Mutex lock; mutable ValueMap values; mutable boost::shared_array<uint8_t> cachedBytes; mutable uint32_t cachedSize; // if = 0 then non cached size as 0 is not a legal size + mutable bool newBytes; QPID_COMMON_EXTERN friend std::ostream& operator<<(std::ostream& out, const FieldTable& body); }; diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index cfcdead883..b6ce249708 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -874,7 +874,6 @@ set (qpidcommon_SOURCES qpid/framing/AMQHeaderBody.cpp qpid/framing/AMQHeartbeatBody.cpp qpid/framing/Array.cpp - qpid/framing/BodyHandler.cpp qpid/framing/Buffer.cpp qpid/framing/Endian.cpp qpid/framing/FieldTable.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index e03c88ec8b..5dcc4cd210 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -398,8 +398,6 @@ libqpidcommon_la_SOURCES += \ qpid/framing/AccumulatedAck.h \ qpid/framing/Array.cpp \ qpid/framing/BodyFactory.h \ - qpid/framing/BodyHandler.cpp \ - qpid/framing/BodyHandler.h \ qpid/framing/Buffer.cpp \ qpid/framing/ResizableBuffer.h \ qpid/framing/ChannelHandler.h \ diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index a8389095c9..12730d20ec 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -29,7 +29,8 @@ #include <ostream> using namespace std; -using namespace boost; +using boost::ref; +using boost::optional; namespace qpid { using namespace framing; diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp index eb65005a9e..fc53d1076b 100644 --- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -30,9 +30,9 @@ namespace qpid { namespace cluster { using namespace std; -using namespace boost; using namespace framing::cluster; using namespace framing; +using boost::optional; InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_) : self(self_), completed(), resendNeeded(), size(size_) diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.cpp b/qpid/cpp/src/qpid/framing/BodyHandler.cpp deleted file mode 100644 index db302b1e4c..0000000000 --- a/qpid/cpp/src/qpid/framing/BodyHandler.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/framing/BodyHandler.h" -#include "qpid/framing/AMQMethodBody.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "qpid/framing/AMQContentBody.h" -#include "qpid/framing/AMQHeartbeatBody.h" -#include <boost/cast.hpp> -#include "qpid/framing/reply_exceptions.h" -#include "qpid/Msg.h" - -using namespace qpid::framing; -using namespace boost; - -BodyHandler::~BodyHandler() {} - -// TODO aconway 2007-08-13: Replace with visitor. -void BodyHandler::handleBody(AMQBody* body) { - switch(body->type()) - { - case METHOD_BODY: - handleMethod(polymorphic_downcast<AMQMethodBody*>(body)); - break; - case HEADER_BODY: - handleHeader(polymorphic_downcast<AMQHeaderBody*>(body)); - break; - case CONTENT_BODY: - handleContent(polymorphic_downcast<AMQContentBody*>(body)); - break; - case HEARTBEAT_BODY: - handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body)); - break; - default: - throw FramingErrorException( - QPID_MSG("Invalid frame type " << body->type())); - } -} - diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.h b/qpid/cpp/src/qpid/framing/BodyHandler.h deleted file mode 100644 index 9ded737195..0000000000 --- a/qpid/cpp/src/qpid/framing/BodyHandler.h +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef _BodyHandler_ -#define _BodyHandler_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/shared_ptr.hpp> - -namespace qpid { -namespace framing { -class AMQBody; -class AMQMethodBody; -class AMQHeaderBody; -class AMQContentBody; -class AMQHeartbeatBody; - -// TODO aconway 2007-08-10: rework using Visitor pattern? - -/** - * Interface to handle incoming frame bodies. - * Derived classes provide logic for each frame type. - */ -class BodyHandler { - public: - virtual ~BodyHandler(); - virtual void handleBody(AMQBody* body); - - protected: - virtual void handleMethod(AMQMethodBody*) = 0; - virtual void handleHeader(AMQHeaderBody*) = 0; - virtual void handleContent(AMQContentBody*) = 0; - virtual void handleHeartbeat(AMQHeartbeatBody*) = 0; -}; - -}} - - -#endif diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp index b696b5e54f..f9dc42916d 100644 --- a/qpid/cpp/src/qpid/framing/FieldTable.cpp +++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp @@ -28,22 +28,45 @@ #include "qpid/Msg.h" #include <assert.h> +// The locking rationale in the FieldTable seems a little odd, but it +// maintains the concurrent guarantees and requirements that were in +// place before the cachedBytes/cachedSize were added: +// +// The FieldTable client code needs to make sure that they call no write +// operation in parallel with any other operation on the FieldTable. +// However multiple parallel read operations are safe. +// +// To this end the only code that is locked is code that can transparently +// change the state of the FieldTable during a read only operation. +// (In other words the code that required the mutable members in the class +// definition!) +// namespace qpid { + +using sys::Mutex; +using sys::ScopedLock; + namespace framing { FieldTable::FieldTable() : - cachedSize(0) + cachedSize(0), + newBytes(false) { } FieldTable::FieldTable(const FieldTable& ft) : cachedBytes(ft.cachedBytes), - cachedSize(ft.cachedSize) + cachedSize(ft.cachedSize), + newBytes(ft.newBytes) { // Only copy the values if we have no raw data // - copying the map is expensive and we can // reconstruct it if necessary from the raw data - if (!cachedBytes && !ft.values.empty()) values = ft.values; + if (cachedBytes) { + newBytes = true; + return; + } + if (!ft.values.empty()) values = ft.values; } FieldTable& FieldTable::operator=(const FieldTable& ft) @@ -52,10 +75,13 @@ FieldTable& FieldTable::operator=(const FieldTable& ft) values.swap(nft.values); cachedBytes.swap(nft.cachedBytes); cachedSize = nft.cachedSize; + newBytes = nft.newBytes; return (*this); } uint32_t FieldTable::encodedSize() const { + ScopedLock<Mutex> l(lock); + if (cachedSize != 0) { return cachedSize; } @@ -238,6 +264,7 @@ void FieldTable::encode(Buffer& buffer) const { i->second->encode(buffer); } // Now create raw bytes in case we are used again + ScopedLock<Mutex> l(lock); cachedSize = buffer.getPosition() - p; cachedBytes = boost::shared_array<uint8_t>(new uint8_t[cachedSize]); buffer.setPosition(p); @@ -261,14 +288,17 @@ void FieldTable::decode(Buffer& buffer){ // Copy data into our buffer cachedBytes = boost::shared_array<uint8_t>(new uint8_t[len + 4]); cachedSize = len + 4; + newBytes = true; buffer.setPosition(p); buffer.getRawData(&cachedBytes[0], cachedSize); } void FieldTable::realDecode() const { + ScopedLock<Mutex> l(lock); + // If we've got no raw data stored up then nothing to do - if (!cachedBytes) + if (!newBytes) return; Buffer buffer((char*)&cachedBytes[0], cachedSize); @@ -286,10 +316,13 @@ void FieldTable::realDecode() const values[name] = ValuePtr(value); } } + newBytes = false; } -void FieldTable::flushRawCache() const +void FieldTable::flushRawCache() { + // We can only flush the cache if there are no cached bytes to decode + assert(newBytes==false); // Avoid recreating shared array unless we actually have one. if (cachedBytes) cachedBytes.reset(); cachedSize = 0; @@ -319,6 +352,7 @@ void FieldTable::erase(const std::string& name) void FieldTable::clear() { values.clear(); + newBytes = false; flushRawCache(); } diff --git a/qpid/cpp/src/qpid/framing/FrameSet.cpp b/qpid/cpp/src/qpid/framing/FrameSet.cpp index 255aaf6e6b..d2612e484f 100644 --- a/qpid/cpp/src/qpid/framing/FrameSet.cpp +++ b/qpid/cpp/src/qpid/framing/FrameSet.cpp @@ -26,7 +26,6 @@ #include "qpid/framing/TypeFilter.h" using namespace qpid::framing; -using namespace boost; FrameSet::FrameSet(const SequenceNumber& _id) : id(_id),contentSize(0),recalculateSize(true) { } FrameSet::FrameSet(const FrameSet& original) : id(original.id), contentSize(0), recalculateSize(true) diff --git a/qpid/cpp/src/qpid/framing/amqp_framing.h b/qpid/cpp/src/qpid/framing/amqp_framing.h index 3a8b39afb5..2e58922364 100644 --- a/qpid/cpp/src/qpid/framing/amqp_framing.h +++ b/qpid/cpp/src/qpid/framing/amqp_framing.h @@ -21,7 +21,6 @@ #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQBody.h" -#include "qpid/framing/BodyHandler.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/AMQContentBody.h" diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h index fd07f2fb2e..499b01d503 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h +++ b/qpid/cpp/src/qpid/store/ms-clfs/Transaction.h @@ -27,6 +27,7 @@ #include <boost/enable_shared_from_this.hpp> #include <boost/shared_ptr.hpp> #include <string> +#include <vector> #include "TransactionLog.h" diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 79c82106cb..bb4f52d319 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -151,13 +151,6 @@ set(unit_tests_to_build mark_as_advanced(unit_tests_to_build) -# Disabled till we move to amqp_0_10 codec. -# amqp_0_10/serialize.cpp allSegmentTypes.h \ -# amqp_0_10/ProxyTemplate.cpp \ -# amqp_0_10/apply.cpp \ -# amqp_0_10/Map.cpp \ -# amqp_0_10/handlers.cpp - add_executable (unit_test unit_test ${unit_tests_to_build} ${platform_test_additions}) target_link_libraries (unit_test @@ -326,7 +319,7 @@ if (BUILD_MSSQL) add_test (store_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_store_tests${test_script_suffix} MSSQL) endif (BUILD_MSSQL) if (BUILD_MSCLFS) - add_test (store_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_store_tests${test_script_suffix} MSSQL-CLFS) + add_test (store_tests_clfs ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_store_tests${test_script_suffix} MSSQL-CLFS) endif (BUILD_MSCLFS) endif (PYTHON_EXECUTABLE) diff --git a/qpid/cpp/src/tests/Frame.cpp b/qpid/cpp/src/tests/Frame.cpp index 1270eabba3..cfcfde04a7 100644 --- a/qpid/cpp/src/tests/Frame.cpp +++ b/qpid/cpp/src/tests/Frame.cpp @@ -30,7 +30,6 @@ QPID_AUTO_TEST_SUITE(FrameTestSuite) using namespace std; using namespace qpid::framing; -using namespace boost; QPID_AUTO_TEST_CASE(testContentBody) { Frame f(42, AMQContentBody("foobar")); diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index a7f3bc1fbd..7c0afc4a63 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -352,7 +352,8 @@ EXTRA_DIST += \ run_queue_flow_limit_tests \ run_msg_group_tests \ ipv6_test \ - ha_tests.py + ha_tests.py \ + test_env.ps1.in check_LTLIBRARIES += libdlclose_noop.la libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) diff --git a/qpid/cpp/src/tests/SessionState.cpp b/qpid/cpp/src/tests/SessionState.cpp index 3be9bb0cbc..1cf3415484 100644 --- a/qpid/cpp/src/tests/SessionState.cpp +++ b/qpid/cpp/src/tests/SessionState.cpp @@ -34,7 +34,6 @@ namespace tests { QPID_AUTO_TEST_SUITE(SessionStateTestSuite) using namespace std; -using namespace boost; using namespace qpid::framing; // ================================================================ diff --git a/qpid/cpp/src/tests/amqp_0_10/Map.cpp b/qpid/cpp/src/tests/amqp_0_10/Map.cpp deleted file mode 100644 index ffb235829e..0000000000 --- a/qpid/cpp/src/tests/amqp_0_10/Map.cpp +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "amqp_0_10/unit_test.h" -#include "qpid/amqp_0_10/Map.h" -#include "qpid/amqp_0_10/Array.h" -#include "qpid/amqp_0_10/Struct32.h" -#include "qpid/amqp_0_10/UnknownType.h" -#include "qpid/amqp_0_10/Codec.h" -#include <iostream> - -using namespace qpid::amqp_0_10; -using namespace std; - -QPID_AUTO_TEST_SUITE(MapTestSuite) - - QPID_AUTO_TEST_CASE(testGetSet) { - MapValue v; - v = Str8("foo"); - BOOST_CHECK(v.get<Str8>()); - BOOST_CHECK(!v.get<uint8_t>()); - BOOST_CHECK_EQUAL(*v.get<Str8>(), "foo"); - - v = uint8_t(42); - BOOST_CHECK(!v.get<Str8>()); - BOOST_CHECK(v.get<uint8_t>()); - BOOST_CHECK_EQUAL(*v.get<uint8_t>(), 42); - - v = uint16_t(12); - BOOST_CHECK(v.get<uint16_t>()); - BOOST_CHECK_EQUAL(*v.get<uint16_t>(), 12); -} - -template <class R> struct TestVisitor : public MapValue::Visitor<R> { - template <class T> R operator()(const T&) const { throw MapValue::BadTypeException(); } - R operator()(const R& r) const { return r; } -}; - -QPID_AUTO_TEST_CASE(testVisit) { - MapValue v; - v = Str8("foo"); - BOOST_CHECK_EQUAL(v.apply_visitor(TestVisitor<Str8>()), "foo"); - v = Uint16(42); - BOOST_CHECK_EQUAL(v.apply_visitor(TestVisitor<Uint16>()), 42); - try { - v.apply_visitor(TestVisitor<bool>()); - BOOST_FAIL("Expecting exception"); - } - catch(const MapValue::BadTypeException&) {} -} - - -QPID_AUTO_TEST_CASE(testEncodeMapValue) { - MapValue mv; - std::string data; - mv = Str8("hello"); - Codec::encode(back_inserter(data))(mv); - BOOST_CHECK_EQUAL(data.size(), Codec::size(mv)); - MapValue mv2; - Codec::decode(data.begin())(mv2); - BOOST_CHECK_EQUAL(mv2.getCode(), 0x85); - BOOST_REQUIRE(mv2.get<Str8>()); - BOOST_CHECK_EQUAL(*mv2.get<Str8>(), "hello"); -} - -QPID_AUTO_TEST_CASE(testEncode) { - Map map; - std::string data; - map["A"] = true; - map["b"] = Str8("hello"); - Codec::encode(back_inserter(data))(map); - BOOST_CHECK_EQUAL(Codec::size(map), data.size()); - Map map2; - Codec::decode(data.begin())(map2); - BOOST_CHECK_EQUAL(map.size(), 2u); - BOOST_CHECK(map["A"].get<bool>()); - BOOST_CHECK_EQUAL(*map["b"].get<Str8>(), "hello"); -} - - -QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/amqp_0_10/ProxyTemplate.cpp b/qpid/cpp/src/tests/amqp_0_10/ProxyTemplate.cpp deleted file mode 100644 index f54ee0da22..0000000000 --- a/qpid/cpp/src/tests/amqp_0_10/ProxyTemplate.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "amqp_0_10/unit_test.h" -#include "qpid/amqp_0_10/ProxyTemplate.h" -#include <boost/any.hpp> - -QPID_AUTO_TEST_SUITE(ProxyTemplateTestSuite) - -using namespace qpid::amqp_0_10; - -struct ToAny { - template <class T> - boost::any operator()(const T& t) { return boost::any(t); } -}; - -struct AnyProxy : public ProxyTemplate<ToAny, boost::any> {}; - -QPID_AUTO_TEST_CASE(testAnyProxy) { - AnyProxy p; - boost::any a=p.connectionTune(1,2,3,4); - BOOST_CHECK_EQUAL(a.type().name(), typeid(connection::Tune).name()); - connection::Tune* tune=boost::any_cast<connection::Tune>(&a); - BOOST_REQUIRE(tune); - BOOST_CHECK_EQUAL(tune->channelMax, 1u); - BOOST_CHECK_EQUAL(tune->maxFrameSize, 2u); - BOOST_CHECK_EQUAL(tune->heartbeatMin, 3u); - BOOST_CHECK_EQUAL(tune->heartbeatMax, 4u); -} - -QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/amqp_0_10/apply.cpp b/qpid/cpp/src/tests/amqp_0_10/apply.cpp deleted file mode 100644 index 0aa4421791..0000000000 --- a/qpid/cpp/src/tests/amqp_0_10/apply.cpp +++ /dev/null @@ -1,99 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "amqp_0_10/unit_test.h" -#include "qpid/amqp_0_10/specification.h" -#include "qpid/amqp_0_10/ApplyControl.h" - -QPID_AUTO_TEST_SUITE(VisitorTestSuite) - -using namespace qpid::amqp_0_10; - -struct GetCode : public ApplyFunctor<uint8_t> { - template <class T> uint8_t operator()(const T&) const { return T::CODE; } -}; - -struct SetChannelMax : ApplyFunctor<void> { - template <class T> void operator()(T&) const { BOOST_FAIL(""); } - void operator()(connection::Tune& t) const { t.channelMax=42; } -}; - -struct TestFunctor { - typedef bool result_type; - bool operator()(const connection::Tune& tune) { - BOOST_CHECK_EQUAL(tune.channelMax, 1u); - BOOST_CHECK_EQUAL(tune.maxFrameSize, 2u); - BOOST_CHECK_EQUAL(tune.heartbeatMin, 3u); - BOOST_CHECK_EQUAL(tune.heartbeatMax, 4u); - return true; - } - template <class T> - bool operator()(const T&) { return false; } -}; - -QPID_AUTO_TEST_CASE(testApply) { - connection::Tune tune(1,2,3,4); - Control* p = &tune; - - // boost oddity - without the cast we get undefined symbol errors. - BOOST_CHECK_EQUAL(apply(GetCode(), *p), (uint8_t)connection::Tune::CODE); - - TestFunctor tf; - BOOST_CHECK(apply(tf, *p)); - - connection::Start start; - p = &start; - BOOST_CHECK(!apply(tf, *p)); - - apply(SetChannelMax(), tune); - BOOST_CHECK_EQUAL(tune.channelMax, 42); -} - -struct VoidTestFunctor { - typedef void result_type; - - int code; - VoidTestFunctor() : code() {} - - void operator()(const connection::Tune& tune) { - BOOST_CHECK_EQUAL(tune.channelMax, 1u); - BOOST_CHECK_EQUAL(tune.maxFrameSize, 2u); - BOOST_CHECK_EQUAL(tune.heartbeatMin, 3u); - BOOST_CHECK_EQUAL(tune.heartbeatMax, 4u); - code=connection::Tune::CODE; - } - template <class T> - void operator()(const T&) { code=0xFF; } -}; - -QPID_AUTO_TEST_CASE(testApplyVoid) { - connection::Tune tune(1,2,3,4); - Control* p = &tune; - VoidTestFunctor tf; - apply(tf, *p); - BOOST_CHECK_EQUAL(uint8_t(connection::Tune::CODE), tf.code); - - connection::Start start; - p = &start; - apply(tf, *p); - BOOST_CHECK_EQUAL(0xFF, tf.code); -} - -QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/amqp_0_10/handlers.cpp b/qpid/cpp/src/tests/amqp_0_10/handlers.cpp deleted file mode 100644 index 91bb304a17..0000000000 --- a/qpid/cpp/src/tests/amqp_0_10/handlers.cpp +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "amqp_0_10/unit_test.h" -#include "qpid/Exception.h" -#include "qpid/amqp_0_10/Unit.h" -#include "qpid/amqp_0_10/ControlHolder.h" -#include "qpid/amqp_0_10/CommandHolder.h" -#include "qpid/amqp_0_10/handlers.h" -#include "qpid/amqp_0_10/specification.h" - -QPID_AUTO_TEST_SUITE(handler_tests) - -using namespace qpid::amqp_0_10; -using namespace std; - -string called; // Set by called handler function - -// Note on handlers: -// -// Control and Command handlers are separate, both behave the same way, -// so substitute "control or command" for command in the following. -// -// Command handlers derive from CommandHandler and implement functions -// for all the commands they handle. Handling an unimplemented command -// will raise NotImplementedException. -// -// Using virtual inheritance from CommandHandler allows multiple -// handlers to be aggregated into one with multiple inheritance, -// See test code for example. -// -// E.g. the existing broker model would have two control handlers: -// - ConnectionHandler: ControlHandler for connection controls. -// - SessionHandler: ControlHandler for session controls. -// It would have class-command handlers for each AMQP class: -// - QueueHandler, MessageHandler etc.. handle each class. -// And an aggregate handler in place of BrokerAdapter -// - BrokerCommandHandler: public QueueHandler, MessageHandler ... -// -// In other applications (e.g. cluster) any combination of commands -// can be handled by a given handler. It _might_ simplify the code -// to collaps ConnectionHandler and SessionHandler into a single -// ControlHandler (or it might not.) - -struct TestExecutionHandler : public virtual CommandHandler { - void executionSync() { called = "executionSync"; } - // ... etc. for all execution commands -}; - -struct TestMessageHandler : public virtual CommandHandler { - void messageCancel(const Str8&) { called="messageCancel"; } - // ... etc. -}; - -// Aggregate handler for all recognised commands. -struct TestCommandHandler : - public TestExecutionHandler, - public TestMessageHandler - // ... etc. handlers for all command classes. -{}; // Nothing to do. - - -// Sample unit handler, written as a static_visitor. -// Note it could equally be written with if/else statements -// in handle. -// -struct TestUnitHandler : public boost::static_visitor<void> { - TestCommandHandler handler; - void handle(const Unit& u) { u.applyVisitor(*this); } - - void operator()(const Body&) { called="Body"; } - void operator()(const Header&) { called="Header"; } - void operator()(const ControlHolder&) { throw qpid::Exception("I don't do controls."); } - void operator()(const CommandHolder& c) { c.invoke(handler); } -}; - -QPID_AUTO_TEST_CASE(testHandlers) { - TestUnitHandler handler; - Unit u; - - u = Body(); - handler.handle(u); - BOOST_CHECK_EQUAL("Body", called); - - u = Header(); - handler.handle(u); - BOOST_CHECK_EQUAL("Header", called); - - // in_place<Foo>(...) is equivalent to Foo(...) but - // constructs Foo directly in the holder, avoiding - // a copy. - - u = CommandHolder(in_place<execution::Sync>()); - handler.handle(u); - BOOST_CHECK_EQUAL("executionSync", called); - - u = ControlHolder(in_place<connection::Start>(Map(), Str16Array(), Str16Array())); - try { - handler.handle(u); - } catch (const qpid::Exception&) {} - - u = CommandHolder(in_place<message::Cancel>(Str8())); - handler.handle(u); - BOOST_CHECK_EQUAL("messageCancel", called); -} - -QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/amqp_0_10/serialize.cpp b/qpid/cpp/src/tests/amqp_0_10/serialize.cpp deleted file mode 100644 index 975d6206ec..0000000000 --- a/qpid/cpp/src/tests/amqp_0_10/serialize.cpp +++ /dev/null @@ -1,429 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "amqp_0_10/unit_test.h" -#include "amqp_0_10/allSegmentTypes.h" - -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/Buffer.h" - -#include "qpid/amqp_0_10/Packer.h" -#include "qpid/amqp_0_10/built_in_types.h" -#include "qpid/amqp_0_10/Codec.h" -#include "qpid/amqp_0_10/specification.h" -#include "qpid/amqp_0_10/ControlHolder.h" -#include "qpid/amqp_0_10/Struct32.h" -#include "qpid/amqp_0_10/FrameHeader.h" -#include "qpid/amqp_0_10/Map.h" -#include "qpid/amqp_0_10/Unit.h" -#include "allSegmentTypes.h" - -#include <boost/test/test_case_template.hpp> -#include <boost/type_traits/is_arithmetic.hpp> -#include <boost/utility/enable_if.hpp> -#include <boost/optional.hpp> -#include <boost/mpl/vector.hpp> -#include <boost/mpl/back_inserter.hpp> -#include <boost/mpl/copy.hpp> -#include <boost/mpl/empty_sequence.hpp> -#include <boost/current_function.hpp> -#include <iterator> -#include <string> -#include <sstream> -#include <iostream> -#include <netinet/in.h> - -// Missing operators needed for tests. -namespace boost { -template <class T, size_t N> -std::ostream& operator<<(std::ostream& out, const array<T,N>& a) { - std::ostream_iterator<T> o(out, " "); - std::copy(a.begin(), a.end(), o); - return out; -} -} // boost - -QPID_AUTO_TEST_SUITE(SerializeTestSuite) - -using namespace std; -namespace mpl=boost::mpl; -using namespace qpid::amqp_0_10; -using qpid::framing::in_place; - -template <class A, class B> struct concat2 { typedef typename mpl::copy<B, typename mpl::back_inserter<A> >::type type; }; -template <class A, class B, class C> struct concat3 { typedef typename concat2<A, typename concat2<B, C>::type>::type type; }; -template <class A, class B, class C, class D> struct concat4 { typedef typename concat2<A, typename concat3<B, C, D>::type>::type type; }; - -typedef mpl::vector<Boolean, Char, Int32, Int64, Int8, Uint16, CharUtf32, Uint32, Uint64, Bin8, Uint8>::type IntegralTypes; -typedef mpl::vector<Bin1024, Bin128, Bin16, Bin256, Bin32, Bin40, Bin512, Bin64, Bin72>::type BinTypes; -typedef mpl::vector<Double, Float>::type FloatTypes; -typedef mpl::vector<SequenceNo, Uuid, Datetime, Dec32, Dec64> FixedSizeClassTypes; -typedef mpl::vector<Map, Vbin8, Str8Latin, Str8, Str8Utf16, Vbin16, Str16Latin, Str16, Str16Utf16, Vbin32> VariableSizeTypes; - -typedef concat4<IntegralTypes, BinTypes, FloatTypes, FixedSizeClassTypes>::type FixedSizeTypes; -typedef concat2<FixedSizeTypes, VariableSizeTypes>::type AllTypes; - -// TODO aconway 2008-02-20: should test 64 bit integrals for order also. -QPID_AUTO_TEST_CASE(testNetworkByteOrder) { - string data; - - uint32_t l = 0x11223344; - Codec::encode(std::back_inserter(data))(l); - uint32_t enc=reinterpret_cast<const uint32_t&>(*data.data()); - uint32_t l2 = ntohl(enc); - BOOST_CHECK_EQUAL(l, l2); - - data.clear(); - uint16_t s = 0x1122; - Codec::encode(std::back_inserter(data))(s); - uint32_t s2 = ntohs(*reinterpret_cast<const uint32_t*>(data.data())); - BOOST_CHECK_EQUAL(s, s2); -} - -QPID_AUTO_TEST_CASE(testSetLimit) { - typedef Codec::Encoder<back_insert_iterator<string> > Encoder; - string data; - Encoder encode(back_inserter(data), 3); - encode('1')('2')('3'); - try { - encode('4'); - BOOST_FAIL("Expected exception"); - } catch (...) {} // FIXME aconway 2008-04-03: catch proper exception - BOOST_CHECK_EQUAL(data, "123"); -} - -QPID_AUTO_TEST_CASE(testScopedLimit) { - typedef Codec::Encoder<back_insert_iterator<string> > Encoder; - string data; - Encoder encode(back_inserter(data), 10); - encode(Str8("123")); // 4 bytes - { - Encoder::ScopedLimit l(encode, 3); - encode('a')('b')('c'); - try { - encode('d'); - BOOST_FAIL("Expected exception"); - } catch(...) {} // FIXME aconway 2008-04-03: catch proper exception - } - BOOST_CHECK_EQUAL(data, "\003123abc"); - encode('x')('y')('z'); - try { - encode('!'); - BOOST_FAIL("Expected exception"); - } catch(...) {} // FIXME aconway 2008-04-03: catch proper exception - BOOST_CHECK_EQUAL(data.size(), 10u); -} - -// Assign test values to the various types. -void testValue(bool& b) { b = true; } -void testValue(Bit&) { } -template <class T> typename boost::enable_if<boost::is_arithmetic<T> >::type testValue(T& n) { n=42; } -void testValue(CharUtf32& c) { c = 43; } -void testValue(long long& l) { l = 0x012345; } -void testValue(Datetime& dt) { dt = qpid::sys::now(); } -void testValue(Uuid& uuid) { uuid=Uuid(true); } -template <class E, class M> void testValue(Decimal<E,M>& d) { d.exponent=2; d.mantissa=0x1122; } -void testValue(SequenceNo& s) { s = 42; } -template <size_t N> void testValue(Bin<N>& a) { a.assign(42); } -template <class T, class S, int Unique> void testValue(SerializableString<T, S, Unique>& s) { - char msg[]="foobar"; - s.assign(msg, msg+sizeof(msg)); -} -void testValue(Str16& s) { s = "the quick brown fox jumped over the lazy dog"; } -void testValue(Str8& s) { s = "foobar"; } -void testValue(Map& m) { m["s"] = Str8("foobar"); m["b"] = true; m["c"] = uint16_t(42); } - -//typedef mpl::vector<Str8, Str16>::type TestTypes; -/*BOOST_AUTO_TEST_CASE_TEMPLATE(testEncodeDecode, T, AllTypes) -{ - string data; - T t; - testValue(t); - Codec::encode(std::back_inserter(data))(t); - - BOOST_CHECK_EQUAL(Codec::size(t), data.size()); - - T t2; - Codec::decode(data.begin())(t2); - BOOST_CHECK_EQUAL(t,t2); -} -*/ - -struct TestMe { - bool encoded, decoded; - char value; - TestMe(char v) : encoded(), decoded(), value(v) {} - template <class S> void encode(S& s) const { - const_cast<TestMe*>(this)->encoded=true; s(value); - } - template <class S> void decode(S& s) { decoded=true; s(value); } - template <class S> void serialize(S& s) { s.split(*this); } -}; - -QPID_AUTO_TEST_CASE(testSplit) { - string data; - TestMe t1('x'); - Codec::encode(std::back_inserter(data))(t1); - BOOST_CHECK(t1.encoded); - BOOST_CHECK(!t1.decoded); - BOOST_CHECK_EQUAL(data, "x"); - - TestMe t2('y'); - Codec::decode(data.begin())(t2); - BOOST_CHECK(!t2.encoded); - BOOST_CHECK(t2.decoded); - BOOST_CHECK_EQUAL(t2.value, 'x'); -} - -QPID_AUTO_TEST_CASE(testControlEncodeDecode) { - string data; - Control::Holder h(in_place<connection::Tune>(1,2,3,4)); - Codec::encode(std::back_inserter(data))(h); - - BOOST_CHECK_EQUAL(data.size(), Codec::size(h)); - - Codec::Decoder<string::iterator> decode(data.begin()); - Control::Holder h2; - decode(h2); - - BOOST_REQUIRE(h2.get()); - BOOST_CHECK_EQUAL(h2.get()->getClassCode(), connection::CODE); - BOOST_CHECK_EQUAL(h2.get()->getCode(), uint8_t(connection::Tune::CODE)); - connection::Tune& tune=static_cast<connection::Tune&>(*h2.get()); - BOOST_CHECK_EQUAL(tune.channelMax, 1u); - BOOST_CHECK_EQUAL(tune.maxFrameSize, 2u); - BOOST_CHECK_EQUAL(tune.heartbeatMin, 3u); - BOOST_CHECK_EQUAL(tune.heartbeatMax, 4u); -} - -QPID_AUTO_TEST_CASE(testStruct32) { - message::DeliveryProperties dp; - dp.priority=message::MEDIUM; - dp.routingKey="foo"; - Struct32 s(dp); - string data; - Codec::encode(back_inserter(data))(s); - - uint32_t structSize; // Starts with size - Codec::decode(data.begin())(structSize); - BOOST_CHECK_EQUAL(structSize, Codec::size(dp) + 2); // +2 for code - BOOST_CHECK_EQUAL(structSize, data.size()-4); // encoded body - - BOOST_CHECK_EQUAL(data.size(), Codec::size(s)); - Struct32 s2; - Codec::decode(data.begin())(s2); - message::DeliveryProperties* dp2 = s2.getIf<message::DeliveryProperties>(); - BOOST_REQUIRE(dp2); - BOOST_CHECK_EQUAL(dp2->priority, message::MEDIUM); - BOOST_CHECK_EQUAL(dp2->routingKey, "foo"); -} - -QPID_AUTO_TEST_CASE(testStruct32Unknown) { - // Verify we can recode an unknown struct unchanged. - Struct32 s; - string data; - Codec::encode(back_inserter(data))(uint32_t(10)); - data.append(10, 'X'); - Codec::decode(data.begin())(s); - string data2; - Codec::encode(back_inserter(data2))(s); - BOOST_CHECK_EQUAL(data.size(), data2.size()); - BOOST_CHECK_EQUAL(data, data2); -} - -struct DummyPacked { - static const uint8_t PACK=1; - boost::optional<char> i, j; - char k; - Bit l,m; - DummyPacked(char a=0, char b=0, char c=0) : i(a), j(b), k(c), l(), m() {} - template <class S> void serialize(S& s) { s(i)(j)(k)(l)(m); } -}; - -Packer<DummyPacked> serializable(DummyPacked& d) { return Packer<DummyPacked>(d); } - -QPID_AUTO_TEST_CASE(testPackBits) { - DummyPacked d('a','b','c'); - BOOST_CHECK_EQUAL(packBits(d), 7u); - d.j = boost::none; - BOOST_CHECK_EQUAL(packBits(d), 5u); - d.m = true; - BOOST_CHECK_EQUAL(packBits(d), 0x15u); -} - - -QPID_AUTO_TEST_CASE(testPacked) { - string data; - - Codec::encode(back_inserter(data))('a')(boost::optional<char>('b'))(boost::optional<char>())('c'); - BOOST_CHECK_EQUAL(data, "abc"); - data.clear(); - - DummyPacked dummy('a','b','c'); - - Codec::encode(back_inserter(data))(dummy); - BOOST_CHECK_EQUAL(data.size(), 4u); - BOOST_CHECK_EQUAL(data, string("\007abc")); - data.clear(); - - dummy.i = boost::none; - Codec::encode(back_inserter(data))(dummy); - BOOST_CHECK_EQUAL(data, string("\6bc")); - data.clear(); - - const char* missing = "\5xy"; - Codec::decode(missing)(dummy); - BOOST_CHECK(dummy.i); - BOOST_CHECK_EQUAL(*dummy.i, 'x'); - BOOST_CHECK(!dummy.j); - BOOST_CHECK_EQUAL(dummy.k, 'y'); -} - -QPID_AUTO_TEST_CASE(testUnitControl) { - string data; - Control::Holder h(in_place<connection::Tune>(1,2,3,4)); - Codec::encode(std::back_inserter(data))(h); - - Unit unit(FrameHeader(FIRST_FRAME|LAST_FRAME, CONTROL)); - Codec::decode(data.begin())(unit); - - BOOST_REQUIRE(unit.get<ControlHolder>()); - - string data2; - Codec::encode(back_inserter(data2))(unit); - - BOOST_CHECK_EQUAL(data, data2); -} - -QPID_AUTO_TEST_CASE(testArray) { - ArrayDomain<char> a; - a.resize(3, 'x'); - string data; - Codec::encode(back_inserter(data))(a); - - ArrayDomain<char> b; - Codec::decode(data.begin())(b); - BOOST_CHECK_EQUAL(b.size(), 3u); - string data3; - Codec::encode(back_inserter(data3))(a); - BOOST_CHECK_EQUAL(data, data3); - - Array x; - Codec::decode(data.begin())(x); - BOOST_CHECK_EQUAL(x.size(), 3u); - BOOST_CHECK_EQUAL(x[0].size(), 1u); - BOOST_CHECK_EQUAL(*x[0].begin(), 'x'); - BOOST_CHECK_EQUAL(*x[2].begin(), 'x'); - - string data2; - Codec::encode(back_inserter(data2))(x); - BOOST_CHECK_EQUAL(data,data2); -} - -QPID_AUTO_TEST_CASE(testStruct) { - string data; - - message::DeliveryProperties dp; - BOOST_CHECK(!dp.discardUnroutable); - dp.immediate = true; - dp.redelivered = false; - dp.priority = message::MEDIUM; - dp.exchange = "foo"; - - Codec::encode(back_inserter(data))(dp); - // Skip 4 bytes size, little-endian decode for pack bits. - uint16_t encodedBits=uint8_t(data[5]); - encodedBits <<= 8; - encodedBits += uint8_t(data[4]); - BOOST_CHECK_EQUAL(encodedBits, packBits(dp)); - - data.clear(); - Struct32 h(dp); - Codec::encode(back_inserter(data))(h); - - Struct32 h2; - Codec::decode(data.begin())(h2); - BOOST_CHECK_EQUAL(h2.getClassCode(), Uint8(message::DeliveryProperties::CLASS_CODE)); - BOOST_CHECK_EQUAL(h2.getCode(), Uint8(message::DeliveryProperties::CODE)); - message::DeliveryProperties* dp2 = - dynamic_cast<message::DeliveryProperties*>(h2.get()); - BOOST_CHECK(dp2); - BOOST_CHECK(!dp2->discardUnroutable); - BOOST_CHECK(dp2->immediate); - BOOST_CHECK(!dp2->redelivered); - BOOST_CHECK_EQUAL(dp2->priority, message::MEDIUM); - BOOST_CHECK_EQUAL(dp2->exchange, "foo"); -} - -struct RecodeUnit { - template <class T> - void operator() (const T& t) { - BOOST_MESSAGE(BOOST_CURRENT_FUNCTION << " called with: " << t); - using qpid::framing::Buffer; - using qpid::framing::AMQFrame; - - session::Header sh; - BOOST_CHECK_EQUAL(Codec::size(sh), 2u); - - // Encode unit. - Unit u(t); - string data; - Codec::encode(back_inserter(data))(u.getHeader())(u); - data.push_back(char(0xCE)); // Preview end-of-frame - - // Decode AMQFrame - Buffer buf(&data[0], data.size()); - AMQFrame f; - f.decode(buf); - BOOST_MESSAGE("AMQFrame decoded: " << f); - // Encode AMQFrame - string data2(f.size(), ' '); - Buffer buf2(&data2[0], data.size()); - f.encode(buf2); - - // Verify encoded by unit == encoded by AMQFrame - BOOST_CHECK_MESSAGE(data == data2, BOOST_CURRENT_FUNCTION); - - // Decode unit - // FIXME aconway 2008-04-15: must set limit to decode a header. - Codec::Decoder<string::iterator> decode(data2.begin(), data2.size()-1); - - FrameHeader h; - decode(h); - BOOST_CHECK_EQUAL(u.getHeader(), h); - Unit u2(h); - decode(u2); - - // Re-encode unit - string data3; - Codec::encode(back_inserter(data3))(u2.getHeader())(u2); - data3.push_back(char(0xCE)); // Preview end-of-frame - - BOOST_CHECK_MESSAGE(data3 == data2, BOOST_CURRENT_FUNCTION); - } -}; - -QPID_AUTO_TEST_CASE(testSerializeAllSegmentTypes) { - RecodeUnit recode; - allSegmentTypes(recode); -} - -QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/logging.cpp b/qpid/cpp/src/tests/logging.cpp index dcecf0b54c..5d5bb1feef 100644 --- a/qpid/cpp/src/tests/logging.cpp +++ b/qpid/cpp/src/tests/logging.cpp @@ -44,8 +44,10 @@ namespace tests { QPID_AUTO_TEST_SUITE(loggingTestSuite) using namespace std; -using namespace boost; using namespace qpid::log; +using boost::ends_with; +using boost::contains; +using boost::format; QPID_AUTO_TEST_CASE(testStatementInit) { Statement s=QPID_LOG_STATEMENT_INIT(debug); int line=__LINE__; diff --git a/qpid/cpp/src/tests/ssl_test b/qpid/cpp/src/tests/ssl_test index fc634ba242..ac978d3bd0 100755 --- a/qpid/cpp/src/tests/ssl_test +++ b/qpid/cpp/src/tests/ssl_test @@ -100,8 +100,10 @@ start_ssl_mux_broker() { PORTS=( ${PORTS[@]} $1 ) } +sasl_config_dir=$builddir/sasl_config + start_authenticating_broker() { - start_brokers 1 "--transport ssl --ssl-port 0 --require-encryption --ssl-sasl-no-dict --ssl-require-client-authentication --auth yes" + start_brokers 1 "--transport ssl --ssl-port 0 --require-encryption --ssl-sasl-no-dict --ssl-require-client-authentication --auth yes --sasl-config=${sasl_config_dir}" } ssl_cluster_broker() { # $1 = port diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 588b2079f2..22f9544b0c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1123,11 +1123,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } - public Object getID() - { - return _channelId; - } - public AMQConnectionModel getConnectionModel() { return _session; @@ -1377,7 +1372,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm { if(_blockingQueues.remove(queue)) { - if(_blocking.compareAndSet(true,false)) + if(_blocking.compareAndSet(true,false) && !isClosing()) { _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); @@ -1627,6 +1622,6 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm public int compareTo(AMQSessionModel session) { - return getId().toString().compareTo(session.getID().toString()); + return getId().compareTo(session.getId()); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfig.java index 9256724c56..b96ddc56c6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfig.java @@ -29,6 +29,4 @@ public interface VirtualHostConfig extends ConfiguredObject<VirtualHostConfigTyp String getFederationTag(); - void setBroker(BrokerConfig brokerConfig); - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index f6980be525..1a055240b9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -1315,7 +1315,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { - closeChannel((Integer)session.getID()); + int channelId = ((AMQChannel)session).getChannelId(); + closeChannel(channelId); MethodRegistry methodRegistry = getMethodRegistry(); ChannelCloseBody responseBody = @@ -1324,7 +1325,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr new AMQShortString(message), 0,0); - writeFrame(responseBody.generateFrame((Integer)session.getID())); + writeFrame(responseBody.generateFrame(channelId)); } public void close(AMQConstant cause, String message) throws AMQException diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index a69f2a74ee..fa171815ca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol; +import java.util.UUID; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.AMQException; @@ -35,7 +36,8 @@ import org.apache.qpid.server.queue.SimpleAMQQueue; */ public interface AMQSessionModel extends Comparable<AMQSessionModel> { - public Object getID(); + /** Unique session ID across entire broker*/ + public UUID getId(); public AMQConnectionModel getConnectionModel(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 30c2846732..cc1041d9de 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -2091,9 +2091,13 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes // Only process nodes that are not currently deleted and not dequeued if (!node.isDispensed()) { - // If the node has exired then aquire it + // If the node has exired then acquire it if (node.expired() && node.acquire()) { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeuing expired node " + node); + } // Then dequeue it. dequeueEntry(node); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java index 58fdc99dd3..db436b99e8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java @@ -114,7 +114,6 @@ public class BrokerConfigAdapter implements BrokerConfig public void addVirtualHost(final VirtualHostConfig virtualHost) { - virtualHost.setBroker(this); _vhosts.put(virtualHost.getId(), virtualHost); getConfigStore().addConfiguredObject(virtualHost); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 9f7b8c53b8..462e880e5f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -720,11 +720,6 @@ public class ServerSession extends Session close(); } - public Object getID() - { - return getName(); - } - public AMQConnectionModel getConnectionModel() { return getConnection(); @@ -854,7 +849,6 @@ public class ServerSession extends Session // unregister subscriptions in order to prevent sending of new messages // to subscriptions with closing session unregisterSubscriptions(); - super.close(); } @@ -1025,6 +1019,7 @@ public class ServerSession extends Session public int compareTo(AMQSessionModel session) { - return getId().toString().compareTo(session.getID().toString()); + return getId().compareTo(session.getId()); } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 785d4610eb..fcad1550e1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -59,7 +59,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; @@ -72,7 +71,6 @@ import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -84,8 +82,26 @@ public class VirtualHostImpl implements VirtualHost { private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class); + private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; + + private final UUID _id; + private final String _name; + private final long _createTime = System.currentTimeMillis(); + + private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); + + private final ScheduledThreadPoolExecutor _houseKeepingTasks; + + private final IApplicationRegistry _appRegistry; + + private final SecurityManager _securityManager; + + private final BrokerConfig _brokerConfig; + + private final VirtualHostConfiguration _configuration; + private ConnectionRegistry _connectionRegistry; private QueueRegistry _queueRegistry; @@ -102,92 +118,23 @@ public class VirtualHostImpl implements VirtualHost private AMQBrokerManagerMBean _brokerMBean; - private SecurityManager _securityManager; - private final ScheduledThreadPoolExecutor _houseKeepingTasks; - private final IApplicationRegistry _appRegistry; - private VirtualHostConfiguration _configuration; private DurableConfigurationStore _durableConfigurationStore; private BindingFactory _bindingFactory; - private BrokerConfig _broker; - private UUID _id; private boolean _statisticsEnabled = false; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; - private final long _createTime = System.currentTimeMillis(); - private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>(); - private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; - - - public IConnectionRegistry getConnectionRegistry() - { - return _connectionRegistry; - } - - public VirtualHostConfiguration getConfiguration() - { - return _configuration; - } - - public UUID getId() - { - return _id; - } - - public VirtualHostConfigType getConfigType() - { - return VirtualHostConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getBroker(); - } - public boolean isDurable() - { - return false; - } - - /** - * Virtual host JMX MBean class. - * - * This has some of the methods implemented from management intrerface for exchanges. Any - * implementaion of an Exchange MBean should extend this class. - */ - public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost + public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception { - public VirtualHostMBean() throws NotCompliantMBeanException - { - super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE); - } - - public String getObjectInstanceName() - { - return ObjectName.quote(_name); - } - - public String getName() + if (hostConfig == null) { - return _name; + throw new IllegalArgumentException("HostConfig cannot be null"); } - public VirtualHostImpl getVirtualHost() - { - return VirtualHostImpl.this; - } - } - - public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception - { - if (hostConfig == null) - { - throw new IllegalArgumentException("HostConfig cannot be null"); - } - _appRegistry = appRegistry; - _broker = _appRegistry.getBroker(); + _brokerConfig = _appRegistry.getBroker(); _configuration = hostConfig; _name = _configuration.getName(); _dtxRegistry = new DtxRegistry(); @@ -198,7 +145,7 @@ public class VirtualHostImpl implements VirtualHost if (_name == null || _name.length() == 0) { - throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); + throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost."); } _securityManager = new SecurityManager(_appRegistry.getSecurityManager()); @@ -238,17 +185,76 @@ public class VirtualHostImpl implements VirtualHost } else { - initialiseMessageStore(hostConfig); + initialiseMessageStore(hostConfig); } - _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod()); - + initialiseStatistics(); } + public IConnectionRegistry getConnectionRegistry() + { + return _connectionRegistry; + } + + public VirtualHostConfiguration getConfiguration() + { + return _configuration; + } + + public UUID getId() + { + return _id; + } + + public VirtualHostConfigType getConfigType() + { + return VirtualHostConfigType.getInstance(); + } + + public ConfiguredObject getParent() + { + return getBroker(); + } + + public boolean isDurable() + { + return false; + } + + /** + * Virtual host JMX MBean class. + * + * This has some of the methods implemented from management intrerface for exchanges. Any + * implementaion of an Exchange MBean should extend this class. + */ + public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost + { + public VirtualHostMBean() throws NotCompliantMBeanException + { + super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE); + } + + public String getObjectInstanceName() + { + return ObjectName.quote(_name); + } + + public String getName() + { + return _name; + } + + public VirtualHostImpl getVirtualHost() + { + return VirtualHostImpl.this; + } + } + + /** * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers * and checking for idle or open transactions that have exceeded the permitted thresholds. @@ -263,8 +269,7 @@ public class VirtualHostImpl implements VirtualHost scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); - Map<String, VirtualHostPluginFactory> plugins = - ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); + Map<String, VirtualHostPluginFactory> plugins = _appRegistry.getPluginManager().getVirtualHostPlugins(); if (plugins != null) { @@ -389,11 +394,9 @@ public class VirtualHostImpl implements VirtualHost { String messageStoreClass = hostConfig.getMessageStoreClass(); - Class clazz = Class.forName(messageStoreClass); + Class<?> clazz = Class.forName(messageStoreClass); Object o = clazz.newInstance(); - - if (!(o instanceof MessageStore)) { throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + @@ -435,11 +438,10 @@ public class VirtualHostImpl implements VirtualHost { _logger.debug("Loading configuration for virtualhost: " + config.getName()); - List exchangeNames = config.getExchanges(); + List<String> exchangeNames = config.getExchanges(); - for (Object exchangeNameObj : exchangeNames) + for (String exchangeName : exchangeNames) { - String exchangeName = String.valueOf(exchangeNameObj); configureExchange(config.getExchangeConfiguration(exchangeName)); } @@ -538,17 +540,12 @@ public class VirtualHostImpl implements VirtualHost public BrokerConfig getBroker() { - return _broker; + return _brokerConfig; } public String getFederationTag() { - return _broker.getFederationTag(); - } - - public void setBroker(final BrokerConfig broker) - { - _broker = broker; + return _brokerConfig.getFederationTag(); } public long getCreateTime() @@ -634,7 +631,7 @@ public class VirtualHostImpl implements VirtualHost } catch (Exception e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + _logger.error("Failed to close message store", e); } } @@ -805,39 +802,15 @@ public class VirtualHostImpl implements VirtualHost */ private static class StartupRoutingTable implements DurableConfigurationStore { - private List<Exchange> exchange = new LinkedList<Exchange>(); - private List<CreateQueueTuple> queue = new LinkedList<CreateQueueTuple>(); - private List<CreateBindingTuple> bindings = new LinkedList<CreateBindingTuple>(); - private List<BrokerLink> links = new LinkedList<BrokerLink>(); - private List<Bridge> bridges = new LinkedList<Bridge>(); - - public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception - { - } - - public void close() throws Exception - { - } - - public void removeMessage(Long messageId) throws AMQException - { - //To change body of implemented methods use File | Settings | File Templates. - } - public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config, LogSubject logSubject) throws Exception { - //To change body of implemented methods use File | Settings | File Templates. } public void createExchange(Exchange exchange) throws AMQStoreException { - if (exchange.isDurable()) - { - this.exchange.add(exchange); - } } public void removeExchange(Exchange exchange) throws AMQStoreException @@ -846,10 +819,6 @@ public class VirtualHostImpl implements VirtualHost public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException { - if (exchange.isDurable() && queue.isDurable()) - { - bindings.add(new CreateBindingTuple(exchange, routingKey, queue, args)); - } } public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException @@ -858,60 +827,22 @@ public class VirtualHostImpl implements VirtualHost public void createQueue(AMQQueue queue) throws AMQStoreException { - createQueue(queue, null); } public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException { - if (queue.isDurable()) - { - this.queue.add(new CreateQueueTuple(queue, arguments)); - } } public void removeQueue(AMQQueue queue) throws AMQStoreException { } - - private static class CreateQueueTuple - { - private AMQQueue queue; - private FieldTable arguments; - - public CreateQueueTuple(AMQQueue queue, FieldTable arguments) - { - this.queue = queue; - this.arguments = arguments; - } - } - - private static class CreateBindingTuple - { - private AMQQueue queue; - private FieldTable arguments; - private Exchange exchange; - private AMQShortString routingKey; - - public CreateBindingTuple(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) - { - this.exchange = exchange; - this.routingKey = routingKey; - this.queue = queue; - arguments = args; - } - } - public void updateQueue(AMQQueue queue) throws AMQStoreException { } public void createBrokerLink(final BrokerLink link) throws AMQStoreException { - if(link.isDurable()) - { - links.add(link); - } } public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException @@ -920,10 +851,6 @@ public class VirtualHostImpl implements VirtualHost public void createBridge(final Bridge bridge) throws AMQStoreException { - if(bridge.isDurable()) - { - bridges.add(bridge); - } } public void deleteBridge(final Bridge bridge) throws AMQStoreException diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQChannelTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQChannelTest.java new file mode 100644 index 0000000000..fc6cbcb248 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQChannelTest.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server; + +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.protocol.InternalTestProtocolSession; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class AMQChannelTest extends InternalBrokerBaseCase +{ + private VirtualHost _virtualHost; + private AMQProtocolSession _protocolSession; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); + _protocolSession = new InternalTestProtocolSession(_virtualHost); + } + + public void testCompareTo() throws Exception + { + AMQChannel channel1 = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore()); + + // create a channel with the same channelId but on a different session + AMQChannel channel2 = new AMQChannel(new InternalTestProtocolSession(_virtualHost), 1, _virtualHost.getMessageStore()); + assertFalse("Unexpected compare result", channel1.compareTo(channel2) == 0); + assertEquals("Unexpected compare result", 0, channel1.compareTo(channel1)); + } + +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/transport/ServerSessionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/transport/ServerSessionTest.java new file mode 100644 index 0000000000..d775b0f2f8 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/transport/ServerSessionTest.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.util.UUID; + +import org.apache.qpid.server.configuration.MockConnectionConfig; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.util.InternalBrokerBaseCase; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.Binary; + +public class ServerSessionTest extends InternalBrokerBaseCase +{ + + private VirtualHost _virtualHost; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); + } + + public void testCompareTo() throws Exception + { + ServerConnection connection = new ServerConnection(1); + connection.setConnectionConfig(createConnectionConfig()); + ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(), + new Binary(getName().getBytes()), 0 , connection.getConfig()); + + // create a session with the same name but on a different connection + ServerConnection connection2 = new ServerConnection(2); + connection2.setConnectionConfig(createConnectionConfig()); + ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(), + new Binary(getName().getBytes()), 0 , connection2.getConfig()); + + assertFalse("Unexpected compare result", session1.compareTo(session2) == 0); + assertEquals("Unexpected compare result", 0, session1.compareTo(session1)); + } + + private MockConnectionConfig createConnectionConfig() + { + return new MockConnectionConfig(UUID.randomUUID(), null, null, + false, 1, _virtualHost, "address", Boolean.TRUE, Boolean.TRUE, Boolean.TRUE, + "authid", "remoteProcessName", new Integer(1967), new Integer(1970), _virtualHost.getConfigStore(), Boolean.FALSE); + } + +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index efc5982dac..d919504185 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -3330,7 +3330,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { Dispatchable disp; - while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null)) + while (((disp = (Dispatchable) _queue.take()) != null) && !_closed.get()) { disp.dispatch(AMQSession.this); } diff --git a/qpid/java/jca/example/build-jboss-properties.xml b/qpid/java/jca/example/build-jboss-properties.xml index b7edf3d796..ce0cec2b35 100644 --- a/qpid/java/jca/example/build-jboss-properties.xml +++ b/qpid/java/jca/example/build-jboss-properties.xml @@ -21,6 +21,7 @@ <project name="qpid-jca-example-jboss-properties" basedir="." default=""> <property name="jndi.scheme" value="mappedName"/> + <property name="qpid.xacf.jndi.name" value="java:QpidJMSXA"/> <property name="qpid.cf.jndi.name" value="QpidConnectionFactory"/> <property name="qpid.hello.topic.jndi.name" value="HelloTopic"/> diff --git a/qpid/java/jca/example/build-jboss7-properties.xml b/qpid/java/jca/example/build-jboss7-properties.xml new file mode 100644 index 0000000000..cfbaa9b473 --- /dev/null +++ b/qpid/java/jca/example/build-jboss7-properties.xml @@ -0,0 +1,117 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - +--> +<project name="qpid-jca-example-jboss7-properties" basedir="." default=""> + + <property name="jndi.scheme" value="name"/> + + <property name="qpid.xacf.jndi.name" value="java:/QpidJMSXA"/> + <property name="qpid.cf.jndi.name" value="QpidConnectionFactory"/> + <property name="qpid.hello.topic.jndi.name" value="java:/HelloTopic"/> + <property name="qpid.goodbye.topic.jndi.name" value="java:/GoodByeTopic"/> + <property name="qpid.hello.queue.jndi.name" value="java:/HelloQueue"/> + <property name="qpid.goodbye.queue.jndi.name" value="java:/GoodByeQueue"/> + <property name="qpid.responder.queue.jndi.name" value="java:/QpidResponderQueue"/> + <property name="qpid.ejb.jndi.name" value="mappedName="QpidTestEJB""/> + <property name="qpid.ejb.ref.name" value="QpidTestBean/local"/> + <property name="qpid.ejb.name" value="qpid-jcaex/QpidTestBean/remote"/> + + <property name="jndi.context" value="org.jnp.interfaces.NamingContextFactory"/> + <property name="server.host" value="jnp://localhost:1099"/> + + + <property name="jboss.server.config" value="standalone"/> + + <property name="jboss.home" location="${env.JBOSS_HOME}"/> + <property name="jboss.server" value="${jboss.server.config}"/> + <property name="jboss.deploy" location="${jboss.home}/${jboss.server}/deployments"/> + <property name="jboss.modules" location="${jboss.home}/modules"/> + <property name="jboss.config.dir" location="${jboss.home}/${jboss.server}/configuration/"/> + + <path id="compile.classpath"> + <fileset dir="${jboss.modules}/javax/jms/api/main"> + <include name="jboss-jms-api_1.1_spec-1.0.0.Final.jar"/> + </fileset> + <fileset dir="${jboss.modules}/javax/ejb/api/main"> + <include name="jboss-ejb-api_3.1_spec-1.0.1.Final.jar"/> + </fileset> + <fileset dir="${jboss.modules}/javax/servlet/api/main"> + <include name="jboss-servlet-api_3.0_spec-1.0.0.Final.jar"/> + </fileset> + <fileset dir="${jboss.modules}/javax/transaction/api/main"> + <include name="jboss-transaction-api_1.1_spec-1.0.0.Final.jar"/> + </fileset> + <fileset dir="${jboss.modules}/org/slf4j/main"> + <include name="slf4j-api-1.6.1.jar"/> + </fileset> + </path> + + <path id="run.classpath"> + <fileset dir="${lib.dir}"> + <include name="qpid-ra-*.jar"/> + <include name="qpid-client-*.jar"/> + <include name="qpid-common-*.jar"/> + </fileset> + <fileset dir="${jboss.client}"> + <!-- Shortcut to get it working!--> + <include name="jbossall-client.jar"/> + </fileset> + </path> + + <filterset id="extra.filterset"/> + + <!-- Deployment is target specific so is included here --> + <target name="deploy-rar" description="Deploy the RAR file."> + <copy todir="${jboss.deploy}" overwrite="true"> + <fileset dir="${qpid.jca.dir}"> + <include name="${rar.name}"/> + </fileset> + </copy> + </target> + + <target name="undeploy-rar" description="Undeploys the RAR deployment."> + <delete file="${jboss.deploy}/${rar.name}"/> + </target> + + <target name="deploy-ear" depends="package-ear" description="Deploys the EAR archive."> + <copy todir="${jboss.deploy}" overwrite="true"> + <fileset dir="${build.dir}"> + <include name="${ear.name}"/> + </fileset> + </copy> + </target> + + <target name="undeploy-ear" description="Undeploys the EAR archive."> + <delete file="${jboss.deploy}/${ear.name}"/> + </target> + + <target name="deploy-config" depends="generate" description="Deploys the standalone file to the JBoss environment."> + <copy todir="${jboss.config.dir}" overwrite="true"> + <fileset dir="${gen.dir}"> + <include name="${jboss.server.config}.xml"/> + </fileset> + </copy> + </target> + + <target name="undeploy-ds" description="Undeploys the ds.xml file from the JBoss environment."> + <delete file="${jboss.deploy}/qpid-jca-ds.xml"/> + </target> + +</project> diff --git a/qpid/java/jca/example/conf/standalone.xml b/qpid/java/jca/example/conf/standalone.xml new file mode 100644 index 0000000000..5d4256f8e7 --- /dev/null +++ b/qpid/java/jca/example/conf/standalone.xml @@ -0,0 +1,417 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - +--> +<?xml version='1.0' encoding='UTF-8'?> + +<server xmlns="urn:jboss:domain:1.1" xmlns:xsd="http://www.w3.org/2001/XMLSchema-instance"> + + <extensions> + <extension module="org.jboss.as.clustering.infinispan"/> + <extension module="org.jboss.as.connector"/> + <extension module="org.jboss.as.configadmin"/> + <extension module="org.jboss.as.deployment-scanner"/> + <extension module="org.jboss.as.ee"/> + <extension module="org.jboss.as.ejb3"/> + <extension module="org.jboss.as.jaxr"/> + <extension module="org.jboss.as.jaxrs"/> + <extension module="org.jboss.as.jdr"/> + <extension module="org.jboss.as.jmx"/> + <extension module="org.jboss.as.jpa"/> + <extension module="org.jboss.as.logging"/> + <extension module="org.jboss.as.mail"/> + <extension module="org.jboss.as.naming"/> + <extension module="org.jboss.as.osgi"/> + <extension module="org.jboss.as.pojo"/> + <extension module="org.jboss.as.remoting"/> + <extension module="org.jboss.as.sar"/> + <extension module="org.jboss.as.security"/> + <extension module="org.jboss.as.threads"/> + <extension module="org.jboss.as.transactions"/> + <extension module="org.jboss.as.web"/> + <extension module="org.jboss.as.webservices"/> + <extension module="org.jboss.as.weld"/> + </extensions> + + <management> + <security-realms> + <security-realm name="ManagementRealm"> + <authentication> + <properties path="mgmt-users.properties" relative-to="jboss.server.config.dir"/> + </authentication> + </security-realm> + </security-realms> + <management-interfaces> + <native-interface security-realm="ManagementRealm"> + <socket-binding native="management-native"/> + </native-interface> + <http-interface security-realm="ManagementRealm"> + <socket-binding http="management-http"/> + </http-interface> + </management-interfaces> + </management> + + <profile> + <subsystem xmlns="urn:jboss:domain:logging:1.1"> + <console-handler name="CONSOLE"> + <level name="INFO"/> + <formatter> + <pattern-formatter pattern="%d{HH:mm:ss,SSS} %-5p [%c] (%t) %s%E%n"/> + </formatter> + </console-handler> + <periodic-rotating-file-handler name="FILE"> + <formatter> + <pattern-formatter pattern="%d{HH:mm:ss,SSS} %-5p [%c] (%t) %s%E%n"/> + </formatter> + <file relative-to="jboss.server.log.dir" path="server.log"/> + <suffix value=".yyyy-MM-dd"/> + <append value="true"/> + </periodic-rotating-file-handler> + <logger category="com.arjuna"> + <level name="WARN"/> + </logger> + <logger category="org.apache.tomcat.util.modeler"> + <level name="WARN"/> + </logger> + <logger category="sun.rmi"> + <level name="WARN"/> + </logger> + <logger category="jacorb"> + <level name="WARN"/> + </logger> + <logger category="jacorb.config"> + <level name="ERROR"/> + </logger> + <root-logger> + <level name="DEBUG"/> + <handlers> + <handler name="CONSOLE"/> + <handler name="FILE"/> + </handlers> + </root-logger> + </subsystem> + <subsystem xmlns="urn:jboss:domain:configadmin:1.0"/> + <subsystem xmlns="urn:jboss:domain:datasources:1.0"> + <datasources> + <datasource jndi-name="java:jboss/datasources/ExampleDS" pool-name="ExampleDS" enabled="true" use-java-context="true"> + <connection-url> + jdbc:h2:mem:test;DB_CLOSE_DELAY=-1 + </connection-url> + <driver> + h2 + </driver> + <security> + <user-name> + sa + </user-name> + <password> + sa + </password> + </security> + </datasource> + <drivers> + <driver name="h2" module="com.h2database.h2"> + <xa-datasource-class> + org.h2.jdbcx.JdbcDataSource + </xa-datasource-class> + </driver> + </drivers> + </datasources> + </subsystem> + <subsystem xmlns="urn:jboss:domain:deployment-scanner:1.1"> + <deployment-scanner name="default" path="deployments" scan-interval="5000" relative-to="jboss.server.base.dir"/> + </subsystem> + <subsystem xmlns="urn:jboss:domain:ee:1.0"/> + <subsystem xmlns="urn:jboss:domain:ejb3:1.2"> + <session-bean> + <stateless> + <bean-instance-pool-ref pool-name="slsb-strict-max-pool"/> + </stateless> + <stateful default-access-timeout="5000" cache-ref="simple"/> + <singleton default-access-timeout="5000"/> + </session-bean> + <mdb> + <resource-adapter-ref resource-adapter-name="@rar.name@"/> + <bean-instance-pool-ref pool-name="mdb-strict-max-pool"/> + </mdb> + <pools> + <bean-instance-pools> + <strict-max-pool name="slsb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/> + <strict-max-pool name="mdb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/> + </bean-instance-pools> + </pools> + <caches> + <cache name="simple" aliases="NoPassivationCache"/> + <cache name="passivating" passivation-store-ref="file" aliases="SimpleStatefulCache"/> + </caches> + <passivation-stores> + <file-passivation-store name="file"/> + </passivation-stores> + <async thread-pool-name="default"/> + <timer-service thread-pool-name="default"> + <data-store path="timer-service-data" relative-to="jboss.server.data.dir"/> + </timer-service> + <remote connector-ref="remoting-connector" thread-pool-name="default"/> + <thread-pools> + <thread-pool name="default"> + <max-threads count="10"/> + <keepalive-time time="100" unit="milliseconds"/> + </thread-pool> + </thread-pools> + </subsystem> + <subsystem xmlns="urn:jboss:domain:infinispan:1.1" default-cache-container="hibernate"> + <cache-container name="hibernate" default-cache="local-query"> + <local-cache name="entity"> + <transaction mode="NON_XA"/> + <eviction strategy="LRU" max-entries="10000"/> + <expiration max-idle="100000"/> + </local-cache> + <local-cache name="local-query"> + <transaction mode="NONE"/> + <eviction strategy="LRU" max-entries="10000"/> + <expiration max-idle="100000"/> + </local-cache> + <local-cache name="timestamps"> + <transaction mode="NONE"/> + <eviction strategy="NONE"/> + </local-cache> + </cache-container> + </subsystem> + <subsystem xmlns="urn:jboss:domain:jaxr:1.0"> + <connection-factory jndi-name="java:jboss/jaxr/ConnectionFactory"/> + <juddi-server publish-url="http://localhost:8080/juddi/publish" query-url="http://localhost:8080/juddi/query"/> + </subsystem> + <subsystem xmlns="urn:jboss:domain:jaxrs:1.0"/> + <subsystem xmlns="urn:jboss:domain:jca:1.1"> + <archive-validation enabled="false"/> + <bean-validation enabled="false"/> + <default-workmanager> + <short-running-threads> + <core-threads count="50"/> + <queue-length count="50"/> + <max-threads count="50"/> + <keepalive-time time="10" unit="seconds"/> + </short-running-threads> + <long-running-threads> + <core-threads count="50"/> + <queue-length count="50"/> + <max-threads count="50"/> + <keepalive-time time="10" unit="seconds"/> + </long-running-threads> + </default-workmanager> + </subsystem> + <subsystem xmlns="urn:jboss:domain:jdr:1.0"/> + <subsystem xmlns="urn:jboss:domain:jmx:1.1"> + <show-model value="true"/> + <remoting-connector/> + </subsystem> + <subsystem xmlns="urn:jboss:domain:jpa:1.0"> + <jpa default-datasource=""/> + </subsystem> + <subsystem xmlns="urn:jboss:domain:mail:1.0"> + <mail-session jndi-name="java:jboss/mail/Default"> + <smtp-server outbound-socket-binding-ref="mail-smtp"/> + </mail-session> + </subsystem> + <subsystem xmlns="urn:jboss:domain:naming:1.1"/> + <subsystem xmlns="urn:jboss:domain:osgi:1.2" activation="lazy"> + <properties> + <property name="org.osgi.framework.startlevel.beginning"> + 1 + </property> + </properties> + <capabilities> + <capability name="javax.servlet.api"/> + <capability name="javax.transaction.api"/> + <capability name="org.apache.felix.log" startlevel="1"/> + <capability name="org.jboss.osgi.logging" startlevel="1"/> + <capability name="org.apache.felix.configadmin" startlevel="1"/> + <capability name="org.jboss.as.osgi.configadmin" startlevel="1"/> + <capability name="org.jboss.osgi.repository" startlevel="1"/> + </capabilities> + </subsystem> + <subsystem xmlns="urn:jboss:domain:pojo:1.0"/> + <subsystem xmlns="urn:jboss:domain:remoting:1.1"> + <connector name="remoting-connector" socket-binding="remoting"/> + </subsystem> + <subsystem xmlns="urn:jboss:domain:resource-adapters:1.0"> + <resource-adapters> + <resource-adapter> + <archive> + @rar.name@ + </archive> + <transaction-support> + XATransaction + </transaction-support> + <config-property name="connectionURL"> + @broker.url@ + </config-property> + <config-property name="TransactionManagerLocatorClass"> + org.apache.qpid.ra.tm.JBoss7TransactionManagerLocator + </config-property> + <config-property name="TransactionManagerLocatorMethod"> + getTm + </config-property> + <connection-definitions> + <connection-definition class-name="org.apache.qpid.ra.QpidRAManagedConnectionFactory" jndi-name="QpidJMSXA" pool-name="QpidJMSXA"> + <config-property name="connectionURL"> + @broker.url@ + </config-property> + <config-property name="SessionDefaultType"> + javax.jms.Queue + </config-property> + </connection-definition> + </connection-definitions> + <admin-objects> + <admin-object class-name="org.apache.qpid.ra.admin.QpidConnectionFactoryProxy" jndi-name="QpidJMS" use-java-context="false"> + <config-property name="ConnectionURL"> + @broker.url@ + </config-property> + </admin-object> + <admin-object class-name="org.apache.qpid.ra.admin.QpidTopicImpl" jndi-name="GoodByeTopic" use-java-context="false" pool-name="GoodByeTopic"> + <config-property name="destinationAddress"> + @qpid.hello.topic.dest.address@ + </config-property> + </admin-object> + <admin-object class-name="org.apache.qpid.ra.admin.QpidTopicImpl" jndi-name="HelloTopic" use-java-context="false" pool-name="HelloTopic"> + <config-property name="destinationAddress"> + @qpid.goodbye.topic.dest.address@ + </config-property> + </admin-object> + <admin-object class-name="org.apache.qpid.ra.admin.QpidQueueImpl" jndi-name="GoodByeQueue" use-java-context="false" pool-name="GoodByeQueue"> + <config-property name="destinationAddress"> + @qpid.goodbye.queue.dest.address@ + </config-property> + </admin-object> + <admin-object class-name="org.apache.qpid.ra.admin.QpidQueueImpl" jndi-name="HelloQueue" use-java-context="false" pool-name="HelloQueue"> + <config-property name="destinationAddress"> + @qpid.hello.queue.dest.address@ + </config-property> + </admin-object> + <admin-object class-name="org.apache.qpid.ra.admin.QpidQueueImpl" jndi-name="QpidResponderQueue" use-java-context="false" pool-name="QpidResponderQueue"> + <config-property name="destinationAddress"> + @qpid.responder.queue.dest.address@ + </config-property> + </admin-object> + </admin-objects> + </resource-adapter> + </resource-adapters> + </subsystem> + <subsystem xmlns="urn:jboss:domain:sar:1.0"/> + <subsystem xmlns="urn:jboss:domain:security:1.1"> + <security-domains> + <security-domain name="other" cache-type="default"> + <authentication> + <login-module code="UsersRoles" flag="required"/> + </authentication> + </security-domain> + <security-domain name="jboss-web-policy" cache-type="default"> + <authorization> + <policy-module code="Delegating" flag="required"/> + </authorization> + </security-domain> + <security-domain name="jboss-ejb-policy" cache-type="default"> + <authorization> + <policy-module code="Delegating" flag="required"/> + </authorization> + </security-domain> + </security-domains> + </subsystem> + <subsystem xmlns="urn:jboss:domain:threads:1.1"/> + <subsystem xmlns="urn:jboss:domain:transactions:1.1"> + <core-environment> + <process-id> + <uuid/> + </process-id> + </core-environment> + <recovery-environment socket-binding="txn-recovery-environment" status-socket-binding="txn-status-manager"/> + <coordinator-environment default-timeout="300"/> + </subsystem> + <subsystem xmlns="urn:jboss:domain:web:1.1" default-virtual-server="default-host"> + <connector name="http" protocol="HTTP/1.1" scheme="http" socket-binding="http"/> + <virtual-server name="default-host" enable-welcome-root="true"> + <alias name="localhost"/> + <alias name="example.com"/> + </virtual-server> + </subsystem> + <subsystem xmlns="urn:jboss:domain:webservices:1.0" xmlns:javaee="http://java.sun.com/xml/ns/javaee" xmlns:jaxwsconfig="urn:jboss:jbossws-jaxws-config:4.0"> + <modify-wsdl-address> + true + </modify-wsdl-address> + <wsdl-host> + ${jboss.bind.address:127.0.0.1} + </wsdl-host> + <endpoint-config> + <jaxwsconfig:config-name> + Standard-Endpoint-Config + </jaxwsconfig:config-name> + </endpoint-config> + <endpoint-config> + <jaxwsconfig:config-name> + Recording-Endpoint-Config + </jaxwsconfig:config-name> + <jaxwsconfig:pre-handler-chains> + <javaee:handler-chain id="recording-handlers"> + <javaee:protocol-bindings> + ##SOAP11_HTTP ##SOAP11_HTTP_MTOM ##SOAP12_HTTP ##SOAP12_HTTP_MTOM + </javaee:protocol-bindings> + <javaee:handler> + <javaee:handler-name> + RecordingHandler + </javaee:handler-name> + <javaee:handler-class> + org.jboss.ws.common.invocation.RecordingServerHandler + </javaee:handler-class> + </javaee:handler> + </javaee:handler-chain> + </jaxwsconfig:pre-handler-chains> + </endpoint-config> + </subsystem> + <subsystem xmlns="urn:jboss:domain:weld:1.0"/> + </profile> + + <interfaces> + <interface name="management"> + <inet-address value="${jboss.bind.address.management:127.0.0.1}"/> + </interface> + <interface name="public"> + <inet-address value="${jboss.bind.address:127.0.0.1}"/> + </interface> + </interfaces> + + <socket-binding-group name="standard-sockets" default-interface="public" port-offset="${jboss.socket.binding.port-offset:0}"> + <socket-binding name="http" port="8080"/> + <socket-binding name="https" port="8443"/> + <socket-binding name="jacorb" port="3528"/> + <socket-binding name="jacorb-ssl" port="3529"/> + <socket-binding name="management-native" interface="management" port="${jboss.management.native.port:9999}"/> + <socket-binding name="management-http" interface="management" port="${jboss.management.http.port:9990}"/> + <socket-binding name="messaging" port="5445"/> + <socket-binding name="messaging-throughput" port="5455"/> + <socket-binding name="osgi-http" interface="management" port="8090"/> + <socket-binding name="remoting" port="4447"/> + <socket-binding name="txn-recovery-environment" port="4712"/> + <socket-binding name="txn-status-manager" port="4713"/> + <outbound-socket-binding name="mail-smtp"> + <remote-destination host="localhost" port="25"/> + </outbound-socket-binding> + </socket-binding-group> + + +</server> diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java index 37b5ffbc76..7cb7095ff4 100644 --- a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java @@ -53,7 +53,7 @@ public class QpidHelloListenerBean implements MessageListener @Resource(@jndi.scheme@="@qpid.xacf.jndi.name@") private ConnectionFactory _connectionFactory; - @Resource(@jndi.scheme@="GoodByeQueue") + @Resource(@jndi.scheme@="@qpid.goodbye.queue.jndi.name@") private Destination _queue; @Override diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java index 0d87cb6955..768cf25c3c 100644 --- a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java @@ -54,7 +54,7 @@ public class QpidHelloSubscriberBean implements MessageListener @Resource(@jndi.scheme@="@qpid.xacf.jndi.name@") private ConnectionFactory _connectionFactory; - @Resource(@jndi.scheme@="GoodByeTopic") + @Resource(@jndi.scheme@="@qpid.goodbye.topic.jndi.name@") private Destination _topic; @Override diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java index 07c3e38f60..7526daa83d 100644 --- a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java @@ -58,10 +58,10 @@ public class QpidTestServlet extends HttpServlet @Resource(@jndi.scheme@="@qpid.xacf.jndi.name@") private ConnectionFactory _connectionFactory; - @Resource(@jndi.scheme@="HelloQueue") + @Resource(@jndi.scheme@="@qpid.hello.queue.jndi.name@") private Destination _queue; - @Resource(@jndi.scheme@="HelloTopic") + @Resource(@jndi.scheme@="@qpid.hello.topic.jndi.name@") private Destination _topic; @EJB diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java new file mode 100644 index 0000000000..09402c140d --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.LogMonitor; + +/** + * Tests the behaviour of JMS asynchronous message listeners as provided by + * {@link MessageListener#onMessage(Message)}. + * + */ +public class AsynchMessageListenerTest extends QpidBrokerTestCase +{ + private static final int MSG_COUNT = 10; + private static final long AWAIT_MESSAGE_TIMEOUT = 2000; + private static final long AWAIT_MESSAGE_TIMEOUT_NEGATIVE = 250; + private final String _testQueueName = getTestQueueName(); + private Connection _consumerConnection; + private Session _consumerSession; + private MessageConsumer _consumer; + private Queue _queue; + + protected void setUp() throws Exception + { + super.setUp(); + + _consumerConnection = getConnection(); + _consumerConnection.start(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _queue = _consumerSession.createQueue(_testQueueName); + _consumer = _consumerSession.createConsumer(_queue); + + // Populate queue + Connection producerConnection = getConnection(); + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + sendMessage(producerSession, _queue, MSG_COUNT); + producerConnection.close(); + + } + + public void testMessageListener() throws Exception + { + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount()); + } + + public void testSynchronousReceiveFollowedByMessageListener() throws Exception + { + // Receive initial message synchronously + assertNotNull("Could not receive first message synchronously", _consumer.receive(AWAIT_MESSAGE_TIMEOUT) != null); + final int numberOfMessagesToReceiveByMessageListener = MSG_COUNT - 1; + + // Consume remainder asynchronously + CountingMessageListener countingMessageListener = new CountingMessageListener(numberOfMessagesToReceiveByMessageListener); + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount()); + } + + public void testMessageListenerSetDisallowsSynchronousReceive() throws Exception + { + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); + _consumer.setMessageListener(countingMessageListener); + + try + { + _consumer.receive(); + fail("Exception not thrown"); + } + catch (JMSException e) + { + // PASS + assertEquals("A listener has already been set.", e.getMessage()); + } + } + + + public void testConnectionStopThenStart() throws Exception + { + int messageToReceivedBeforeConnectionStop = 2; + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop); + + // Consume at least two messages + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + _consumerConnection.stop(); + + assertTrue("Too few messages received afer Connection#stop()", countingMessageListener.getReceivedCount() >= messageToReceivedBeforeConnectionStop); + countingMessageListener.resetLatch(); + + // Restart connection + _consumerConnection.start(); + + // Consume the remainder + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount()); + } + + public void testConnectionStopAndMessageListenerChange() throws Exception + { + int messageToReceivedBeforeConnectionStop = 2; + CountingMessageListener countingMessageListener1 = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop); + + // Consume remainder asynchronously + _consumer.setMessageListener(countingMessageListener1); + countingMessageListener1.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + _consumerConnection.stop(); + assertTrue("Too few messages received afer Connection#stop()", countingMessageListener1.getReceivedCount() >= messageToReceivedBeforeConnectionStop); + + CountingMessageListener countingMessageListener2 = new CountingMessageListener(countingMessageListener1.getOutstandingCount()); + + // Reset Message Listener + _consumer.setMessageListener(countingMessageListener2); + + _consumerConnection.start(); + + // Consume the remainder + countingMessageListener2.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener2.getOutstandingCount()); + + } + + public void testConnectionStopHaltsDeliveryToListener() throws Exception + { + int messageToReceivedBeforeConnectionStop = 2; + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop); + + // Consume at least two messages + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + _consumerConnection.stop(); + + // Connection should now be stopped and listener should receive no more + final int outstandingCountAtStop = countingMessageListener.getOutstandingCount(); + countingMessageListener.resetLatch(); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT_NEGATIVE); + + assertEquals("Unexpected number of outstanding messages", outstandingCountAtStop, countingMessageListener.getOutstandingCount()); + } + + public void testSessionCloseHaltsDelivery() throws Exception + { + int messageToReceivedBeforeConnectionStop = 2; + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop); + + // Consume at least two messages + _consumer.setMessageListener(countingMessageListener); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + _consumerSession.close(); + + // Once a session is closed, the listener should receive no more + final int outstandingCountAtClose = countingMessageListener.getOutstandingCount(); + countingMessageListener.resetLatch(); + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT_NEGATIVE); + + assertEquals("Unexpected number of outstanding messages", outstandingCountAtClose, countingMessageListener.getOutstandingCount()); + } + + public void testImmediatePrefetchWithMessageListener() throws Exception + { + // Close connection provided by setup so we can set IMMEDIATE_PREFETCH + _consumerConnection.close(); + setTestClientSystemProperty(AMQSession.IMMEDIATE_PREFETCH, "true"); + + _consumerConnection = getConnection(); + _consumerConnection.start(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumer = _consumerSession.createConsumer(_queue); + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); + _consumer.setMessageListener(countingMessageListener); + + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + + assertEquals("Unexpected number of messages received", MSG_COUNT, countingMessageListener.getReceivedCount()); + } + + public void testReceiveTwoConsumers() throws Exception + { + Session consumerSession2 = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer _consumer2 = consumerSession2.createConsumer(_queue); + + CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); + _consumer.setMessageListener(countingMessageListener); + _consumer2.setMessageListener(countingMessageListener); + + countingMessageListener.awaitMessages(AWAIT_MESSAGE_TIMEOUT); + assertEquals("Unexpected number of messages received", MSG_COUNT, countingMessageListener.getReceivedCount()); + } + + /** + * Tests the case where the message listener throws an java.lang.Error. + * TODO - a useful test?. + */ + public void testMessageListenerThrowsError() throws Exception + { + int expectedMessages = 1; // The error will kill the dispatcher so only one message will be delivered. + final CountDownLatch awaitMessages = new CountDownLatch(expectedMessages); + final AtomicInteger receivedCount = new AtomicInteger(0); + final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error"; + CountingExceptionListener countingExceptionListener = new CountingExceptionListener(); + _consumerConnection.setExceptionListener(countingExceptionListener); + + _consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message message) + { + try + { + throw new Error(javaLangErrorMessageText); + } + finally + { + receivedCount.incrementAndGet(); + awaitMessages.countDown(); + } + } + }); + + awaitMessages.await(AWAIT_MESSAGE_TIMEOUT, TimeUnit.MILLISECONDS); + + assertEquals("Unexpected number of messages received", expectedMessages, receivedCount.get()); + assertEquals("onException should NOT have been called", 0, countingExceptionListener.getErrorCount()); + + // Check that Error has been written to the application log. + + LogMonitor _monitor = new LogMonitor(_outputFile); + assertTrue("The expected message not written to log file.", + _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT)); + + if (_consumerConnection != null) + { + try + { + _consumerConnection.close(); + } + catch (JMSException e) + { + // Ignore connection close errors for this test. + } + finally + { + _consumerConnection = null; + } + } + } + + private final class CountingExceptionListener implements ExceptionListener + { + private final AtomicInteger _errorCount = new AtomicInteger(); + + @Override + public void onException(JMSException arg0) + { + _errorCount.incrementAndGet(); + } + + public int getErrorCount() + { + return _errorCount.intValue(); + } + } + + private final class CountingMessageListener implements MessageListener + { + private volatile CountDownLatch _awaitMessages; + private final AtomicInteger _receivedCount; + private final AtomicInteger _outstandingMessageCount; + + public CountingMessageListener(final int totalExpectedMessageCount) + { + this(totalExpectedMessageCount, totalExpectedMessageCount); + } + + + public CountingMessageListener(int totalExpectedMessageCount, int numberOfMessagesToAwait) + { + _receivedCount = new AtomicInteger(0); + _outstandingMessageCount = new AtomicInteger(totalExpectedMessageCount); + _awaitMessages = new CountDownLatch(numberOfMessagesToAwait); + } + + public int getOutstandingCount() + { + return _outstandingMessageCount.get(); + } + + public int getReceivedCount() + { + return _receivedCount.get(); + } + + public void resetLatch() + { + _awaitMessages = new CountDownLatch(_outstandingMessageCount.get()); + } + + @Override + public void onMessage(Message message) + { + _receivedCount.incrementAndGet(); + _outstandingMessageCount.decrementAndGet(); + _awaitMessages.countDown(); + } + + public boolean awaitMessages(long timeout) + { + try + { + return _awaitMessages.await(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + } + } + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java deleted file mode 100644 index 3537dd0533..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/DispatcherTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.naming.Context; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery queue - * <p/> - * The message delivery process: - * Mina puts a message on _queue in AMQSession and the dispatcher thread take()s - * from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at connection start - * then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple consumers on a - * session can run in any order and a synchronous put/poll will block the dispatcher). - * <p/> - * When setting the message listener later the _synchronousQueue is just poll()'ed and the first message delivered - * the remaining messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class DispatcherTest extends QpidBrokerTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(DispatcherTest.class); - - private Context _context; - - private static final int MSG_COUNT = 6; - private int _receivedCount = 0; - private int _receivedCountWhileStopped = 0; - private Connection _clientConnection, _producerConnection; - private MessageConsumer _consumer; - private MessageProducer _producer; - private Session _clientSession, _producerSession; - - private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(1); // all messages Sent Lock - private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(1); // all messages Sent Lock - - private volatile boolean _connectionStopped = false; - - protected void setUp() throws Exception - { - super.setUp(); - - // Create Client 1 - _clientConnection = getConnection(); - - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue queue = _clientSession.createQueue(this.getClass().getName()); - _consumer = _clientSession.createConsumer(queue); - - // Create Producer - _producerConnection = getConnection(); - - _producerConnection.start(); - - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _producer = _producerSession.createProducer(queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(_producerSession.createTextMessage("Message " + msg)); - } - } - - protected void tearDown() throws Exception - { - - _clientConnection.close(); - - _producerConnection.close(); - super.tearDown(); - } - - public void testAsynchronousRecieve() - { - _logger.info("Test Start"); - - assertTrue(!((AMQConnection) _clientConnection).started()); - - // Set default Message Listener - try - { - _consumer.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - _logger.info("Client 1 ML 1 Received Message(" + _receivedCount + "):" + message); - - _receivedCount++; - - if (_receivedCount == MSG_COUNT) - { - _allFirstMessagesSent.countDown(); - } - - if (_connectionStopped) - { - _logger.info("Running with Message:" + _receivedCount); - } - - if (_connectionStopped && (_allFirstMessagesSent.getCount() == 0)) - { - _receivedCountWhileStopped++; - } - - if (_allFirstMessagesSent.getCount() == 0) - { - if (_receivedCount == (MSG_COUNT * 2)) - { - _allSecondMessagesSent.countDown(); - } - } - } - }); - - assertTrue("Connecion should not be started", !((AMQConnection) _clientConnection).started()); - _clientConnection.start(); - } - catch (JMSException e) - { - _logger.error("Error Setting Default ML on consumer1"); - } - - try - { - _allFirstMessagesSent.await(1000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - - try - { - assertTrue("Connecion should be started", ((AMQConnection) _clientConnection).started()); - _clientConnection.stop(); - _connectionStopped = true; - } - catch (JMSException e) - { - _logger.error("Error stopping connection"); - } - - try - { - _logger.error("Send additional messages"); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(_producerSession.createTextMessage("Message " + msg)); - } - } - catch (JMSException e) - { - _logger.error("Unable to send additional messages", e); - } - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) - { - // ignore - } - - try - { - _logger.info("Restarting connection"); - - _connectionStopped = false; - _clientConnection.start(); - } - catch (JMSException e) - { - _logger.error("Error Setting Better ML on consumer1", e); - } - - _logger.info("Waiting upto 2 seconds for messages"); - - try - { - _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - - assertEquals("Messages not received correctly", 0, _allFirstMessagesSent.getCount()); - assertEquals("Messages not received correctly", 0, _allSecondMessagesSent.getCount()); - assertEquals("Client didn't get all messages", MSG_COUNT * 2, _receivedCount); - assertEquals("Messages received while stopped is not 0", 0, _receivedCountWhileStopped); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(DispatcherTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java deleted file mode 100644 index 7461f6c200..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerImmediatePrefetch.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery - * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread - * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at - * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple - * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting - * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining - * messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class MessageListenerMultiConsumerImmediatePrefetch extends MessageListenerMultiConsumerTest -{ - protected void setUp() throws Exception - { - System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true"); - super.setUp(); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(MessageListenerMultiConsumerImmediatePrefetch.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java deleted file mode 100644 index 4fd10a0134..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.naming.Context; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery - * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread - * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at - * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple - * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting - * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining - * messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class MessageListenerMultiConsumerTest extends QpidBrokerTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(MessageListenerMultiConsumerTest.class); - - private Context _context; - - private static final int MSG_COUNT = 6; - private int receivedCount1 = 0; - private int receivedCount2 = 0; - private Connection _clientConnection; - private MessageConsumer _consumer1; - private MessageConsumer _consumer2; - private Session _clientSession1; - private Queue _queue; - private final CountDownLatch _allMessagesSent = new CountDownLatch(2); // all messages Sent Lock - private static final String QUEUE_NAME = "queue" + UUID.randomUUID().toString(); - - protected void setUp() throws Exception - { - super.setUp(); - - // Create Client 1 - _clientConnection = getConnection("guest", "guest"); - - _clientConnection.start(); - - _clientSession1 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _queue =_clientSession1.createQueue(QUEUE_NAME); - - _consumer1 = _clientSession1.createConsumer(_queue); - - // Create Client 2 - Session clientSession2 = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _consumer2 = clientSession2.createConsumer(_queue); - - // Create Producer - Connection producerConnection = getConnection("guest", "guest"); - - producerConnection.start(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer producer = producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - producer.send(producerSession.createTextMessage("Message " + msg)); - } - - producerConnection.close(); - - } - - protected void tearDown() throws Exception - { - _clientConnection.close(); - super.tearDown(); - } - - public void testRecieveInterleaved() throws Exception - { - int msg = 0; - int MAX_LOOPS = MSG_COUNT * 2; - for (int loops = 0; (msg < MSG_COUNT) || (loops < MAX_LOOPS); loops++) - { - - if (_consumer1.receive(1000) != null) - { - msg++; - } - - if (_consumer2.receive(1000) != null) - { - msg++; - } - } - - assertEquals("Not all messages received.", MSG_COUNT, msg); - } - - public void testAsynchronousRecieve() throws Exception - { - _consumer1.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - _logger.info("Client 1 Received Message(" + receivedCount1 + "):" + message); - - receivedCount1++; - - if (receivedCount1 == (MSG_COUNT / 2)) - { - _allMessagesSent.countDown(); - } - - } - }); - - _consumer2.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - _logger.info("Client 2 Received Message(" + receivedCount2 + "):" + message); - - receivedCount2++; - if (receivedCount2 == (MSG_COUNT / 2)) - { - _allMessagesSent.countDown(); - } - } - }); - - _logger.info("Waiting upto 2 seconds for messages"); - - try - { - _allMessagesSent.await(4000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - - assertEquals(MSG_COUNT, receivedCount1 + receivedCount2); - } - - public void testRecieveC2Only() throws Exception - { - if ( - !Boolean.parseBoolean( - System.getProperties().getProperty(AMQSession.IMMEDIATE_PREFETCH, - AMQSession.IMMEDIATE_PREFETCH_DEFAULT))) - { - _logger.info("Performing Receive only on C2"); - for (int msg = 0; msg < MSG_COUNT; msg++) - { - assertTrue(MSG_COUNT + " msg should be received. Only received:" + msg, _consumer2.receive(1000) != null); - } - } - } - - public void testRecieveBoth() throws Exception - { - if ( - !Boolean.parseBoolean( - System.getProperties().getProperty(AMQSession.IMMEDIATE_PREFETCH, - AMQSession.IMMEDIATE_PREFETCH_DEFAULT))) - { - _logger.info("Performing Receive only with two consumers on one session "); - - //Create a new consumer on session one that we don't use - _clientSession1.createConsumer(_queue); - - int msg; - for (msg = 0; msg < (MSG_COUNT / 2); msg++) - { - - // Attempt to receive up to half the messages - // The other half may have gone to the consumer above - final Message message = _consumer1.receive(1000); - if(message == null) - { - break; - } - - } - - _consumer1.close(); - // This will close the unused consumer above. - _clientSession1.close(); - - - // msg will now have recorded the number received on session 1 - // attempt to retrieve the rest on session 2 - for (; msg < MSG_COUNT ; msg++) - { - assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null); - } - - } - else - { - _logger.info("Performing Receive only on both C1 and C2"); - - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) - { - - assertTrue(_consumer1.receive(3000) != null); - } - - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) - { - assertTrue(_consumer2.receive(3000) != null); - } - } - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(MessageListenerMultiConsumerTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java deleted file mode 100644 index 142f301bd0..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.util.LogMonitor; - -import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.naming.Context; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery - * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread - * take()s from here and dispatches to the _consumers. If the _consumer doesn't have a message listener set at - * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple - * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting - * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining - * messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener, ExceptionListener -{ - private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class); - - private Context _context; - - private static final int MSG_COUNT = 5; - private int _receivedCount = 0; - private int _errorCount = 0; - private MessageConsumer _consumer; - private Connection _clientConnection; - private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT); - - protected void setUp() throws Exception - { - super.setUp(); - - // Create Client - _clientConnection = getConnection("guest", "guest"); - - _clientConnection.start(); - - Session clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue queue =clientSession.createQueue("message-listener-test-queue"); - - _consumer = clientSession.createConsumer(queue); - - // Create Producer - - Connection producerConnection = getConnection("guest", "guest"); - - producerConnection.start(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer producer = producerSession.createProducer(queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - producer.send(producerSession.createTextMessage("Message " + msg)); - } - - producerConnection.close(); - - } - - protected void tearDown() throws Exception - { - if (_clientConnection != null) - { - _clientConnection.close(); - } - super.tearDown(); - } - - public void testSynchronousReceive() throws Exception - { - for (int msg = 0; msg < MSG_COUNT; msg++) - { - assertTrue(_consumer.receive(2000) != null); - } - } - - public void testSynchronousReceiveNoWait() throws Exception - { - for (int msg = 0; msg < MSG_COUNT; msg++) - { - assertTrue("Failed to receive message " + msg, _consumer.receiveNoWait() != null); - } - } - - public void testAsynchronousReceive() throws Exception - { - _consumer.setMessageListener(this); - - _logger.info("Waiting 3 seconds for messages"); - - try - { - _awaitMessages.await(3000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - // Should have received all async messages - assertEquals(MSG_COUNT, _receivedCount); - - } - - public void testReceiveThenUseMessageListener() throws Exception - { - _logger.error("Test disabled as initial receive is not called first"); - // Perform initial receive to start connection - assertTrue(_consumer.receive(2000) != null); - _receivedCount++; - - // Sleep to ensure remaining 4 msgs end up on _synchronousQueue - Thread.sleep(1000); - - // Set the message listener and wait for the messages to come in. - _consumer.setMessageListener(this); - - _logger.info("Waiting 3 seconds for messages"); - - try - { - _awaitMessages.await(3000, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - // do nothing - } - // Should have received all async messages - assertEquals(MSG_COUNT, _receivedCount); - - _clientConnection.close(); - - Connection conn = getConnection("guest", "guest"); - Session clientSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = clientSession.createQueue("message-listener-test-queue"); - MessageConsumer cons = clientSession.createConsumer(queue); - conn.start(); - - // check that the messages were actually dequeued - assertTrue(cons.receive(2000) == null); - } - - /** - * Tests the case where the message listener throws an java.lang.Error. - * - */ - public void testMessageListenerThrowsError() throws Exception - { - final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error"; - _clientConnection.setExceptionListener(this); - - _awaitMessages = new CountDownLatch(1); - - _consumer.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - try - { - _logger.debug("onMessage called"); - _receivedCount++; - - - throw new Error(javaLangErrorMessageText); - } - finally - { - _awaitMessages.countDown(); - } - } - }); - - - _logger.info("Waiting 3 seconds for message"); - _awaitMessages.await(3000, TimeUnit.MILLISECONDS); - - assertEquals("onMessage should have been called", 1, _receivedCount); - assertEquals("onException should NOT have been called", 0, _errorCount); - - // Check that Error has been written to the application log. - - LogMonitor _monitor = new LogMonitor(_outputFile); - assertTrue("The expected message not written to log file.", - _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT)); - - if (_clientConnection != null) - { - try - { - _clientConnection.close(); - } - catch (JMSException e) - { - // Ignore connection close errors for this test. - } - finally - { - _clientConnection = null; - } - } - } - - public void onMessage(Message message) - { - _logger.info("Received Message(" + _receivedCount + "):" + message); - - _receivedCount++; - _awaitMessages.countDown(); - } - - public void onException(JMSException e) - { - _logger.info("Exception received", e); - _errorCount++; - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(MessageListenerTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java deleted file mode 100644 index 6ff6681c47..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/ResetMessageListenerTest.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.client; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.naming.Context; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-293 Setting MessageListener after connection has started can cause messages to be "lost" on a internal delivery - * queue <p/> The message delivery process: Mina puts a message on _queue in AMQSession and the dispatcher thread - * take()s from here and dispatches to the _consumers. If the _consumer1 doesn't have a message listener set at - * connection start then messages are stored on _synchronousQueue (which needs to be > 1 to pass JMS TCK as multiple - * consumers on a session can run in any order and a synchronous put/poll will block the dispatcher). <p/> When setting - * the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining - * messages will be left on the queue and lost, subsequent messages on the session will arrive first. - */ -public class ResetMessageListenerTest extends QpidBrokerTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(ResetMessageListenerTest.class); - - private Context _context; - - private static final int MSG_COUNT = 6; - private Connection _clientConnection, _producerConnection; - private MessageConsumer _consumer1; - private MessageProducer _producer; - private Session _clientSession, _producerSession; - - private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock - private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(MSG_COUNT); // all messages Sent Lock - - protected void setUp() throws Exception - { - super.setUp(); - - _clientConnection = getConnection("guest", "guest"); - _clientConnection.start(); - // Create Client 1 - - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue queue = _clientSession.createQueue("reset-message-listener-test-queue"); - - _consumer1 = _clientSession.createConsumer(queue); - - // Create Producer - _producerConnection = getConnection("guest", "guest"); - - _producerConnection.start(); - - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _producer = _producerSession.createProducer(queue); - - TextMessage m = _producerSession.createTextMessage(); - m.setStringProperty("rank", "first"); - for (int msg = 0; msg < MSG_COUNT; msg++) - { - m.setText("Message " + msg); - _producer.send(m); - } - } - - protected void tearDown() throws Exception - { - _clientConnection.close(); - - super.tearDown(); - } - - public void testAsynchronousRecieve() - { - - _logger.info("Test Start"); - - try - { - _consumer1.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - try - { - if (message.getStringProperty("rank").equals("first")) - { - _allFirstMessagesSent.countDown(); - } - } - catch (JMSException e) - { - e.printStackTrace(); - fail("error receiving message"); - } - } - }); - } - catch (JMSException e) - { - _logger.error("Error Setting Default ML on consumer1"); - } - try - { - assertTrue("Did not receive all first batch of messages", - _allFirstMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS)); - _logger.info("Received first batch of messages"); - } - catch (InterruptedException e) - { - // do nothing - } - - try - { - _clientConnection.stop(); - } - catch (JMSException e) - { - _logger.error("Error stopping connection"); - } - - _logger.info("Reset Message Listener "); - try - { - _consumer1.setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - try - { - if (message.getStringProperty("rank").equals("first")) - { - // Something ugly will happen, it'll probably kill the dispatcher - fail("All first set of messages should have been received"); - } - else - { - _allSecondMessagesSent.countDown(); - } - } - catch (JMSException e) - { - e.printStackTrace(); - // Something ugly will happen, it'll probably kill the dispatcher - fail("error receiving message"); - } - } - }); - - _clientConnection.start(); - } - catch (javax.jms.IllegalStateException e) - { - _logger.error("Connection not stopped while setting ML", e); - fail("Unable to change message listener:" + e.getCause()); - } - catch (JMSException e) - { - _logger.error("Error Setting Better ML on consumer1", e); - } - - try - { - _logger.info("Send additional messages"); - TextMessage m = _producerSession.createTextMessage(); - m.setStringProperty("rank", "second"); - for (int msg = 0; msg < MSG_COUNT; msg++) - { - m.setText("Message " + msg); - _producer.send(m); - } - } - catch (JMSException e) - { - _logger.error("Unable to send additional messages", e); - } - - _logger.info("Waiting for messages"); - - try - { - assertTrue(_allSecondMessagesSent.await(MSG_COUNT, TimeUnit.SECONDS)); - } - catch (InterruptedException e) - { - // do nothing - } - assertEquals("First batch of messages not received correctly", 0, _allFirstMessagesSent.getCount()); - assertEquals("Second batch of messages not received correctly", 0, _allSecondMessagesSent.getCount()); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(ResetMessageListenerTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java index d7295b298e..08ed2258b2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/SessionCreateTest.java @@ -27,7 +27,6 @@ import org.apache.qpid.test.utils.QpidBrokerTestCase; import javax.jms.Connection; import javax.jms.Session; -import javax.naming.Context; /** @@ -35,9 +34,7 @@ import javax.naming.Context; */ public class SessionCreateTest extends QpidBrokerTestCase { - private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class); - - private Context _context; + private static final Logger LOGGER = LoggerFactory.getLogger(SessionCreateTest.class); private Connection _clientConnection; protected int maxSessions = 65555; @@ -54,7 +51,7 @@ public class SessionCreateTest extends QpidBrokerTestCase Session sess = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull(sess); sess.close(); - System.out.println("created session: " + i); + LOGGER.debug("created session: " + i); } _clientConnection.close(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java new file mode 100644 index 0000000000..bf147197e4 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/SynchReceiveTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class SynchReceiveTest extends QpidBrokerTestCase +{ + private static final long AWAIT_MESSAGE_TIMEOUT = 2000; + private static final long AWAIT_MESSAGE_TIMEOUT_NEGATIVE = 250; + private static final int MSG_COUNT = 10; + private final String _testQueueName = getTestQueueName(); + private Connection _consumerConnection; + private Session _consumerSession; + private MessageConsumer _consumer; + private Queue _queue; + + protected void setUp() throws Exception + { + super.setUp(); + + _consumerConnection = getConnection(); + _consumerConnection.start(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _queue = _consumerSession.createQueue(_testQueueName); + _consumer = _consumerSession.createConsumer(_queue); + + // Populate queue + Connection producerConnection = getConnection(); + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + sendMessage(producerSession, _queue, MSG_COUNT); + producerConnection.close(); + } + + public void testReceiveWithTimeout() throws Exception + { + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertNotNull("Expected message number " + msg, _consumer.receive(AWAIT_MESSAGE_TIMEOUT)); + } + + assertNull("Received too many messages", _consumer.receive(500)); + } + + public void testReceiveNoWait() throws Exception + { + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertNotNull("Expected message number " + msg, _consumer.receiveNoWait()); + } + + assertNull("Received too many messages", _consumer.receive(500)); + } + + public void testTwoConsumersInterleaved() throws Exception + { + //create a new connection with prefetch set to 1 + _consumerConnection.close(); + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString()); + + _consumerConnection = getConnection(); + _consumerConnection.start(); + Session consumerSession1 = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer1 = consumerSession1.createConsumer(_queue); + + Session consumerSession2 = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = consumerSession2.createConsumer(_queue); + + final int maxLoops = MSG_COUNT * 2; + int msg = 0; + int loops = 0; + while(msg < MSG_COUNT && loops < maxLoops) + { + if (consumer1.receive(AWAIT_MESSAGE_TIMEOUT) != null) + { + msg++; + } + + if (consumer2.receive(AWAIT_MESSAGE_TIMEOUT) != null) + { + msg++; + } + + loops++; + } + + assertEquals("Not all messages received.", MSG_COUNT, msg); + assertNull("Received too many messages", consumer1.receive(AWAIT_MESSAGE_TIMEOUT_NEGATIVE)); + assertNull("Received too many messages", consumer2.receive(AWAIT_MESSAGE_TIMEOUT_NEGATIVE)); + } + + public void testIdleSecondConsumer() throws Exception + { + Session idleSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + @SuppressWarnings("unused") + MessageConsumer idleConsumerOnSameQueue = idleSession.createConsumer(_queue); + + // Since we don't call receive on the idle consumer, all messages will flow to other + + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertNotNull("Expected message number " + msg, _consumer.receive(AWAIT_MESSAGE_TIMEOUT)); + } + + assertNull("Received too many messages", _consumer.receive(AWAIT_MESSAGE_TIMEOUT_NEGATIVE)); + } + + +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAConnectionTest.java new file mode 100644 index 0000000000..658e1fca1f --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/ra/QpidRAConnectionTest.java @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ra; + +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.qpid.ra.QpidRAConnectionFactoryImpl; +import org.apache.qpid.ra.QpidRAManagedConnectionFactory; +import org.apache.qpid.ra.QpidResourceAdapter; + +public class QpidRAConnectionTest extends QpidBrokerTestCase +{ + private static final String BROKER_PORT = "15672"; + + private static final String URL = "amqp://guest:guest@client/test?brokerlist='tcp://localhost:" + BROKER_PORT + "?sasl_mechs='PLAIN''"; + + public void testSessionCommitOnClosedConnectionThrowsException() throws Exception + { + QpidResourceAdapter ra = new QpidResourceAdapter(); + QpidRAManagedConnectionFactory mcf = new QpidRAManagedConnectionFactory(); + mcf.setConnectionURL(URL); + mcf.setResourceAdapter(ra); + ConnectionFactory cf = new QpidRAConnectionFactoryImpl(mcf, null); + Connection c = cf.createConnection(); + Session s = c.createSession(true, Session.SESSION_TRANSACTED); + c.close(); + + try + { + s.commit(); + fail("Exception should be thrown"); + } + catch(Exception e) + { + e.printStackTrace(); + assertTrue(e instanceof javax.jms.IllegalStateException); + } + + } + + public void testMessageAck() throws Exception + { + QpidResourceAdapter ra = new QpidResourceAdapter(); + QpidRAManagedConnectionFactory mcf = new QpidRAManagedConnectionFactory(); + mcf.setConnectionURL(URL); + mcf.setResourceAdapter(ra); + ConnectionFactory cf = new QpidRAConnectionFactoryImpl(mcf, null); + Connection c = cf.createConnection(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Message m = s.createTextMessage(); + + try + { + m.acknowledge(); + } + catch(Exception e) + { + fail("Acknowledge should not throw an exception"); + } + finally + { + s.close(); + c.close(); + } + } + @Override + public void stopBroker(int port) throws Exception + { + if (isBrokerPresent(port)) + { + super.stopBroker(port); + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index ad8c856a74..13053d02df 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -357,13 +357,45 @@ public class ProducerFlowControlTest extends AbstractTestLogging consumer.receive(); } + public void testQueueDeleteWithBlockedFlow() throws Exception + { + String queueName = getTestQueueName(); + createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800, true, false); + + producer = producerSession.createProducer(queue); + + // try to send 5 messages (should block after 4) + sendMessagesAsync(producer, producerSession, 5, 50L); + + Thread.sleep(5000); + + assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); + + // close blocked producer session and connection + producerConnection.close(); + + // delete queue with a consumer session + ((AMQSession<?,?>) consumerSession).sendQueueDelete(new AMQShortString(queueName)); + + consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message message = consumer.receive(1000l); + assertNull("Unexpected message", message); + } + private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception { + createAndBindQueueWithFlowControlEnabled(session, queueName, capacity, resumeCapacity, false, true); + } + + private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity, boolean durable, boolean autoDelete) throws Exception + { final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("x-qpid-capacity",capacity); arguments.put("x-qpid-flow-resume-capacity",resumeCapacity); - ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), true, false, false, arguments); - queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='false'&autodelete='true'"); + ((AMQSession<?,?>) session).createQueue(new AMQShortString(queueName), autoDelete, durable, false, arguments); + queue = session.createQueue("direct://amq.direct/"+queueName+"/"+queueName+"?durable='" + durable + "'&autodelete='" + autoDelete + "'"); ((AMQSession<?,?>) session).declareAndBind((AMQDestination)queue); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java deleted file mode 100644 index c764eda799..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/ReceiveTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.basic; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Message; -import javax.jms.MessageConsumer; - -public class ReceiveTest extends QpidBrokerTestCase -{ - private AMQConnection _connection; - private AMQDestination _destination; - private AMQSession _session; - private MessageConsumer _consumer; - - protected void setUp() throws Exception - { - super.setUp(); - init((AMQConnection) getConnection("guest", "guest")); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - private void init(AMQConnection connection) throws Exception - { - init(connection, new AMQQueue(connection,"ReceiveTest", true)); - } - - private void init(AMQConnection connection, AMQDestination destination) throws Exception - { - _connection = connection; - _destination = destination; - _session = (AMQSession) connection.createSession(true, AMQSession.NO_ACKNOWLEDGE); - _consumer = _session.createConsumer(_destination); - _connection.start(); - } - - public void test() throws Exception - { - Message m = _consumer.receive(5000); - assertNull("should not have received a message", m); - _connection.close(); - } - - - public static junit.framework.Test suite() - { - // TODO: note that this test doesn't use the VMBrokerSetup - // test helper class to create and tear down its - // VMBroker. This is because the main() above seems to - // indicate that it's also used outside of the surefire test - // framework. If it isn't, then this test should also be - // changed to use VMBrokerSetup here. - return new junit.framework.TestSuite(ReceiveTest.class); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java index f554b0089e..cc76d89a67 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.test.unit.transacted; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; + /** * This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration * is set for a virtual host. @@ -305,4 +311,33 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase monitor(0, 0); } + + /** + * Tests that sending an unroutable persistent message does not result in a long running store transaction [warning]. + */ + public void testTransactionCommittedOnNonRoutableQueuePersistentMessage() throws Exception + { + checkTransactionCommittedOnNonRoutableQueueMessage(DeliveryMode.PERSISTENT); + } + + /** + * Tests that sending an unroutable transient message does not result in a long running store transaction [warning]. + */ + public void testTransactionCommittedOnNonRoutableQueueTransientMessage() throws Exception + { + checkTransactionCommittedOnNonRoutableQueueMessage(DeliveryMode.NON_PERSISTENT); + } + + private void checkTransactionCommittedOnNonRoutableQueueMessage(int deliveryMode) throws JMSException, Exception + { + Queue nonExisting = _psession.createQueue(getTestQueueName() + System.currentTimeMillis()); + MessageProducer producer = _psession.createProducer(nonExisting); + Message message = _psession.createMessage(); + producer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + _psession.commit(); + + // give time to house keeping thread to log messages + sleep(3f); + monitor(0, 0); + } } diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 14671f97af..74b02153ac 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -24,7 +24,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateEx org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testDeleteOptions org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* -org.apache.qpid.client.ResetMessageListenerTest#* // Those tests are testing 0.8 specific semantics org.apache.qpid.test.client.ImmediateAndMandatoryPublishingTest#* @@ -169,3 +168,6 @@ org.apache.qpid.server.message.MessageProtocolConversionTest#* // passwd script is a Java Broker specific command line tool org.apache.qpid.scripts.QpidPasswdTest#* + +// QPID-3604: Immediate Prefetch no longer supported by 0-10 +org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener diff --git a/qpid/java/test-profiles/CPPPrefetchExcludes b/qpid/java/test-profiles/CPPPrefetchExcludes index 7ef52f89c7..9b4d69cebd 100644 --- a/qpid/java/test-profiles/CPPPrefetchExcludes +++ b/qpid/java/test-profiles/CPPPrefetchExcludes @@ -18,6 +18,6 @@ // // those tests should be run with prefetch off -org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveC2Only -org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth +org.apache.qpid.client.SynchReceiveTest#testTwoConsumersInterleaved +org.apache.qpid.client.SynchReceiveTest#testIdleSecondConsumer org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index ac6ac8ae1c..90df1cee81 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -54,3 +54,6 @@ org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionC org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#* org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#* +// QPID-3604: Immediate Prefetch no longer supported by 0-10 +org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener + diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes index ce0016fcff..9741eed2e9 100644 --- a/qpid/java/test-profiles/JavaExcludes +++ b/qpid/java/test-profiles/JavaExcludes @@ -33,3 +33,4 @@ org.apache.qpid.client.ssl.SSLTest#testMultipleCertsInSingleStore //QPID-3605 Durable subscriber with no-local true receives messages on re-connection org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testNoLocalMessagesNotDeliveredAfterReconnection + diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index 2b77911177..18320646ee 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -36,7 +36,7 @@ org.apache.qpid.server.queue.AddressBasedSortedQueueTest#* // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* -org.apache.qpid.client.MessageListenerTest#testSynchronousReceiveNoWait +org.apache.qpid.client.SynchReceiveTest#testReceiveNoWait // Tests 0.10 client feature org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMechanism @@ -63,3 +63,7 @@ org.apache.qpid.jms.xa.XAResourceTest#* //Tests durable subscription selector verification behaviour that 0-8/0-9/0-9-1 cant provide org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart + +// JCA system tests require XA support (should look to see if we can reduce scope of excludes here) +org.apache.qpid.ra.QpidRAConnectionTest#* + diff --git a/qpid/python/qpid/connection.py b/qpid/python/qpid/connection.py index f2c83d113c..66e1cb49be 100644 --- a/qpid/python/qpid/connection.py +++ b/qpid/python/qpid/connection.py @@ -170,6 +170,10 @@ class Connection(Framer): if not status: self.detach_all() break + # When we do not use SSL transport, we get periodic + # spurious timeout events on the socket. When using SSL, + # these events show up as timeout *errors*. Both should be + # ignored unless we have aborted. except socket.timeout: if self.aborted(): self.close_code = (None, "connection timed out") @@ -178,9 +182,12 @@ class Connection(Framer): else: continue except socket.error, e: - self.close_code = (None, str(e)) - self.detach_all() - break + if self.aborted() or str(e) != "The read operation timed out": + self.close_code = (None, str(e)) + self.detach_all() + break + else: + continue frame_dec.write(data) seg_dec.write(*frame_dec.read()) op_dec.write(*seg_dec.read()) |