From acf3a1931ec404d1b02a2e115ef18e531d3924e4 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Sat, 27 Feb 2010 00:38:13 +0000 Subject: Rebased the wmf branch to the trunk. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@916887 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/include/qpid/Exception.h | 2 - qpid/cpp/include/qpid/sys/posix/check.h | 1 + qpid/cpp/include/qpid/sys/windows/check.h | 1 + qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb | 1 + qpid/cpp/rubygen/framing.0-10/constants.rb | 2 + qpid/cpp/src/CMakeLists.txt | 5 +- qpid/cpp/src/qmf/engine/EventImpl.cpp | 2 + qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp | 1 + qpid/cpp/src/qmf/engine/SchemaImpl.cpp | 1 + qpid/cpp/src/qmf/engine/SchemaImpl.h | 1 + qpid/cpp/src/qpid/cluster/Cluster.cpp | 8 +- qpid/cpp/src/qpid/cluster/ClusterTimer.h | 6 + qpid/cpp/src/qpid/cluster/Cpg.cpp | 1 - qpid/cpp/src/qpid/cluster/StoreStatus.cpp | 7 + qpid/cpp/src/qpid/cluster/StoreStatus.h | 6 +- qpid/cpp/src/qpid/console/Value.cpp | 2 + qpid/cpp/src/qpid/framing/AMQFrame.cpp | 2 + qpid/cpp/src/qpid/framing/Array.cpp | 1 + qpid/cpp/src/qpid/framing/BodyHandler.cpp | 1 + qpid/cpp/src/qpid/framing/FieldTable.cpp | 1 + qpid/cpp/src/qpid/framing/FieldValue.cpp | 1 + qpid/cpp/src/qpid/framing/List.cpp | 1 + qpid/cpp/src/qpid/framing/SequenceSet.cpp | 1 + qpid/cpp/src/qpid/framing/Uuid.cpp | 1 + qpid/cpp/src/qpid/messaging/Variant.cpp | 1 + qpid/cpp/src/qpid/sys/AggregateOutput.cpp | 4 +- qpid/cpp/src/qpid/sys/posix/Shlib.cpp | 1 + qpid/cpp/src/qpid/sys/ssl/check.h | 2 + qpid/cpp/src/tests/Makefile.am | 1 - qpid/cpp/src/tests/TxMocks.h | 1 + qpid/cpp/src/tests/cluster_tests.py | 54 +- qpid/cpp/src/tests/failover_soak.cpp | 159 ++-- qpid/cpp/src/tests/run_failover_soak | 4 +- qpid/cpp/src/tests/test_env.sh.in | 3 + qpid/cpp/src/tests/verify_cluster_objects | 34 +- qpid/extras/qmf/src/py/qmf2/agent.py | 687 +++++++++++++----- qpid/extras/qmf/src/py/qmf2/common.py | 83 +-- qpid/extras/qmf/src/py/qmf2/console.py | 589 ++++++++++++--- qpid/extras/qmf/src/py/qmf2/tests/__init__.py | 1 + .../qmf/src/py/qmf2/tests/agent_discovery.py | 4 +- qpid/extras/qmf/src/py/qmf2/tests/async_method.py | 2 +- qpid/extras/qmf/src/py/qmf2/tests/async_query.py | 2 +- qpid/extras/qmf/src/py/qmf2/tests/basic_method.py | 2 +- qpid/extras/qmf/src/py/qmf2/tests/basic_query.py | 2 +- qpid/extras/qmf/src/py/qmf2/tests/events.py | 2 +- .../extras/qmf/src/py/qmf2/tests/multi_response.py | 4 +- qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py | 2 +- qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py | 808 +++++++++++++++++++++ .../apache/qpid/example/shared/example.properties | 1 + qpid/java/module.xml | 1 + qpid/java/perftests/etc/scripts/drainBroker.sh | 41 ++ qpid/java/perftests/etc/scripts/fillBroker.sh | 41 ++ qpid/java/perftests/etc/scripts/testWithPreFill.sh | 41 ++ .../org/apache/qpid/ping/PingAsyncTestPerf.java | 17 +- .../main/java/org/apache/qpid/ping/PingClient.java | 5 + .../java/org/apache/qpid/ping/PingTestPerf.java | 62 +- .../apache/qpid/requestreply/PingPongProducer.java | 111 ++- .../apache/qpid/server/queue/TimeToLiveTest.java | 160 ++++ .../qpid/test/unit/ct/DurableSubscriberTest.java | 4 +- .../test/unit/topic/DurableSubscriptionTest.java | 132 +++- .../org/apache/qpid/test/utils/QpidTestCase.java | 7 + qpid/java/test-profiles/JavaExcludes | 3 +- qpid/java/test-profiles/JavaStandaloneExcludes | 1 - qpid/java/test-profiles/JavaTransientExcludes | 2 + qpid/java/test-profiles/java-derby.testprofile | 1 + qpid/python/qpid-python-test | 28 +- qpid/python/qpid/messaging/driver.py | 342 +++++---- qpid/python/qpid/messaging/endpoints.py | 5 +- 68 files changed, 2858 insertions(+), 655 deletions(-) mode change 100644 => 100755 qpid/cpp/src/tests/verify_cluster_objects create mode 100644 qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py create mode 100755 qpid/java/perftests/etc/scripts/drainBroker.sh create mode 100755 qpid/java/perftests/etc/scripts/fillBroker.sh create mode 100755 qpid/java/perftests/etc/scripts/testWithPreFill.sh diff --git a/qpid/cpp/include/qpid/Exception.h b/qpid/cpp/include/qpid/Exception.h index 7b937c242a..fa7111160c 100644 --- a/qpid/cpp/include/qpid/Exception.h +++ b/qpid/cpp/include/qpid/Exception.h @@ -26,9 +26,7 @@ #include "qpid/framing/constants.h" #include "qpid/framing/enum.h" #include "qpid/sys/StrError.h" -#include "qpid/Msg.h" #include "qpid/CommonImportExport.h" -#include #include #include diff --git a/qpid/cpp/include/qpid/sys/posix/check.h b/qpid/cpp/include/qpid/sys/posix/check.h index bbc66d389b..1bfe5d6d78 100644 --- a/qpid/cpp/include/qpid/sys/posix/check.h +++ b/qpid/cpp/include/qpid/sys/posix/check.h @@ -23,6 +23,7 @@ */ #include "qpid/Exception.h" +#include "qpid/Msg.h" #include #include diff --git a/qpid/cpp/include/qpid/sys/windows/check.h b/qpid/cpp/include/qpid/sys/windows/check.h index aba38814b2..2a8e439bed 100755 --- a/qpid/cpp/include/qpid/sys/windows/check.h +++ b/qpid/cpp/include/qpid/sys/windows/check.h @@ -23,6 +23,7 @@ */ #include "qpid/Exception.h" +#include "qpid/Msg.h" #include "qpid/sys/StrError.h" #define QPID_WINDOWS_ERROR(ERRVAL) qpid::Exception(QPID_MSG(qpid::sys::strError(ERRVAL))) diff --git a/qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb b/qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb index 95c79fd1a4..28a5d94e32 100644 --- a/qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb +++ b/qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb @@ -35,6 +35,7 @@ class MethodBodyFactoryGen < CppGen include "qpid/framing/BodyFactory" @amqp.methods_.each { |m| include "qpid/framing/#{m.body_name}" } include "qpid/Exception.h" + include "qpid/Msg.h" genl namespace(@namespace) { scope("boost::intrusive_ptr #{@classname}::create(ClassId c, MethodId m) {") { diff --git a/qpid/cpp/rubygen/framing.0-10/constants.rb b/qpid/cpp/rubygen/framing.0-10/constants.rb index 5c1c1047f7..85bfb96ac0 100755 --- a/qpid/cpp/rubygen/framing.0-10/constants.rb +++ b/qpid/cpp/rubygen/framing.0-10/constants.rb @@ -78,6 +78,7 @@ EOS cpp_file(path) { include(path); include("qpid/Exception.h") + include("qpid/Msg.h") include("") namespace(@namespace) { scope("const char* typeName(TypeCode t) {") { @@ -181,6 +182,7 @@ EOS def reply_exceptions_cpp() cpp_file("#{@dir}/reply_exceptions") { include "#{@dir}/reply_exceptions" + include "qpid/Msg.h" include "" include "" namespace("qpid::framing") { diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 62bab239be..a7078bea47 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -855,7 +855,6 @@ set (qmfengine_SOURCES qmf/engine/Protocol.h qmf/engine/QueryImpl.cpp qmf/engine/QueryImpl.h - qmf/engine/ResilientConnection.cpp qmf/engine/SequenceManager.cpp qmf/engine/SequenceManager.h qmf/engine/SchemaImpl.cpp @@ -863,6 +862,10 @@ set (qmfengine_SOURCES qmf/engine/ValueImpl.cpp qmf/engine/ValueImpl.h ) +if (NOT WIN32) + list(APPEND qmfengine_SOURCES qmf/engine/ResilientConnection.cpp) +endif (NOT WIN32) + add_library (qmfengine SHARED ${qmfengine_SOURCES}) target_link_libraries (qmfengine qpidclient) set_target_properties (qmfengine PROPERTIES diff --git a/qpid/cpp/src/qmf/engine/EventImpl.cpp b/qpid/cpp/src/qmf/engine/EventImpl.cpp index 79d5a491fc..27501cc396 100644 --- a/qpid/cpp/src/qmf/engine/EventImpl.cpp +++ b/qpid/cpp/src/qmf/engine/EventImpl.cpp @@ -20,6 +20,8 @@ #include #include +#include + using namespace std; using namespace qmf::engine; using qpid::framing::Buffer; diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp index 670ee385a3..9216f7bac0 100644 --- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp +++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp @@ -19,6 +19,7 @@ #include "qmf/engine/ObjectIdImpl.h" #include +#include using namespace std; using namespace qmf::engine; diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp index a4f56464a7..e276df8cec 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp @@ -23,6 +23,7 @@ #include #include #include +#include using namespace std; using namespace qmf::engine; diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h index d78c921c67..71a10559cf 100644 --- a/qpid/cpp/src/qmf/engine/SchemaImpl.h +++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h @@ -25,6 +25,7 @@ #include #include #include +#include namespace qmf { namespace engine { diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index e718819f48..f49fbb03a5 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -285,6 +285,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : } Cluster::~Cluster() { + broker.setClusterTimer(std::auto_ptr(0)); // Delete cluster timer if (updateThread.id()) updateThread.join(); // Join the previous updatethread. } @@ -914,6 +915,12 @@ void Cluster::memberUpdate(Lock& l) { size_t size = urls.size(); failoverExchange->updateUrls(urls); + if (store.hasStore()) { + // Mark store clean if I am the only broker, dirty otherwise. + if (size == 1) store.clean(Uuid(true)); + else store.dirty(clusterId); + } + if (size == 1 && lastSize > 1 && state >= CATCHUP) { QPID_LOG(notice, *this << " last broker standing, update queue policies"); lastBroker = true; @@ -996,7 +1003,6 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu } void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { - QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name) timer->deliverWakeup(name); } diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.h b/qpid/cpp/src/qpid/cluster/ClusterTimer.h index 395e505451..69f6c622e4 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterTimer.h +++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.h @@ -30,6 +30,12 @@ namespace cluster { class Cluster; +/** + * Timer implementation that executes tasks consistently in the + * deliver thread across a cluster. Task is not executed when timer + * fires, instead the elder multicasts a wakeup. The task is executed + * when the wakeup is delivered. + */ class ClusterTimer : public sys::Timer { public: ClusterTimer(Cluster&); diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 3ae0c970c7..0856bcd824 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -54,7 +54,6 @@ void Cpg::callCpg ( CpgOp & c ) { unsigned int snooze = 10; for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) { if ( CPG_OK == (result = c.op(handle, & group))) { - QPID_LOG(info, c.opName << " successful."); break; } else if ( result == CPG_ERR_TRY_AGAIN ) { diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp index cf75cd3b5f..648fcfbbd5 100644 --- a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp +++ b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp @@ -20,6 +20,7 @@ */ #include "StoreStatus.h" #include "qpid/Exception.h" +#include "qpid/Msg.h" #include #include #include @@ -113,7 +114,12 @@ void StoreStatus::save() { } } +bool StoreStatus::hasStore() const { + return state != framing::cluster::STORE_STATE_NO_STORE; +} + void StoreStatus::dirty(const Uuid& clusterId_) { + if (!hasStore()) return; assert(clusterId_); clusterId = clusterId_; shutdownId = Uuid(); @@ -122,6 +128,7 @@ void StoreStatus::dirty(const Uuid& clusterId_) { } void StoreStatus::clean(const Uuid& shutdownId_) { + if (!hasStore()) return; assert(shutdownId_); state = STORE_STATE_CLEAN_STORE; shutdownId = shutdownId_; diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h index 911b3a2ba2..2371f0424e 100644 --- a/qpid/cpp/src/qpid/cluster/StoreStatus.h +++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h @@ -46,14 +46,14 @@ class StoreStatus const Uuid& getShutdownId() const { return shutdownId; } framing::SequenceNumber getConfigSeq() const { return configSeq; } - void dirty(const Uuid& start); // Start using the store. - void clean(const Uuid& stop); // Stop using the store. + void dirty(const Uuid& clusterId); // Mark the store in use by clusterId. + void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId void setConfigSeq(framing::SequenceNumber seq); // Update the config seq number. void load(); void save(); - bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; } + bool hasStore() const; private: framing::cluster::StoreState state; diff --git a/qpid/cpp/src/qpid/console/Value.cpp b/qpid/cpp/src/qpid/console/Value.cpp index c30660f1dc..47c6a4ce57 100644 --- a/qpid/cpp/src/qpid/console/Value.cpp +++ b/qpid/cpp/src/qpid/console/Value.cpp @@ -22,6 +22,8 @@ #include "qpid/console/Value.h" #include "qpid/framing/Buffer.h" +#include + using namespace qpid; using namespace qpid::console; using namespace std; diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp index 5c5920d786..d863970ece 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -24,6 +24,8 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/BodyFactory.h" #include "qpid/framing/MethodBodyFactory.h" +#include "qpid/Msg.h" + #include #include diff --git a/qpid/cpp/src/qpid/framing/Array.cpp b/qpid/cpp/src/qpid/framing/Array.cpp index d95e0d167d..454e8e298f 100644 --- a/qpid/cpp/src/qpid/framing/Array.cpp +++ b/qpid/cpp/src/qpid/framing/Array.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" #include namespace qpid { diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.cpp b/qpid/cpp/src/qpid/framing/BodyHandler.cpp index e2128596ed..db302b1e4c 100644 --- a/qpid/cpp/src/qpid/framing/BodyHandler.cpp +++ b/qpid/cpp/src/qpid/framing/BodyHandler.cpp @@ -25,6 +25,7 @@ #include "qpid/framing/AMQHeartbeatBody.h" #include #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" using namespace qpid::framing; using namespace boost; diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp index e2e91e450a..023e4af819 100644 --- a/qpid/cpp/src/qpid/framing/FieldTable.cpp +++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp @@ -25,6 +25,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" #include namespace qpid { diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp index 0b49748de8..fd911645f4 100644 --- a/qpid/cpp/src/qpid/framing/FieldValue.cpp +++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp @@ -24,6 +24,7 @@ #include "qpid/framing/Endian.h" #include "qpid/framing/List.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" namespace qpid { namespace framing { diff --git a/qpid/cpp/src/qpid/framing/List.cpp b/qpid/cpp/src/qpid/framing/List.cpp index bde7dabbac..963ebc206b 100644 --- a/qpid/cpp/src/qpid/framing/List.cpp +++ b/qpid/cpp/src/qpid/framing/List.cpp @@ -23,6 +23,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" namespace qpid { namespace framing { diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp index dcfb4689b6..72fcd8a9e2 100644 --- a/qpid/cpp/src/qpid/framing/SequenceSet.cpp +++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp @@ -22,6 +22,7 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" using namespace qpid::framing; using std::max; diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp index 432c7ab94e..67ca96d53f 100644 --- a/qpid/cpp/src/qpid/framing/Uuid.cpp +++ b/qpid/cpp/src/qpid/framing/Uuid.cpp @@ -22,6 +22,7 @@ #include "qpid/Exception.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/Msg.h" namespace qpid { namespace framing { diff --git a/qpid/cpp/src/qpid/messaging/Variant.cpp b/qpid/cpp/src/qpid/messaging/Variant.cpp index 116018f797..ba93f160ec 100644 --- a/qpid/cpp/src/qpid/messaging/Variant.cpp +++ b/qpid/cpp/src/qpid/messaging/Variant.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/messaging/Variant.h" +#include "qpid/Msg.h" #include #include #include diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp index 4f0a4fa5af..fc95f46fb9 100644 --- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp +++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp @@ -34,6 +34,7 @@ void AggregateOutput::activateOutput() { control.activateOutput(); } void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); } +namespace { // Clear the busy flag and notify waiting threads in destructor. struct ScopedBusy { bool& flag; @@ -41,7 +42,8 @@ struct ScopedBusy { ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; } ~ScopedBusy() { flag = false; monitor.notifyAll(); } }; - +} + bool AggregateOutput::doOutput() { Mutex::ScopedLock l(lock); ScopedBusy sb(busy, lock); diff --git a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp index 299331103c..3fb685d5b8 100644 --- a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp @@ -20,6 +20,7 @@ #include "qpid/sys/Shlib.h" #include "qpid/Exception.h" +#include "qpid/Msg.h" #include diff --git a/qpid/cpp/src/qpid/sys/ssl/check.h b/qpid/cpp/src/qpid/sys/ssl/check.h index 984c338a18..94db120afa 100644 --- a/qpid/cpp/src/qpid/sys/ssl/check.h +++ b/qpid/cpp/src/qpid/sys/ssl/check.h @@ -21,6 +21,8 @@ * under the License. * */ +#include "qpid/Msg.h" + #include #include #include diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 4d65803ac1..4a931ab680 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -298,7 +298,6 @@ TESTS_ENVIRONMENT = \ VALGRIND=$(VALGRIND) \ LIBTOOL="$(LIBTOOL)" \ QPID_DATA_DIR= \ - BOOST_TEST_SHOW_PROGRESS=yes \ $(srcdir)/run_test system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest diff --git a/qpid/cpp/src/tests/TxMocks.h b/qpid/cpp/src/tests/TxMocks.h index a34d864bae..72cb50cd21 100644 --- a/qpid/cpp/src/tests/TxMocks.h +++ b/qpid/cpp/src/tests/TxMocks.h @@ -23,6 +23,7 @@ #include "qpid/Exception.h" +#include "qpid/Msg.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/broker/TxOp.h" #include diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index b3274b1b1e..22b7c8f5b8 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -305,21 +305,47 @@ class StoreTests(BrokerTest): self.assertRaises(Exception, lambda: a.ready()) self.assertRaises(Exception, lambda: b.ready()) - def test_total_failure(self): - # Verify we abort with sutiable error message if no clean stores. - cluster = self.cluster(0, args=self.args()+["--cluster-size=2"]) - a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True) - a.kill() - b.kill() - a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False) - b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False) - self.assertRaises(Exception, lambda: a.ready()) - self.assertRaises(Exception, lambda: b.ready()) + def assert_dirty_store(self, broker): + self.assertRaises(Exception, lambda: broker.ready()) msg = re.compile("critical.*no clean store") - assert msg.search(readfile(a.log)) - assert msg.search(readfile(b.log)) + assert msg.search(readfile(broker.log)) + + def test_solo_store_clean(self): + # A single node cluster should always leave a clean store. + cluster = self.cluster(0, self.args()) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL) + a.send_message("q", Message("x", durable=True)) + a.kill() + a = cluster.start("a") + self.assertEqual(a.get_message("q").content, "x") + + def test_last_store_clean(self): + + # Verify that only the last node in a cluster to shut down has + # a clean store. Start with cluster of 3, reduce to 1 then + # increase again to ensure that a node that was once alone but + # finally did not finish as the last node does not get a clean + # store. + cluster = self.cluster(0, self.args()) + a = cluster.start("a", expect=EXPECT_EXIT_FAIL) + b = cluster.start("b", expect=EXPECT_EXIT_FAIL) + c = cluster.start("c", expect=EXPECT_EXIT_FAIL) + a.send_message("q", Message("x", durable=True)) + a.kill() + b.kill() # c is last man + time.sleep(0.1) # pause for c to find out hes last. + a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man + c.kill() # a is now last man + time.sleep(0.1) # pause for a to find out hes last. + a.kill() # really last + # b & c should be dirty + b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK) + self.assert_dirty_store(b) + c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK) + self.assert_dirty_store(c) + # a should be clean + a = cluster.start("a") + self.assertEqual(a.get_message("q").content, "x") - # FIXME aconway 2009-12-03: verify manual restore procedure diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp index ed5c9cbee5..8bf6eca9e6 100644 --- a/qpid/cpp/src/tests/failover_soak.cpp +++ b/qpid/cpp/src/tests/failover_soak.cpp @@ -54,6 +54,8 @@ using namespace qpid::client; namespace qpid { namespace tests { +vector pids; + typedef vector brokerVector; typedef enum @@ -184,17 +186,29 @@ struct children : public vector int checkChildren ( ) { - vector::iterator i; - for ( i = begin(); i != end(); ++ i ) - if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) ) - { - cerr << "checkChildren: error on child of type " - << (*i)->type - << endl; - return (*i)->retval; - } + for ( unsigned int i = 0; i < pids.size(); ++ i ) + { + int pid = pids[i]; + int returned_pid; + int status; - return 0; + child * kid = get ( pid ); + + if ( kid->status != COMPLETED ) + { + returned_pid = waitpid ( pid, &status, WNOHANG ); + + if ( returned_pid == pid ) + { + int exit_status = WEXITSTATUS(status); + exited ( pid, exit_status ); + if ( exit_status ) // this is a child error. + return exit_status; + } + } + } + + return 0; } @@ -323,6 +337,7 @@ startNewBroker ( brokerVector & brokers, int verbosity, int durable ) { + // ("--log-enable=notice+") static int brokerId = 0; stringstream path, prefix; prefix << "soak-" << brokerId; @@ -516,6 +531,7 @@ startReceivingClient ( brokerVector brokers, argv.push_back ( 0 ); pid_t pid = fork(); + pids.push_back ( pid ); if ( ! pid ) { execv ( receiverPath, const_cast(&argv[0]) ); @@ -571,6 +587,7 @@ startSendingClient ( brokerVector brokers, argv.push_back ( 0 ); pid_t pid = fork(); + pids.push_back ( pid ); if ( ! pid ) { execv ( senderPath, const_cast(&argv[0]) ); @@ -602,6 +619,7 @@ using namespace qpid::tests; int main ( int argc, char const ** argv ) { + int brokerKills = 0; if ( argc != 11 ) { cerr << "Usage: " << argv[0] @@ -625,7 +643,6 @@ main ( int argc, char const ** argv ) int n_brokers = atoi(argv[i++]); char const * host = "127.0.0.1"; - int maxBrokers = 50; allMyChildren.verbosity = verbosity; @@ -722,104 +739,86 @@ main ( int argc, char const ** argv ) int minSleep = 2, - maxSleep = 4; + maxSleep = 6; + int totalBrokers = n_brokers; - for ( int totalBrokers = n_brokers; - totalBrokers < maxBrokers; - ++ totalBrokers - ) + int loop = 0; + + while ( 1 ) { + ++ loop; + + /* + if ( verbosity > 1 ) + std::cerr << "------- loop " << loop << " --------\n"; + if ( verbosity > 0 ) cout << totalBrokers << " brokers have been added to the cluster.\n\n\n"; + */ // Sleep for a while. ------------------------- int sleepyTime = mrand ( minSleep, maxSleep ); - if ( verbosity > 0 ) - cout << "Sleeping for " << sleepyTime << " seconds.\n"; sleep ( sleepyTime ); - // Kill the oldest broker. -------------------------- - if ( ! killFrontBroker ( brokers, verbosity ) ) + int bullet = mrand ( 100 ); + if ( bullet >= 95 ) + { + fprintf ( stderr, "Killing oldest broker...\n" ); + + // Kill the oldest broker. -------------------------- + if ( ! killFrontBroker ( brokers, verbosity ) ) + { + allMyChildren.killEverybody(); + killAllBrokers ( brokers, 5 ); + std::cerr << "END_OF_TEST ERROR_BROKER\n"; + return ERROR_KILLING_BROKER; + } + ++ brokerKills; + + // Start a new broker. -------------------------- + if ( verbosity > 0 ) + cout << "Starting new broker.\n\n"; + + startNewBroker ( brokers, + moduleOrDir, + clusterName, + verbosity, + durable ); + ++ totalBrokers; + printBrokers ( brokers ); + cerr << brokerKills << " brokers have been killed.\n\n\n"; + } + + int retval = allMyChildren.checkChildren(); + if ( retval ) { - allMyChildren.killEverybody(); - killAllBrokers ( brokers, 5 ); - std::cerr << "END_OF_TEST ERROR_BROKER\n"; - return ERROR_KILLING_BROKER; + std::cerr << "END_OF_TEST ERROR_CLIENT\n"; + allMyChildren.killEverybody(); + killAllBrokers ( brokers, 5 ); + return ERROR_ON_CHILD; } - // Sleep for a while. ------------------------- - sleepyTime = mrand ( minSleep, maxSleep ); - if ( verbosity > 1 ) - cerr << "Sleeping for " << sleepyTime << " seconds.\n"; - sleep ( sleepyTime ); - - // Start a new broker. -------------------------- - if ( verbosity > 0 ) - cout << "Starting new broker.\n\n"; - - startNewBroker ( brokers, - moduleOrDir, - clusterName, - verbosity, - durable ); - - if ( verbosity > 1 ) - printBrokers ( brokers ); - // If all children have exited, quit. int unfinished = allMyChildren.unfinished(); - if ( ! unfinished ) { + if ( unfinished == 0 ) { killAllBrokers ( brokers, 5 ); if ( verbosity > 1 ) cout << "failoverSoak: all children have exited.\n"; - int retval = allMyChildren.checkChildren(); - if ( verbosity > 1 ) - std::cerr << "failoverSoak: checkChildren: " << retval << endl; - - if ( retval ) - { - std::cerr << "END_OF_TEST ERROR_CLIENT\n"; - return ERROR_ON_CHILD; - } - else - { - std::cerr << "END_OF_TEST SUCCESSFUL\n"; - return HUNKY_DORY; - } - } - // Even if some are still running, if there's an error, quit. - if ( allMyChildren.checkChildren() ) - { - if ( verbosity > 0 ) - cout << "failoverSoak: error on child.\n"; - allMyChildren.killEverybody(); - killAllBrokers ( brokers, 5 ); - std::cerr << "END_OF_TEST ERROR_CLIENT\n"; - return ERROR_ON_CHILD; + std::cerr << "END_OF_TEST SUCCESSFUL\n"; + return HUNKY_DORY; } - if ( verbosity > 1 ) { - std::cerr << "------- next kill-broker loop --------\n"; - allMyChildren.print(); - } } - retval = allMyChildren.checkChildren(); - if ( verbosity > 1 ) - std::cerr << "failoverSoak: checkChildren: " << retval << endl; - - if ( verbosity > 1 ) - cout << "failoverSoak: maxBrokers reached.\n"; - allMyChildren.killEverybody(); killAllBrokers ( brokers, 5 ); std::cerr << "END_OF_TEST SUCCESSFUL\n"; - return retval ? ERROR_ON_CHILD : HUNKY_DORY; + return HUNKY_DORY; } diff --git a/qpid/cpp/src/tests/run_failover_soak b/qpid/cpp/src/tests/run_failover_soak index 69551a51c2..c276e9cc2f 100755 --- a/qpid/cpp/src/tests/run_failover_soak +++ b/qpid/cpp/src/tests/run_failover_soak @@ -26,12 +26,12 @@ host=127.0.0.1 unset QPID_NO_MODULE_DIR # failover_soak uses --module-dir, dont want clash MODULES=${MODULES:-$moduledir} -MESSAGES=${MESSAGES:-1000000} +MESSAGES=${MESSAGES:-500000} REPORT_FREQUENCY=${REPORT_FREQUENCY:-20000} VERBOSITY=${VERBOSITY:-10} DURABILITY=${DURABILITY:-0} N_QUEUES=${N_QUEUES:-1} -N_BROKERS=${N_BROKERS:-3} +N_BROKERS=${N_BROKERS:-4} rm -f soak-*.log exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY $N_QUEUES $N_BROKERS diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index 87fbbd128b..07bd4b2bee 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -73,3 +73,6 @@ exportmodule XML_LIB xml.so export QPID_NO_MODULE_DIR=1 # Don't accidentally load installed modules export QPID_DATA_DIR= # Default to no data dir, not ~/.qpidd +# Options for boost test framework +export BOOST_TEST_SHOW_PROGRESS=yes +export BOOST_TEST_CATCH_SYSTEM_ERRORS=no diff --git a/qpid/cpp/src/tests/verify_cluster_objects b/qpid/cpp/src/tests/verify_cluster_objects old mode 100644 new mode 100755 index cea875662f..664b88cb3b --- a/qpid/cpp/src/tests/verify_cluster_objects +++ b/qpid/cpp/src/tests/verify_cluster_objects @@ -75,6 +75,12 @@ class IpAddr: bestAddr = addrPort return bestAddr +class ObjectId: + """Object identity, use for dictionaries by object id""" + def __init__(self, object): self.object = object + def __eq__(self, other): return self.object is other.object + def __hash__(self): return hash(id(self.object)) + class Broker(object): def __init__(self, qmf, broker): self.broker = broker @@ -94,6 +100,7 @@ class Broker(object): self.uptime = 0 self.tablesByName = {} self.package = "org.apache.qpid.broker" + self.id_cache = {} # Cache for getAbstractId def getUrl(self): return self.broker.getUrl() @@ -114,13 +121,14 @@ class Broker(object): # def getAbstractId(self, object): """ return a string the of the hierarchical name """ + if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)] global _debug_recursion result = u"" valstr = u"" _debug_recursion += 1 debug_prefix = _debug_recursion if (_verbose > 9): - print debug_prefix, " enter gai: props ", self._properties + print debug_prefix, " enter gai: props ", object._properties for property, value in object._properties: # we want to recurse on things which are refs. we tell by @@ -138,6 +146,7 @@ class Broker(object): if property.name == "systemRef": _debug_recursion -= 1 + self.id_cache[ObjectId(object)] = "" return "" if property.index: @@ -176,6 +185,7 @@ class Broker(object): if (_verbose > 9): print debug_prefix, " id ", self, " -> ", result _debug_recursion -= 1 + self.id_cache[ObjectId(object)] = result return result def loadTable(self, cls): @@ -196,13 +206,12 @@ class Broker(object): # error (ie, the name-generation code is busted) if we do key = self.getAbstractId(obj) if key in self.tablesByName[cls.getClassName()]: - print "internal error: collision for %s on key %s\n" % (obj, key) - sys.exit(1) + raise Exception("internal error: collision for %s on key %s\n" + % (obj, key)) - self.tablesByName[cls.getClassName()][self.getAbstractId(obj)] = obj -# sys.exit(1) + self.tablesByName[cls.getClassName()][key] = obj if _verbose > 1: - print " ", obj.getObjectId(), " ", obj.getIndex(), " ", self.getAbstractId(obj) + print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key class BrokerManager: @@ -253,9 +262,10 @@ class BrokerManager: raise Exception("Invalid URL 2") addrList.append((tokens[1], tokens[2])) - # Find the address in the list that is most likely to be in the same subnet as the address - # with which we made the original QMF connection. This increases the probability that we will - # be able to reach the cluster member. + # Find the address in the list that is most likely to be + # in the same subnet as the address with which we made the + # original QMF connection. This increases the probability + # that we will be able to reach the cluster member. best = hostAddr.bestAddr(addrList) bestUrl = best[0] + ":" + best[1] @@ -284,8 +294,7 @@ class BrokerManager: if _verbose > 0: print " ", b else: - print "Failed - Not a cluster" - sys.exit(1) + raise Exception("Failed - Not a cluster") failures = [] @@ -348,11 +357,10 @@ class BrokerManager: print "Failures:" for failure in failures: print " %s" % failure - sys.exit(1) + raise Exception("Failures") if _verbose > 0: print "Success" - sys.exit(0) ## ## Main Program diff --git a/qpid/extras/qmf/src/py/qmf2/agent.py b/qpid/extras/qmf/src/py/qmf2/agent.py index a6b3c39ad1..d884325071 100644 --- a/qpid/extras/qmf/src/py/qmf2/agent.py +++ b/qpid/extras/qmf/src/py/qmf2/agent.py @@ -21,18 +21,19 @@ import logging import datetime import time import Queue -from threading import Thread, Lock, currentThread, Event +from threading import Thread, RLock, currentThread, Event from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 -from common import (make_subject, parse_subject, OpCode, QmfQuery, - SchemaObjectClass, MsgKey, QmfData, QmfAddress, - SchemaClass, SchemaClassId, WorkItem, SchemaMethod, - timedelta_to_secs) +from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass, + QmfData, QmfAddress, SchemaClass, SchemaClassId, WorkItem, + SchemaMethod, timedelta_to_secs, QMF_APP_ID) # global flag that indicates which thread (if any) is # running the agent notifier callback _callback_thread=None + + ##============================================================================== ## METHOD CALL ##============================================================================== @@ -78,14 +79,73 @@ class MethodCallParams(object): return self._user_id + ##============================================================================== + ## SUBSCRIPTIONS + ##============================================================================== + +class _ConsoleHandle(object): + """ + """ + def __init__(self, handle, reply_to): + self.console_handle = handle + self.reply_to = reply_to + +class SubscriptionParams(object): + """ + """ + def __init__(self, console_handle, query, interval, duration, user_id): + self._console_handle = console_handle + self._query = query + self._interval = interval + self._duration = duration + self._user_id = user_id + + def get_console_handle(self): + return self._console_handle + + def get_query(self): + return self._query + + def get_interval(self): + return self._interval + + def get_duration(self): + return self._duration + + def get_user_id(self): + return self._user_id + +class _SubscriptionState(object): + """ + An internally-managed subscription. + """ + def __init__(self, reply_to, cid, query, interval, duration): + self.reply_to = reply_to + self.correlation_id = cid + self.query = query + self.interval = interval + self.duration = duration + now = datetime.datetime.utcnow() + self.next_update = now # do an immediate update + self.expiration = now + datetime.timedelta(seconds=duration) + self.id = 0 + + def resubscribe(self, now, _duration=None): + if _duration is not None: + self.duration = _duration + self.expiration = now + datetime.timedelta(seconds=self.duration) + + def reset_interval(self, now): + self.next_update = now + datetime.timedelta(seconds=self.interval) + + ##============================================================================== ## AGENT ##============================================================================== class Agent(Thread): - def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30, - _max_msg_size=0, _capacity=10): + def __init__(self, name, _domain=None, _notifier=None, **options): Thread.__init__(self) self._running = False self._ready = Event() @@ -94,11 +154,20 @@ class Agent(Thread): self._domain = _domain self._address = QmfAddress.direct(self.name, self._domain) self._notifier = _notifier - self._heartbeat_interval = _heartbeat_interval + + # configurable parameters + # + self._heartbeat_interval = options.get("heartbeat_interval", 30) + self._capacity = options.get("capacity", 10) + self._default_duration = options.get("default_duration", 300) + self._max_duration = options.get("max_duration", 3600) + self._min_duration = options.get("min_duration", 10) + self._default_interval = options.get("default_interval", 30) + self._min_interval = options.get("min_interval", 5) + # @todo: currently, max # of objects in a single reply message, would # be better if it were max bytesize of per-msg content... - self._max_msg_size = _max_msg_size - self._capacity = _capacity + self._max_msg_size = options.get("max_msg_size", 0) self._conn = None self._session = None @@ -107,7 +176,7 @@ class Agent(Thread): self._direct_sender = None self._topic_sender = None - self._lock = Lock() + self._lock = RLock() self._packages = {} self._schema_timestamp = long(0) self._schema = {} @@ -119,6 +188,10 @@ class Agent(Thread): self._undescribed_data = {} self._work_q = Queue.Queue() self._work_q_put = False + # subscriptions + self._subscription_id = long(time.time()) + self._subscriptions = {} + self._next_subscribe_event = None def destroy(self, timeout=None): @@ -192,10 +265,11 @@ class Agent(Thread): if self.isAlive(): # kick my thread to wake it up try: - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.noop)}, + msg = Message(id=QMF_APP_ID, subject=self.name, - content={"noop":"noop"}) + properties={ "method":"request", + "qmf.opcode":OpCode.noop}, + content={}) # TRACE #logging.error("!!! sending wakeup to myself: %s" % msg) @@ -258,13 +332,14 @@ class Agent(Thread): raise Exception("No connection available") # @todo: should we validate against the schema? - _map = {"_name": self.get_name(), - "_event": qmfEvent.map_encode()} - msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." + + msg = Message(id=QMF_APP_ID, + subject=QmfAddress.SUBJECT_AGENT_EVENT + "." + qmfEvent.get_severity() + "." + self.name, - properties={"method":"response", - "qmf.subject":make_subject(OpCode.event_ind)}, - content={MsgKey.event:_map}) + properties={"method":"indication", + "qmf.opcode":OpCode.data_ind, + "qmf.content": ContentType.event, + "qmf.agent":self.name}, + content=[qmfEvent.map_encode()]) # TRACE # logging.error("!!! Agent %s sending Event (%s)" % # (self.name, str(msg))) @@ -330,9 +405,10 @@ class Agent(Thread): raise TypeError("Invalid type for error - must be QmfData") _map[SchemaMethod.KEY_ERROR] = _error.map_encode() - msg = Message( properties={"method":"response", - "qmf.subject":make_subject(OpCode.response)}, - content={MsgKey.method:_map}) + msg = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.method_rsp}, + content=_map) msg.correlation_id = handle.correlation_id self._send_reply(msg, handle.reply_to) @@ -370,25 +446,10 @@ class Agent(Thread): while self._running: - now = datetime.datetime.utcnow() - # print("now=%s next_heartbeat=%s" % (now, next_heartbeat)) - if now >= next_heartbeat: - ind = self._makeAgentIndMsg() - ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT - # TRACE - #logging.error("!!! Agent %s sending Heartbeat (%s)" % - # (self.name, str(ind))) - self._topic_sender.send(ind) - logging.debug("Agent Indication Sent") - next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) - - timeout = timedelta_to_secs(next_heartbeat - now) - # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout)) - try: - self._session.next_receiver(timeout=timeout) - except Empty: - continue - + # + # Process inbound messages + # + logging.debug("%s processing inbound messages..." % self.name) for i in range(batch_limit): try: msg = self._topic_receiver.fetch(timeout=0) @@ -409,7 +470,71 @@ class Agent(Thread): # (self.name, self._direct_receiver.source, msg)) self._dispatch(msg, _direct=True) + # + # Send Heartbeat Notification + # + now = datetime.datetime.utcnow() + if now >= next_heartbeat: + logging.debug("%s sending heartbeat..." % self.name) + ind = Message(id=QMF_APP_ID, + subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT, + properties={"method":"indication", + "qmf.opcode":OpCode.agent_heartbeat_ind, + "qmf.agent":self.name}, + content=self._makeAgentInfoBody()) + # TRACE + #logging.error("!!! Agent %s sending Heartbeat (%s)" % + # (self.name, str(ind))) + self._topic_sender.send(ind) + logging.debug("Agent Indication Sent") + next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) + + + + # + # Monitor Subscriptions + # + if (self._next_subscribe_event is None or + now >= self._next_subscribe_event): + + logging.debug("%s polling subscriptions..." % self.name) + self._next_subscribe_event = now + datetime.timedelta(seconds= + self._max_duration) + self._lock.acquire() + try: + dead_ss = [] + for sid,ss in self._subscriptions.iteritems(): + if now >= ss.expiration: + dead_ss.append(sid) + continue + if now >= ss.next_update: + response = [] + objs = self._queryData(ss.query) + if objs: + for obj in objs: + response.append(obj.map_encode()) + logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id)) + self._send_query_response( ContentType.data, + ss.correlation_id, + ss.reply_to, + response) + ss.reset_interval(now) + + next_timeout = min(ss.expiration, ss.next_update) + if next_timeout < self._next_subscribe_event: + self._next_subscribe_event = next_timeout + + for sid in dead_ss: + del self._subscriptions[sid] + finally: + self._lock.release() + + # + # notify application of pending WorkItems + # + if self._work_q_put and self._notifier: + logging.debug("%s notifying application..." % self.name) # new stuff on work queue, kick the the application... self._work_q_put = False _callback_thread = currentThread() @@ -417,19 +542,33 @@ class Agent(Thread): self._notifier.indication() _callback_thread = None + # + # Sleep until messages arrive or something times out + # + next_timeout = min(next_heartbeat, self._next_subscribe_event) + timeout = timedelta_to_secs(next_timeout - + datetime.datetime.utcnow()) + if timeout > 0.0: + logging.debug("%s sleeping %s seconds..." % (self.name, + timeout)) + try: + self._session.next_receiver(timeout=timeout) + except Empty: + pass + + + + # # Private: # - def _makeAgentIndMsg(self): + def _makeAgentInfoBody(self): """ - Create an agent indication message identifying this agent + Create an agent indication message body identifying this agent """ - _map = {"_name": self.get_name(), + return {"_name": self.get_name(), "_schema_timestamp": self._schema_timestamp} - return Message(properties={"method":"response", - "qmf.subject":make_subject(OpCode.agent_ind)}, - content={MsgKey.agent_info: _map}) def _send_reply(self, msg, reply_to): """ @@ -458,7 +597,7 @@ class Agent(Thread): except SendError, e: logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e))) - def _send_query_response(self, subject, msgkey, cid, reply_to, objects): + def _send_query_response(self, content_type, cid, reply_to, objects): """ Send a response to a query, breaking the result into multiple messages based on the agent's _max_msg_size config parameter @@ -472,24 +611,28 @@ class Agent(Thread): start = 0 end = min(total, max_count) - while end <= total: - m = Message(properties={"qmf.subject":subject, - "method":"response"}, + # send partial response if too many objects present + while end < total: + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "partial":None, + "qmf.opcode":OpCode.data_ind, + "qmf.content":content_type, + "qmf.agent":self.name}, correlation_id = cid, - content={msgkey:objects[start:end]}) + content=objects[start:end]) self._send_reply(m, reply_to) - if end == total: - break; start = end end = min(total, end + max_count) - # response terminator - last message has empty object array - if total: - m = Message(properties={"qmf.subject":subject, - "method":"response"}, - correlation_id = cid, - content={msgkey: []} ) - self._send_reply(m, reply_to) + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.data_ind, + "qmf.content":content_type, + "qmf.agent":self.name}, + correlation_id = cid, + content=objects[start:end]) + self._send_reply(m, reply_to) def _dispatch(self, msg, _direct=False): """ @@ -497,33 +640,32 @@ class Agent(Thread): @param _direct: True if msg directly addressed to this agent. """ - logging.debug( "Message received from Console! [%s]" % msg ) - try: - version,opcode = parse_subject(msg.properties.get("qmf.subject")) - except: - logging.warning("Ignoring unrecognized message '%s'" % msg.subject) - return + # logging.debug( "Message received from Console! [%s]" % msg ) + # logging.error( "%s Message received from Console! [%s]" % (self.name, msg) ) + opcode = msg.properties.get("qmf.opcode") + if not opcode: + logging.warning("Ignoring unrecognized message '%s'" % msg) + return + version = 2 # @todo: fix me cmap = {}; props={} if msg.content_type == "amqp/map": cmap = msg.content if msg.properties: props = msg.properties - if opcode == OpCode.agent_locate: + if opcode == OpCode.agent_locate_req: self._handleAgentLocateMsg( msg, cmap, props, version, _direct ) - elif opcode == OpCode.get_query: + elif opcode == OpCode.query_req: self._handleQueryMsg( msg, cmap, props, version, _direct ) elif opcode == OpCode.method_req: self._handleMethodReqMsg(msg, cmap, props, version, _direct) - elif opcode == OpCode.cancel_subscription: - logging.warning("!!! CANCEL_SUB TBD !!!") - elif opcode == OpCode.create_subscription: - logging.warning("!!! CREATE_SUB TBD !!!") - elif opcode == OpCode.renew_subscription: - logging.warning("!!! RENEW_SUB TBD !!!") - elif opcode == OpCode.schema_query: - logging.warning("!!! SCHEMA_QUERY TBD !!!") + elif opcode == OpCode.subscribe_req: + self._handleSubscribeReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.subscribe_refresh_req: + self._handleResubscribeReqMsg(msg, cmap, props, version, _direct) + elif opcode == OpCode.subscribe_cancel_ind: + self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct) elif opcode == OpCode.noop: logging.debug("No-op msg received.") else: @@ -536,18 +678,28 @@ class Agent(Thread): """ logging.debug("_handleAgentLocateMsg") - reply = True - if "method" in props and props["method"] == "request": - query = cmap.get(MsgKey.query) - if query is not None: - # fake a QmfData containing my identifier for the query compare - tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME: - self.get_name()}, - _object_id="my-name") - reply = QmfQuery.from_map(query).evaluate(tmpData) + reply = False + if props.get("method") == "request": + # if the message is addressed to me or wildcard, process it + if (msg.subject == "console.ind" or + msg.subject == "console.ind.locate" or + msg.subject == "console.ind.locate." + self.name): + pred = msg.content + if not pred: + reply = True + elif isinstance(pred, type([])): + # fake a QmfData containing my identifier for the query compare + query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, pred) + tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME: + self.get_name()}, + _object_id="my-name") + reply = query.evaluate(tmpData) if reply: - m = self._makeAgentIndMsg() + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.agent_locate_rsp}, + content=self._makeAgentInfoBody()) m.correlation_id = msg.correlation_id self._send_reply(m, msg.reply_to) else: @@ -561,22 +713,25 @@ class Agent(Thread): logging.debug("_handleQueryMsg") if "method" in props and props["method"] == "request": - qmap = cmap.get(MsgKey.query) - if qmap: - query = QmfQuery.from_map(qmap) + if cmap: + try: + query = QmfQuery.from_map(cmap) + except TypeError: + logging.error("Invalid Query format: '%s'" % str(cmap)) + return target = query.get_target() if target == QmfQuery.TARGET_PACKAGES: - self._queryPackages( msg, query ) + self._queryPackagesReply( msg, query ) elif target == QmfQuery.TARGET_SCHEMA_ID: - self._querySchema( msg, query, _idOnly=True ) + self._querySchemaReply( msg, query, _idOnly=True ) elif target == QmfQuery.TARGET_SCHEMA: - self._querySchema( msg, query) + self._querySchemaReply( msg, query) elif target == QmfQuery.TARGET_AGENT: logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!") elif target == QmfQuery.TARGET_OBJECT_ID: - self._queryData(msg, query, _idOnly=True) + self._queryDataReply(msg, query, _idOnly=True) elif target == QmfQuery.TARGET_OBJECT: - self._queryData(msg, query) + self._queryDataReply(msg, query) else: logging.warning("Unrecognized query target: '%s'" % str(target)) @@ -634,7 +789,173 @@ class Agent(Thread): self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param)) self._work_q_put = True - def _queryPackages(self, msg, query): + def _handleSubscribeReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Subscription Request + """ + if "method" in props and props["method"] == "request": + query_map = cmap.get("_query") + interval = cmap.get("_interval") + duration = cmap.get("_duration") + + try: + query = QmfQuery.from_map(query_map) + except TypeError: + logging.warning("Invalid query for subscription: %s" % + str(query_map)) + return + + if isinstance(self, AgentExternal): + # param = SubscriptionParams(_ConsoleHandle(console_handle, + # msg.reply_to), + # query, + # interval, + # duration, + # msg.user_id) + # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST, + # msg.correlation_id, param)) + # self._work_q_put = True + logging.error("External Subscription TBD") + return + + # validate the query - only specific objects, or + # objects wildcard, are currently supported. + if (query.get_target() != QmfQuery.TARGET_OBJECT or + (query.get_selector() == QmfQuery.PREDICATE and + query.get_predicate())): + logging.error("Subscriptions only support (wildcard) Object" + " Queries.") + err = QmfData.create( + {"reason": "Unsupported Query type for subscription.", + "query": str(query.map_encode())}) + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.subscribe_rsp}, + correlation_id = msg.correlation_id, + content={"_error": err.map_encode()}) + self._send_reply(m, msg.reply_to) + return + + if duration is None: + duration = self._default_duration + else: + try: + duration = float(duration) + if duration > self._max_duration: + duration = self._max_duration + elif duration < self._min_duration: + duration = self._min_duration + except: + logging.warning("Bad duration value: %s" % str(msg)) + duration = self._default_duration + + if interval is None: + interval = self._default_interval + else: + try: + interval = float(interval) + if interval < self._min_interval: + interval = self._min_interval + except: + logging.warning("Bad interval value: %s" % str(msg)) + interval = self._default_interval + + ss = _SubscriptionState(msg.reply_to, + msg.correlation_id, + query, + interval, + duration) + self._lock.acquire() + try: + sid = self._subscription_id + self._subscription_id += 1 + ss.id = sid + self._subscriptions[sid] = ss + self._next_subscribe_event = None + finally: + self._lock.release() + + sr_map = {"_subscription_id": sid, + "_interval": interval, + "_duration": duration} + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.subscribe_rsp}, + correlation_id = msg.correlation_id, + content=sr_map) + self._send_reply(m, msg.reply_to) + + + + def _handleResubscribeReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Renew Subscription Request + """ + if props.get("method") == "request": + sid = cmap.get("_subscription_id") + if not sid: + logging.error("Invalid subscription refresh msg: %s" % + str(msg)) + return + + self._lock.acquire() + try: + ss = self._subscriptions.get(sid) + if not ss: + logging.error("Ignoring unknown subscription: %s" % + str(sid)) + return + duration = cmap.get("_duration") + if duration is not None: + try: + duration = float(duration) + if duration > self._max_duration: + duration = self._max_duration + elif duration < self._min_duration: + duration = self._min_duration + except: + logging.error("Bad duration value: %s" % str(msg)) + duration = None # use existing duration + + ss.resubscribe(datetime.datetime.utcnow(), duration) + + new_duration = ss.duration + new_interval = ss.interval + + finally: + self._lock.release() + + + sr_map = {"_subscription_id": sid, + "_interval": new_interval, + "_duration": new_duration} + m = Message(id=QMF_APP_ID, + properties={"method":"response", + "qmf.opcode":OpCode.subscribe_refresh_rsp}, + correlation_id = msg.correlation_id, + content=sr_map) + self._send_reply(m, msg.reply_to) + + + def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct): + """ + Process received Cancel Subscription Request + """ + if props.get("method") == "request": + sid = cmap.get("_subscription_id") + if not sid: + logging.warning("No subscription id supplied: %s" % msg) + return + + self._lock.acquire() + try: + if sid in self._subscriptions: + del self._subscriptions[sid] + finally: + self._lock.release() + + + def _queryPackagesReply(self, msg, query): """ Run a query against the list of known packages """ @@ -646,58 +967,83 @@ class Agent(Thread): _object_id="_package") if query.evaluate(qmfData): pnames.append(name) + + self._send_query_response(ContentType.schema_package, + msg.correlation_id, + msg.reply_to, + pnames) finally: self._lock.release() - self._send_query_response(make_subject(OpCode.data_ind), - MsgKey.package_info, - msg.correlation_id, - msg.reply_to, - pnames) - def _querySchema( self, msg, query, _idOnly=False ): + def _querySchemaReply( self, msg, query, _idOnly=False ): """ """ schemas = [] - # if querying for a specific schema, do a direct lookup - if query.get_selector() == QmfQuery.ID: - found = None - self._lock.acquire() - try: + + self._lock.acquire() + try: + # if querying for a specific schema, do a direct lookup + if query.get_selector() == QmfQuery.ID: found = self._schema.get(query.get_id()) - finally: - self._lock.release() - if found: - if _idOnly: - schemas.append(query.get_id().map_encode()) - else: - schemas.append(found.map_encode()) - else: # otherwise, evaluate all schema - self._lock.acquire() - try: + if found: + if _idOnly: + schemas.append(query.get_id().map_encode()) + else: + schemas.append(found.map_encode()) + else: # otherwise, evaluate all schema for sid,val in self._schema.iteritems(): if query.evaluate(val): if _idOnly: schemas.append(sid.map_encode()) else: schemas.append(val.map_encode()) - finally: - self._lock.release() + if _idOnly: + msgkey = ContentType.schema_id + else: + msgkey = ContentType.schema_class - if _idOnly: - msgkey = MsgKey.schema_id - else: - msgkey = MsgKey.schema + self._send_query_response(msgkey, + msg.correlation_id, + msg.reply_to, + schemas) + finally: + self._lock.release() + + + def _queryDataReply( self, msg, query, _idOnly=False ): + """ + """ + # hold the (recursive) lock for the duration so the Agent + # won't send data that is currently being modified by the + # app. + self._lock.acquire() + try: + response = [] + data_objs = self._queryData(query) + if _idOnly: + for obj in data_objs: + response.append(obj.get_object_id()) + else: + for obj in data_objs: + response.append(obj.map_encode()) + + if _idOnly: + msgkey = ContentType.object_id + else: + msgkey = ContentType.data - self._send_query_response(make_subject(OpCode.data_ind), - msgkey, - msg.correlation_id, - msg.reply_to, - schemas) + self._send_query_response(msgkey, + msg.correlation_id, + msg.reply_to, + response) + finally: + self._lock.release() - def _queryData( self, msg, query, _idOnly=False ): + def _queryData(self, query): """ + Return a list of QmfData objects that match a given query """ data_objs = [] # extract optional schema_id from target params @@ -705,12 +1051,12 @@ class Agent(Thread): t_params = query.get_target_param() if t_params: sid = t_params.get(QmfData.KEY_SCHEMA_ID) - # if querying for a specific object, do a direct lookup - if query.get_selector() == QmfQuery.ID: - oid = query.get_id() - found = None - self._lock.acquire() - try: + + self._lock.acquire() + try: + # if querying for a specific object, do a direct lookup + if query.get_selector() == QmfQuery.ID: + oid = query.get_id() if sid and not sid.get_hash_string(): # wildcard schema_id match, check each schema for name,db in self._described_data.iteritems(): @@ -718,11 +1064,9 @@ class Agent(Thread): and name.get_package_name() == sid.get_package_name()): found = db.get(oid) if found: - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(found.map_encode()) + data_objs.append(found) else: + found = None if sid: db = self._described_data.get(sid) if db: @@ -730,15 +1074,9 @@ class Agent(Thread): else: found = self._undescribed_data.get(oid) if found: - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(found.map_encode()) - finally: - self._lock.release() - else: # otherwise, evaluate all data - self._lock.acquire() - try: + data_objs.append(found) + + else: # otherwise, evaluate all data if sid and not sid.get_hash_string(): # wildcard schema_id match, check each schema for name,db in self._described_data.iteritems(): @@ -746,10 +1084,7 @@ class Agent(Thread): and name.get_package_name() == sid.get_package_name()): for oid,data in db.iteritems(): if query.evaluate(data): - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(data.map_encode()) + data_objs.append(data) else: if sid: db = self._described_data.get(sid) @@ -759,23 +1094,28 @@ class Agent(Thread): if db: for oid,data in db.iteritems(): if query.evaluate(data): - if _idOnly: - data_objs.append(oid) - else: - data_objs.append(data.map_encode()) - finally: - self._lock.release() + data_objs.append(data) + finally: + self._lock.release() - if _idOnly: - msgkey = MsgKey.object_id - else: - msgkey = MsgKey.data_obj + return data_objs - self._send_query_response(make_subject(OpCode.data_ind), - msgkey, - msg.correlation_id, - msg.reply_to, - data_objs) + + + ##============================================================================== + ## EXTERNAL DATABASE AGENT + ##============================================================================== + +class AgentExternal(Agent): + """ + An Agent which uses an external management database. + """ + def __init__(self, name, _domain=None, _notifier=None, + _heartbeat_interval=30, _max_msg_size=0, _capacity=10): + super(AgentExternal, self).__init__(name, _domain, _notifier, + _heartbeat_interval, + _max_msg_size, _capacity) + logging.error("AgentExternal TBD") @@ -823,9 +1163,11 @@ class QmfAgentData(QmfData): _schema_id=schema_id, _const=False) self._agent = agent self._validated = False + self._modified = True def destroy(self): self._dtime = long(time.time() * 1000) + self._touch() # @todo: publish change def is_deleted(self): @@ -833,6 +1175,7 @@ class QmfAgentData(QmfData): def set_value(self, _name, _value, _subType=None): super(QmfAgentData, self).set_value(_name, _value, _subType) + self._touch() # @todo: publish change def inc_value(self, name, delta=1): @@ -849,6 +1192,7 @@ class QmfAgentData(QmfData): """ subtract the delta from the property """ # @todo: need to take write-lock logging.error(" TBD!!!") + self._touch() def validate(self): """ @@ -868,6 +1212,13 @@ class QmfAgentData(QmfData): raise Exception("Required property '%s' not present." % name) self._validated = True + def _touch(self): + """ + Mark this object as modified. Used to force a publish of this object + if on subscription. + """ + self._modified = True + ################################################################################ diff --git a/qpid/extras/qmf/src/py/qmf2/common.py b/qpid/extras/qmf/src/py/qmf2/common.py index 8107b86666..8070add806 100644 --- a/qpid/extras/qmf/src/py/qmf2/common.py +++ b/qpid/extras/qmf/src/py/qmf2/common.py @@ -34,61 +34,44 @@ log_query = getLogger("qmf.query") ## Constants ## -AMQP_QMF_SUBJECT = "qmf" -AMQP_QMF_VERSION = 4 -AMQP_QMF_SUBJECT_FMT = "%s%d.%s" - -class MsgKey(object): - agent_info = "agent_info" - query = "query" - package_info = "package_info" - schema_id = "schema_id" - schema = "schema" - object_id="object_id" - data_obj="object" - method="method" - event="event" +QMF_APP_ID="qmf2" -class OpCode(object): - noop = "noop" - - # codes sent by a console and processed by the agent - agent_locate = "agent-locate" - cancel_subscription = "cancel-subscription" - create_subscription = "create-subscription" - get_query = "get-query" - method_req = "method" - renew_subscription = "renew-subscription" - schema_query = "schema-query" # @todo: deprecate - - # codes sent by the agent to a console - agent_ind = "agent" - data_ind = "data" - event_ind = "event" - managed_object = "managed-object" - object_ind = "object" - response = "response" - schema_ind="schema" # @todo: deprecate +class ContentType(object): + """ Values for the 'qmf.content' message header + """ + schema_package = "_schema_package" + schema_id = "_schema_id" + schema_class = "_schema_class" + object_id = "_object_id" + data = "_data" + event = "_event" +class OpCode(object): + """ Values for the 'qmf.opcode' message header. + """ + noop = "_noop" + # codes sent by a console and processed by the agent + agent_locate_req = "_agent_locate_request" + subscribe_req = "_subscribe_request" + subscribe_cancel_ind = "_subscribe_cancel_indication" + subscribe_refresh_req = "_subscribe_refresh_indication" + query_req = "_query_request" + method_req = "_method_request" -def make_subject(_code): - """ - Create a message subject field value. - """ - return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code) + # codes sent by the agent to a console + agent_locate_rsp = "_agent_locate_response" + agent_heartbeat_ind = "_agent_heartbeat_indication" + query_rsp = "_query_response" + subscribe_rsp = "_subscribe_response" + subscribe_refresh_rsp = "_subscribe_refresh_response" + data_ind = "_data_indication" + method_rsp = "_method_response" -def parse_subject(_sub): - """ - Deconstruct a subject field, return version,opcode values - """ - if _sub[:3] != "qmf": - raise Exception("Non-QMF message received") - return _sub[3:].split('.', 1) def timedelta_to_secs(td): """ @@ -133,11 +116,15 @@ class WorkItem(object): AGENT_HEARTBEAT=8 QUERY_COMPLETE=9 METHOD_RESPONSE=10 + SUBSCRIBE_RESPONSE=11 + SUBSCRIBE_INDICATION=12 + RESUBSCRIBE_RESPONSE=13 # Enumeration of the types of WorkItems produced on the Agent METHOD_CALL=1000 QUERY=1001 - SUBSCRIBE=1002 - UNSUBSCRIBE=1003 + SUBSCRIBE_REQUEST=1002 + RESUBSCRIBE_REQUEST=1003 + UNSUBSCRIBE_REQUEST=1004 def __init__(self, kind, handle, _params=None): """ diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py index c13cf70755..afd20c3655 100644 --- a/qpid/extras/qmf/src/py/qmf2/console.py +++ b/qpid/extras/qmf/src/py/qmf2/console.py @@ -24,14 +24,14 @@ import time import datetime import Queue from threading import Thread, Event -from threading import Lock +from threading import RLock from threading import currentThread from threading import Condition from qpid.messaging import Connection, Message, Empty, SendError -from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier, - MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId, +from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType, + QmfData, QmfAddress, SchemaClass, SchemaClassId, SchemaEventClass, SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent, timedelta_to_secs) @@ -141,6 +141,7 @@ class _AsyncMailbox(_Mailbox): console._lock.acquire() try: console._async_mboxes[self.cid] = self + console._next_mbox_expire = None finally: console._lock.release() @@ -150,6 +151,24 @@ class _AsyncMailbox(_Mailbox): console._wake_thread() + def reset_timeout(self, _timeout=None): + """ Reset the expiration date for this mailbox. + """ + if _timeout is None: + _timeout = self.console._reply_timeout + self.console._lock.acquire() + try: + self.expiration_date = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=_timeout)) + self.console._next_mbox_expire = None + finally: + self.console._lock.release() + + # wake the console mgmt thread so it will learn about the mbox + # expiration date (and adjust its idle sleep period correctly) + + self.console._wake_thread() + def deliver(self, msg): """ """ @@ -177,7 +196,7 @@ class _QueryMailbox(_AsyncMailbox): def __init__(self, console, agent_name, context, - target, msgkey, + target, _timeout=None): """ Invoked by application thread. @@ -186,7 +205,6 @@ class _QueryMailbox(_AsyncMailbox): _timeout) self.agent_name = agent_name self.target = target - self.msgkey = msgkey self.context = context self.result = [] @@ -195,11 +213,8 @@ class _QueryMailbox(_AsyncMailbox): Process query response messages delivered to this mailbox. Invoked by Console Management thread only. """ - done = False - objects = reply.content.get(self.msgkey) - if not objects: - done = True - else: + objects = reply.content + if isinstance(objects, type([])): # convert from map to native types if needed if self.target == QmfQuery.TARGET_SCHEMA_ID: for sid_map in objects: @@ -237,8 +252,7 @@ class _QueryMailbox(_AsyncMailbox): # no conversion needed. self.result += objects - if done: - # create workitem + if not "partial" in reply.properties: # logging.error("QUERY COMPLETE for %s" % str(self.context)) wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) self.console._work_q.put(wi) @@ -278,8 +292,8 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): Process schema response messages. """ done = False - schemas = reply.content.get(MsgKey.schema) - if schemas: + schemas = reply.content + if schemas and isinstance(schemas, type([])): for schema_map in schemas: # extract schema id, convert based on schema type sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID) @@ -319,8 +333,8 @@ class _MethodMailbox(_AsyncMailbox): Invoked by Console Management thread only. """ - _map = reply.content.get(MsgKey.method) - if not _map: + _map = reply.content + if not _map or not isinstance(_map, type({})): logging.error("Invalid method call reply message") result = None else: @@ -355,6 +369,207 @@ class _MethodMailbox(_AsyncMailbox): +class _SubscriptionMailbox(_AsyncMailbox): + """ + A Mailbox for a single subscription. Allows only sychronous "subscribe" + and "refresh" requests. + """ + def __init__(self, console, context, agent, duration, interval): + """ + Invoked by application thread. + """ + super(_SubscriptionMailbox, self).__init__(console, duration) + self.cv = Condition() + self.data = [] + self.result = [] + self.context = context + self.duration = duration + self.interval = interval + self.agent_name = agent.get_name() + self.agent_subscription_id = None # from agent + + def subscribe(self, query): + agent = self.console.get_agent(self.agent_name) + if not agent: + logging.warning("subscribed failed - unknown agent '%s'" % + self.agent_name) + return False + try: + logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name) + agent._send_subscribe_req(query, self.get_address(), self.interval, + self.duration) + except SendError, e: + logging.error(str(e)) + return False + return True + + def resubscribe(self, duration): + agent = self.console.get_agent(self.agent_name) + if not agent: + logging.warning("resubscribed failed - unknown agent '%s'" % + self.agent_name) + return False + try: + logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name) + agent._send_resubscribe_req(self.get_address(), + self.agent_subscription_id, duration) + except SendError, e: + logging.error(str(e)) + return False + return True + + def deliver(self, msg): + """ + """ + opcode = msg.properties.get("qmf.opcode") + if (opcode == OpCode.subscribe_rsp or + opcode == OpCode.subscribe_refresh_rsp): + + error = msg.content.get("_error") + if error: + try: + e_map = QmfData.from_map(error) + except TypeError: + logging.warning("Invalid QmfData map received: '%s'" + % str(error)) + e_map = QmfData.create({"error":"Unknown error"}) + sp = SubscribeParams(None, None, None, e_map) + else: + self.agent_subscription_id = msg.content.get("_subscription_id") + self.duration = msg.content.get("_duration", self.duration) + self.interval = msg.content.get("_interval", self.interval) + self.reset_timeout(self.duration) + sp = SubscribeParams(self.get_address(), + self.interval, + self.duration, + None) + self.cv.acquire() + try: + self.data.append(sp) + # if was empty, notify waiters + if len(self.data) == 1: + self.cv.notify() + finally: + self.cv.release() + return + + # else: data indication + agent_name = msg.properties.get("qmf.agent") + if not agent_name: + logging.warning("Ignoring data_ind - no agent name given: %s" % + msg) + return + agent = self.console.get_agent(agent_name) + if not agent: + logging.warning("Ignoring data_ind - unknown agent '%s'" % + agent_name) + return + + objects = msg.content + for obj_map in objects: + obj = QmfConsoleData(map_=obj_map, agent=agent) + # start fetch of schema if not known + sid = obj.get_schema_class_id() + if sid: + self.console._prefetch_schema(sid, agent) + self.result.append(obj) + + if not "partial" in msg.properties: + wi = WorkItem(WorkItem.SUBSCRIBE_INDICATION, self.context, self.result) + self.result = [] + self.console._work_q.put(wi) + self.console._work_q_put = True + + def fetch(self, timeout=None): + """ + Get one data item from a mailbox, with timeout. + Invoked by application thread. + """ + self.cv.acquire() + try: + if len(self.data) == 0: + self.cv.wait(timeout) + if len(self.data): + return self.data.pop(0) + return None + finally: + self.cv.release() + + def expire(self): + """ The subscription expired. + """ + self.destroy() + + + + +class _AsyncSubscriptionMailbox(_SubscriptionMailbox): + """ + A Mailbox for a single subscription. Allows only asychronous "subscribe" + and "refresh" requests. + """ + def __init__(self, console, context, agent, duration, interval): + """ + Invoked by application thread. + """ + super(_AsyncSubscriptionMailbox, self).__init__(console, context, + agent, duration, + interval) + self.subscribe_pending = False + self.resubscribe_pending = False + + + def subscribe(self, query, reply_timeout): + if super(_AsyncSubscriptionMailbox, self).subscribe(query): + self.subscribe_pending = True + self.reset_timeout(reply_timeout) + return True + return False + + def resubscribe(self, duration, reply_timeout): + if super(_AsyncSubscriptionMailbox, self).resubscribe(duration): + self.resubscribe_pending = True + self.reset_timeout(reply_timeout) + return True + return False + + def deliver(self, msg): + """ + """ + super(_AsyncSubscriptionMailbox, self).deliver(msg) + sp = self.fetch(0) + if sp: + # if the message was a reply to a subscribe or + # re-subscribe, then we get here. + if self.subscribe_pending: + wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, + self.context, sp) + else: + wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE, + self.context, sp) + + self.subscribe_pending = False + self.resubscribe_pending = False + + self.console._work_q.put(wi) + self.console._work_q_put = True + + if not sp.succeeded(): + self.destroy() + + + def expire(self): + """ Either the subscription expired, or a request timedout. + """ + if self.subscribe_pending: + wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE, + self.context, None) + elif self.resubscribe_pending: + wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE, + self.context, None) + self.destroy() + + ##============================================================================== ## DATA MODEL ##============================================================================== @@ -481,8 +696,8 @@ class QmfConsoleData(QmfData): logging.debug("Agent method req wait timed-out.") return None - _map = replyMsg.content.get(MsgKey.method) - if not _map: + _map = replyMsg.content + if not _map or not isinstance(_map, type({})): logging.error("Invalid method call reply message") return None @@ -650,8 +865,8 @@ class Agent(object): logging.debug("Agent method req wait timed-out.") return None - _map = replyMsg.content.get(MsgKey.method) - if not _map: + _map = replyMsg.content + if not _map or not isinstance(_map, type({})): logging.error("Invalid method call reply message") return None @@ -676,20 +891,66 @@ class Agent(object): def _send_query(self, query, correlation_id=None): """ """ - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.get_query)}, - content={MsgKey.query: query.map_encode()}) + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.query_req}, + content=query.map_encode()) self._send_msg( msg, correlation_id ) def _send_method_req(self, mr_map, correlation_id=None): """ """ - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.method_req)}, + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.method_req}, content=mr_map) self._send_msg( msg, correlation_id ) + def _send_subscribe_req(self, query, correlation_id, _interval=None, + _lifetime=None): + """ + """ + sr_map = {"_query":query.map_encode()} + if _interval is not None: + sr_map["_interval"] = _interval + if _lifetime is not None: + sr_map["_duration"] = _lifetime + + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.subscribe_req}, + content=sr_map) + self._send_msg(msg, correlation_id) + + + def _send_resubscribe_req(self, correlation_id, + subscription_id, + _lifetime=None): + """ + """ + sr_map = {"_subscription_id":subscription_id} + if _lifetime is not None: + sr_map["_duration"] = _lifetime + + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.subscribe_refresh_req}, + content=sr_map) + self._send_msg(msg, correlation_id) + + + def _send_unsubscribe_ind(self, correlation_id, subscription_id): + """ + """ + sr_map = {"_subscription_id":subscription_id} + + msg = Message(id=QMF_APP_ID, + properties={"method":"request", + "qmf.opcode":OpCode.subscribe_cancel_ind}, + content=sr_map) + self._send_msg(msg, correlation_id) + ##============================================================================== ## METHOD CALL @@ -716,6 +977,36 @@ class MethodResult(object): return arg + + ##============================================================================== + ## SUBSCRIPTION + ##============================================================================== + +class SubscribeParams(object): + """ Represents a standing subscription for this console. + """ + def __init__(self, sid, interval, duration, _error=None): + self._sid = sid + self._interval = interval + self._duration = duration + self._error = _error + + def succeeded(self): + return self._error is None + + def get_error(self): + return self._error + + def get_subscription_id(self): + return self._sid + + def get_publish_interval(self): + return self._interval + + def get_duration(self): + return self._duration + + ##============================================================================== ## CONSOLE ##============================================================================== @@ -753,7 +1044,7 @@ class Console(Thread): self._domain = _domain self._address = QmfAddress.direct(self._name, self._domain) self._notifier = notifier - self._lock = Lock() + self._lock = RLock() self._conn = None self._session = None # dict of "agent-direct-address":class Agent entries @@ -766,6 +1057,7 @@ class Console(Thread): self._agent_discovery_filter = None self._reply_timeout = reply_timeout self._agent_timeout = agent_timeout + self._subscribe_timeout = 300 # @todo: parameterize self._next_agent_expire = None self._next_mbox_expire = None # for passing WorkItems to the application @@ -776,18 +1068,6 @@ class Console(Thread): self._post_office = {} # indexed by cid self._async_mboxes = {} # indexed by cid, used to expire them - ## Old stuff below??? - #self._broker_list = [] - #self.impl = qmfengine.Console() - #self._event = qmfengine.ConsoleEvent() - ##self._cv = Condition() - ##self._sync_count = 0 - ##self._sync_result = None - ##self._select = {} - ##self._cb_cond = Condition() - - - def destroy(self, timeout=None): """ Must be called before the Console is deleted. @@ -801,8 +1081,6 @@ class Console(Thread): self.remove_connection(self._conn, timeout) logging.debug("Console Destroyed") - - def add_connection(self, conn): """ Add a AMQP connection to the console. The console will setup a session over the @@ -934,10 +1212,11 @@ class Console(Thread): cid = mbox.get_address() query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name) - msg = Message(subject="console.ind.locate." + name, + msg = Message(id=QMF_APP_ID, + subject="console.ind.locate." + name, properties={"method":"request", - "qmf.subject":make_subject(OpCode.agent_locate)}, - content={MsgKey.query: query.map_encode()}) + "qmf.opcode":OpCode.agent_locate_req}, + content=query._predicate) msg.reply_to = str(self._address) msg.correlation_id = str(cid) logging.debug("Sending Agent Locate (%s)" % time.time()) @@ -995,23 +1274,13 @@ class Console(Thread): def do_query(self, agent, query, _reply_handle=None, _timeout=None ): """ """ - query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info, - QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id, - QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id, - QmfQuery.TARGET_SCHEMA: MsgKey.schema, - QmfQuery.TARGET_OBJECT: MsgKey.data_obj, - QmfQuery.TARGET_AGENT: MsgKey.agent_info} - target = query.get_target() - msgkey = query_keymap.get(target) - if not msgkey: - raise Exception("Invalid target for query: %s" % str(query)) if _reply_handle is not None: mbox = _QueryMailbox(self, agent.get_name(), _reply_handle, - target, msgkey, + target, _timeout) else: mbox = _SyncMailbox(self) @@ -1045,9 +1314,8 @@ class Console(Thread): logging.debug("Query wait timed-out.") break - objects = reply.content.get(msgkey) - if not objects: - # last response is empty + objects = reply.content + if not objects or not isinstance(objects, type([])): break # convert from map to native types if needed @@ -1081,21 +1349,116 @@ class Console(Thread): # no conversion needed. response += objects + if not "partial" in reply.properties: + # reply not broken up over multiple msgs + break + now = datetime.datetime.utcnow() mbox.destroy() return response + + def create_subscription(self, agent, query, console_handle, + _interval=None, _duration=None, + _blocking=True, _timeout=None): + if not _duration: + _duration = self._subscribe_timeout + + if _timeout is None: + _timeout = self._reply_timeout + + if not _blocking: + mbox = _AsyncSubscriptionMailbox(self, console_handle, agent, + _duration, _interval) + if not mbox.subscribe(query, _timeout): + mbox.destroy() + return False + return True + else: + mbox = _SubscriptionMailbox(self, console_handle, agent, _duration, + _interval) + + if not mbox.subscribe(query): + mbox.destroy() + return None + + logging.debug("Waiting for response to subscription (%s)" % _timeout) + # @todo: what if mbox expires here? + sp = mbox.fetch(_timeout) + + if not sp: + logging.debug("Subscription request wait timed-out.") + mbox.destroy() + return None + + if not sp.succeeded(): + mbox.destroy() + + return sp + + def refresh_subscription(self, subscription_id, + _duration=None, + _timeout=None): + if _timeout is None: + _timeout = self._reply_timeout + + mbox = self._get_mailbox(subscription_id) + if not mbox: + logging.warning("Subscription %s not found." % subscription_id) + return None + + if isinstance(mbox, _AsyncSubscriptionMailbox): + return mbox.resubscribe(_duration, _timeout) + else: + # synchronous - wait for reply + if not mbox.resubscribe(_duration): + # @todo ???? mbox.destroy() + return None + + # wait for reply + + logging.debug("Waiting for response to subscription (%s)" % _timeout) + sp = mbox.fetch(_timeout) + + if not sp: + logging.debug("re-subscribe request wait timed-out.") + # @todo???? mbox.destroy() + return None + + return sp + + + def cancel_subscription(self, subscription_id): + """ + """ + mbox = self._get_mailbox(subscription_id) + if not mbox: + return + + agent = self.get_agent(mbox.agent_name) + if agent: + try: + logging.debug("Sending UnSubscribe to Agent (%s)" % time.time()) + agent._send_unsubscribe_ind(subscription_id, + mbox.agent_subscription_id) + except SendError, e: + logging.error(str(e)) + + mbox.destroy() + + def _wake_thread(self): """ Make the console management thread loop wakeup from its next_receiver sleep. """ logging.debug("Sending noop to wake up [%s]" % self._address) - msg = Message(properties={"method":"request", - "qmf.subject":make_subject(OpCode.noop)}, + msg = Message(id=QMF_APP_ID, subject=self._name, - content={"noop":"noop"}) + properties={"method":"request", + "qmf.opcode":OpCode.noop}, + content={}) try: self._direct_sender.send( msg, sync=True ) except SendError, e: @@ -1152,9 +1515,17 @@ class Console(Thread): # to expire, or a mailbox requrest to time out now = datetime.datetime.utcnow() next_expire = self._next_agent_expire - if (self._next_mbox_expire and - self._next_mbox_expire < next_expire): - next_expire = self._next_mbox_expire + + # the mailbox expire flag may be cleared by the + # app thread(s) + self._lock.acquire() + try: + if (self._next_mbox_expire and + self._next_mbox_expire < next_expire): + next_expire = self._next_mbox_expire + finally: + self._lock.release() + if next_expire > now: timeout = timedelta_to_secs(next_expire - now) try: @@ -1268,13 +1639,14 @@ class Console(Thread): """ PRIVATE: Process a message received from an Agent """ - logging.debug( "Message received from Agent! [%s]" % msg ) - try: - version,opcode = parse_subject(msg.properties.get("qmf.subject")) - # @todo: deal with version mismatch!!! - except: + #logging.debug( "Message received from Agent! [%s]" % msg ) + #logging.error( "Message received from Agent! [%s]" % msg ) + + opcode = msg.properties.get("qmf.opcode") + if not opcode: logging.error("Ignoring unrecognized message '%s'" % msg) return + version = 2 # @todo: fix me cmap = {}; props = {} if msg.content_type == "amqp/map": @@ -1282,20 +1654,23 @@ class Console(Thread): if msg.properties: props = msg.properties - if opcode == OpCode.agent_ind: + if opcode == OpCode.agent_heartbeat_ind: self._handle_agent_ind_msg( msg, cmap, version, _direct ) - elif opcode == OpCode.data_ind: - self._handle_data_ind_msg(msg, cmap, version, _direct) - elif opcode == OpCode.event_ind: - self._handle_event_ind_msg(msg, cmap, version, _direct) - elif opcode == OpCode.managed_object: - logging.warning("!!! managed_object TBD !!!") - elif opcode == OpCode.object_ind: - logging.warning("!!! object_ind TBD !!!") - elif opcode == OpCode.response: + elif opcode == OpCode.agent_locate_rsp: + self._handle_agent_ind_msg( msg, cmap, version, _direct ) + elif opcode == OpCode.query_rsp: + self._handle_response_msg(msg, cmap, version, _direct) + elif opcode == OpCode.subscribe_rsp: self._handle_response_msg(msg, cmap, version, _direct) - elif opcode == OpCode.schema_ind: - logging.warning("!!! schema_ind TBD !!!") + elif opcode == OpCode.subscribe_refresh_rsp: + self._handle_response_msg(msg, cmap, version, _direct) + elif opcode == OpCode.method_rsp: + self._handle_response_msg(msg, cmap, version, _direct) + elif opcode == OpCode.data_ind: + if msg.correlation_id: + self._handle_response_msg(msg, cmap, version, _direct) + else: + self._handle_indication_msg(msg, cmap, version, _direct) elif opcode == OpCode.noop: logging.debug("No-op msg received.") else: @@ -1309,7 +1684,7 @@ class Console(Thread): """ logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) - ai_map = cmap.get(MsgKey.agent_info) + ai_map = msg.content if not ai_map or not isinstance(ai_map, type({})): logging.warning("Bad agent-ind message received: '%s'" % msg) return @@ -1359,29 +1734,10 @@ class Console(Thread): logging.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) - def _handle_data_ind_msg(self, msg, cmap, version, direct): - """ - Process a received data-ind message. - """ - logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time())) - - mbox = self._get_mailbox(msg.correlation_id) - if not mbox: - logging.debug("Data indicate received with unknown correlation_id" - " msg='%s'" % str(msg)) - return - - # wake up all waiters - logging.debug("waking waiters for correlation id %s" % - msg.correlation_id) - mbox.deliver(msg) - - def _handle_response_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. """ - # @todo code replication - clean me. logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) mbox = self._get_mailbox(msg.correlation_id) @@ -1394,19 +1750,22 @@ class Console(Thread): logging.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) - def _handle_event_ind_msg(self, msg, cmap, version, _direct): - ei_map = cmap.get(MsgKey.event) - if not ei_map or not isinstance(ei_map, type({})): - logging.warning("Bad event indication message received: '%s'" % msg) - return + def _handle_indication_msg(self, msg, cmap, version, _direct): - aname = ei_map.get("_name") - emap = ei_map.get("_event") + aname = msg.properties.get("qmf.agent") if not aname: - logging.debug("No '_name' field in event indication message.") + logging.debug("No agent name field in indication message.") return - if not emap: - logging.debug("No '_event' field in event indication message.") + + content_type = msg.properties.get("qmf.content") + if (content_type != ContentType.event or + not isinstance(msg.content, type([]))): + logging.warning("Bad event indication message received: '%s'" % msg) + return + + emap = msg.content[0] + if not isinstance(emap, type({})): + logging.debug("Invalid event body in indication message: '%s'" % msg) return agent = None @@ -1439,13 +1798,13 @@ class Console(Thread): """ Check all async mailboxes for outstanding requests that have expired. """ - now = datetime.datetime.utcnow() - if self._next_mbox_expire and now < self._next_mbox_expire: - return - expired_mboxes = [] - self._next_mbox_expire = None self._lock.acquire() try: + now = datetime.datetime.utcnow() + if self._next_mbox_expire and now < self._next_mbox_expire: + return + expired_mboxes = [] + self._next_mbox_expire = None for mbox in self._async_mboxes.itervalues(): if now >= mbox.expiration_date: expired_mboxes.append(mbox) diff --git a/qpid/extras/qmf/src/py/qmf2/tests/__init__.py b/qpid/extras/qmf/src/py/qmf2/tests/__init__.py index 186f09349e..eff9357e1f 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/__init__.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/__init__.py @@ -27,3 +27,4 @@ import events import multi_response import async_query import async_method +import subscriptions diff --git a/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py b/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py index 59b65221e0..0e5e595695 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py @@ -52,8 +52,8 @@ class _agentApp(Thread): self.broker_url = broker_url self.notifier = _testNotifier() self.agent = qmf2.agent.Agent(name, - _notifier=self.notifier, - _heartbeat_interval=heartbeat) + _notifier=self.notifier, + heartbeat_interval=heartbeat) # No database needed for this test self.running = False self.ready = Event() diff --git a/qpid/extras/qmf/src/py/qmf2/tests/async_method.py b/qpid/extras/qmf/src/py/qmf2/tests/async_method.py index 556b62756f..965a254f26 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/async_method.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/async_method.py @@ -53,7 +53,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/async_query.py b/qpid/extras/qmf/src/py/qmf2/tests/async_query.py index 3a9a767bf0..7c7b22fdaf 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/async_query.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/async_query.py @@ -53,7 +53,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py b/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py index be2bdff9ab..22accb7cfc 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py @@ -53,7 +53,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py b/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py index dd321cb4bb..be67c36d87 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py @@ -53,7 +53,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/events.py b/qpid/extras/qmf/src/py/qmf2/tests/events.py index e55dc8572e..bc6465f25b 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/events.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/events.py @@ -58,7 +58,7 @@ class _agentApp(Thread): self.notifier = _testNotifier() self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Dynamically construct a management database diff --git a/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py b/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py index d3d00a70c5..7c24435e79 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py @@ -60,8 +60,8 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat, - _max_msg_size=_MAX_OBJS_PER_MSG) + heartbeat_interval=heartbeat, + max_msg_size=_MAX_OBJS_PER_MSG) # Dynamically construct a management database for i in range(self.schema_count): diff --git a/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py b/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py index 5b1446bb3a..466457d670 100644 --- a/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py +++ b/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py @@ -54,7 +54,7 @@ class _agentApp(Thread): self.broker_url = broker_url self.agent = Agent(name, _notifier=self.notifier, - _heartbeat_interval=heartbeat) + heartbeat_interval=heartbeat) # Management Database # - two different schema packages, diff --git a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py new file mode 100644 index 0000000000..750952df46 --- /dev/null +++ b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py @@ -0,0 +1,808 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import logging +import datetime +import time +from threading import Thread, Event + +import qpid.messaging +from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId, + SchemaProperty, qmfTypes, SchemaMethod, QmfQuery, + QmfData, WorkItem) +import qmf2.console +from qmf2.agent import(QmfAgentData, Agent) + + +class _testNotifier(Notifier): + def __init__(self): + self._event = Event() + + def indication(self): + # note: called by qmf daemon thread + self._event.set() + + def wait_for_work(self, timeout): + # note: called by application thread to wait + # for qmf to generate work + self._event.wait(timeout) + timed_out = self._event.isSet() == False + if not timed_out: + self._event.clear() + return True + return False + + +class _agentApp(Thread): + def __init__(self, name, broker_url, heartbeat): + Thread.__init__(self) + self.notifier = _testNotifier() + self.broker_url = broker_url + self.agent = Agent(name, + _notifier=self.notifier, + heartbeat_interval=heartbeat, + max_duration=10, + default_duration=7, + min_duration=5, + min_interval=1, + default_interval=2) + + # Management Database + # - two different schema packages, + # - two classes within one schema package + # - multiple objects per schema package+class + # - two "undescribed" objects + + # "package1/class1" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"), + _desc="A test data schema - one", + _object_id_names=["key"] ) + + _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32)) + _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, + _values={"key":"p1c1_key1"}, + _schema=_schema) + _obj.set_value("count1", 0) + _obj.set_value("count2", 0) + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, + _values={"key":"p1c1_key2"}, + _schema=_schema ) + _obj.set_value("count1", 9) + _obj.set_value("count2", 10) + self.agent.add_object( _obj ) + + # "package1/class2" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"), + _desc="A test data schema - two", + _object_id_names=["name"] ) + # add properties + _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, + _values={"name":"p1c2_name1"}, + _schema=_schema ) + _obj.set_value("string1", "a data string") + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, + _values={"name":"p1c2_name2"}, + _schema=_schema ) + _obj.set_value("string1", "another data string") + self.agent.add_object( _obj ) + + + # "package2/class1" + + _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"), + _desc="A test data schema - second package", + _object_id_names=["key"] ) + + _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR)) + _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32)) + + self.agent.register_object_class(_schema) + + _obj = QmfAgentData( self.agent, + _values={"key":"p2c1_key1"}, + _schema=_schema ) + _obj.set_value("counter", 0) + self.agent.add_object( _obj ) + + _obj = QmfAgentData( self.agent, + _values={"key":"p2c1_key2"}, + _schema=_schema ) + _obj.set_value("counter", 2112) + self.agent.add_object( _obj ) + + + # add two "unstructured" objects to the Agent + + _obj = QmfAgentData(self.agent, _object_id="undesc-1") + _obj.set_value("field1", "a value") + _obj.set_value("field2", 2) + _obj.set_value("field3", {"a":1, "map":2, "value":3}) + _obj.set_value("field4", ["a", "list", "value"]) + self.agent.add_object(_obj) + + + _obj = QmfAgentData(self.agent, _object_id="undesc-2") + _obj.set_value("key-1", "a value") + _obj.set_value("key-2", 2) + self.agent.add_object(_obj) + + self.running = False + self.ready = Event() + + def start_app(self): + self.running = True + self.start() + self.ready.wait(10) + if not self.ready.is_set(): + raise Exception("Agent failed to connect to broker.") + + def stop_app(self): + self.running = False + self.notifier.indication() # hmmm... collide with daemon??? + self.join(10) + if self.isAlive(): + raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!") + + def run(self): + # broker_url = "user/passwd@hostname:port" + self.conn = qpid.messaging.Connection(self.broker_url.host, + self.broker_url.port, + self.broker_url.user, + self.broker_url.password) + self.conn.connect() + self.agent.set_connection(self.conn) + self.ready.set() + + while self.running: + self.notifier.wait_for_work(None) + wi = self.agent.get_next_workitem(timeout=0) + while wi is not None: + logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type()) + self.agent.release_workitem(wi) + wi = self.agent.get_next_workitem(timeout=0) + + if self.conn: + self.agent.remove_connection(10) + self.agent.destroy(10) + + +class BaseTest(unittest.TestCase): + agent_count = 5 + + def configure(self, config): + self.config = config + self.broker = config.broker + self.defines = self.config.defines + + def setUp(self): + self.agents = [] + for i in range(self.agent_count): + agent = _agentApp("agent-" + str(i), self.broker, 1) + agent.start_app() + self.agents.append(agent) + #print("!!!! STARTING TEST: %s" % datetime.datetime.utcnow()) + + def tearDown(self): + #print("!!!! STOPPING TEST: %s" % datetime.datetime.utcnow()) + for agent in self.agents: + if agent is not None: + agent.stop_app() + + + def test_sync_by_schema(self): + # create console + # find all agents + # subscribe to changes to any object in package1/class1 + # should succeed + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + subscriptions = [] + index = 0 + + # query to match all objects in schema package1/class1 + sid = SchemaClassId.create("package1", "class1") + t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, + _target_params=t_params) + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now subscribe to agent + + sp = self.console.create_subscription(agent, + query, + index) + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + self.assertTrue(sp.get_duration() == 10) + self.assertTrue(sp.get_publish_interval() == 2) + + subscriptions.append([sp, 0]) + index += 1 + + # now wait for the (2 * interval) and count the updates + r_count = 0 + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 2) + for obj in reply: + self.assertTrue(isinstance(obj, QmfData)) + self.assertTrue(obj.get_object_id() == "p1c1_key2" or + obj.get_object_id() == "p1c1_key1") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package1") + self.assertTrue(sid.get_class_name() == "class1") + + self.assertTrue(wi.get_handle() < len(subscriptions)) + subscriptions[wi.get_handle()][1] += 1 + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription + self.assertTrue(r_count == 5 * len(subscriptions)) + for ii in range(len(subscriptions)): + self.assertTrue(subscriptions[ii][1] == 5) + + self.console.destroy(10) + + + def test_sync_by_obj_id(self): + # create console + # find all agents + # subscribe to changes to any object in package1/class1 + # should succeed + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + subscriptions = [] + index = 0 + + # query to match all objects in schema package1/class1 + # sid = SchemaClassId.create("package1", "class1") + # t_params = {QmfData.KEY_SCHEMA_ID: sid} + query = QmfQuery.create_id_object("undesc-2") + + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now subscribe to agent + + sp = self.console.create_subscription(agent, + query, + index) + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + + subscriptions.append([sp, 0]) + index += 1 + + # now wait for all subscriptions to expire (2x interval w/o + # indications) + r_count = 0 + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "undesc-2") + # print("!!! get_params() = %s" % wi.get_params()) + self.assertTrue(wi.get_handle() < len(subscriptions)) + subscriptions[wi.get_handle()][1] += 1 + # self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) + # self.assertTrue(reply.succeeded()) + # self.assertTrue(reply.get_argument("cookie") == + # wi.get_handle()) + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription + self.assertTrue(r_count == 5 * len(subscriptions)) + #for ii in range(len(subscriptions)): + # self.assertTrue(subscriptions[ii][1] == 5) + + self.console.destroy(10) + + + def test_sync_by_obj_id_schema(self): + # create console + # find all agents + # subscribe to changes to any object in package1/class1 + # should succeed + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + subscriptions = [] + index = 0 + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + for agent_app in self.agents: + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # now subscribe to agent + + sp = self.console.create_subscription(agent, + query, + index) + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + + subscriptions.append([sp, 0]) + index += 1 + + # now wait for all subscriptions to expire (2x interval w/o + # indications) + r_count = 0 + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() < len(subscriptions)) + subscriptions[wi.get_handle()][1] += 1 + # self.assertTrue(isinstance(reply, qmf2.console.MethodResult)) + # self.assertTrue(reply.succeeded()) + # self.assertTrue(reply.get_argument("cookie") == + # wi.get_handle()) + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription + self.assertTrue(r_count == 5 * len(subscriptions)) + #for ii in range(len(subscriptions)): + # self.assertTrue(subscriptions[ii][1] == 5) + + self.console.destroy(10) + + + + def test_sync_refresh(self): + # create console + # find one agent + # subscribe to changes to any object in package1/class1 + # after 3 data indications, refresh + # verify > 5 more data indications received + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + sp = self.console.create_subscription(agent, + query, + "my-handle") + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + + # refresh after three subscribe indications, count all + # indications to verify refresh worked + r_count = 0 + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() == "my-handle") + + self.console.release_workitem(wi) + + if r_count == 3: + rp = self.console.refresh_subscription(sp.get_subscription_id()) + self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams)) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription + self.assertTrue(r_count > 5) + # print("!!! total r_count=%d" % r_count) + #for ii in range(len(subscriptions)): + # self.assertTrue(subscriptions[ii][1] == 5) + + self.console.destroy(10) + + + + def test_sync_cancel(self): + # create console + # find one agent + # subscribe to changes to any object in package1/class1 + # after 2 data indications, cancel subscription + # verify < 5 data indications received + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + sp = self.console.create_subscription(agent, + query, + "my-handle") + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + + # refresh after three subscribe indications, count all + # indications to verify refresh worked + r_count = 0 + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() == "my-handle") + + self.console.release_workitem(wi) + + if r_count == 3: + self.console.cancel_subscription(sp.get_subscription_id()) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription full duration + self.assertTrue(r_count < 5) + #for ii in range(len(subscriptions)): + # self.assertTrue(subscriptions[ii][1] == 5) + + self.console.destroy(10) + + + def test_async_by_obj_id_schema(self): + # create console + # find one agent + # async subscribe to changes to any object in package1/class1 + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + rc = self.console.create_subscription(agent, + query, + "my-handle", + _blocking=False) + self.assertTrue(rc) + + r_count = 0 + sp = None + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE: + self.assertTrue(wi.get_handle() == "my-handle") + sp = wi.get_params() + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + else: + self.assertTrue(wi.get_type() == + WorkItem.SUBSCRIBE_INDICATION) + # sp better be set up by now! + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() == "my-handle") + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription + self.assertTrue(r_count == 6) + + self.console.destroy(10) + + def test_async_refresh(self): + # create console + # find one agent + # async subscribe to changes to any object in package1/class1 + # refresh after third data indication + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + rc = self.console.create_subscription(agent, + query, + "my-handle", + _blocking=False) + self.assertTrue(rc) + + # refresh after three subscribe indications, count all + # indications to verify refresh worked + r_count = 0 + sp = None + rp = None + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE: + self.assertTrue(wi.get_handle() == "my-handle") + sp = wi.get_params() + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + elif wi.get_type() == WorkItem.RESUBSCRIBE_RESPONSE: + self.assertTrue(wi.get_handle() == "my-handle") + rp = wi.get_params() + self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams)) + self.assertTrue(rp.succeeded()) + self.assertTrue(rp.get_error() == None) + else: + self.assertTrue(wi.get_type() == + WorkItem.SUBSCRIBE_INDICATION) + # sp better be set up by now! + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() == "my-handle") + + if r_count == 4: # + 1 for subscribe reply + rp = self.console.refresh_subscription(sp.get_subscription_id()) + self.assertTrue(rp) + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 5 publish per subscription, + 2 replys + self.assertTrue(r_count > 7) + + self.console.destroy(10) + + + def test_async_cancel(self): + # create console + # find one agent + # async subscribe to changes to any object in package1/class1 + # cancel after first data indication + self.notifier = _testNotifier() + self.console = qmf2.console.Console(notifier=self.notifier, + agent_timeout=3) + self.conn = qpid.messaging.Connection(self.broker.host, + self.broker.port, + self.broker.user, + self.broker.password) + self.conn.connect() + self.console.add_connection(self.conn) + + # query to match object "p2c1_key2" in schema package2/class1 + sid = SchemaClassId.create("package2", "class1") + query = QmfQuery.create_id_object("p2c1_key2", sid) + + agent_app = self.agents[0] + aname = agent_app.agent.get_name() + agent = self.console.find_agent(aname, timeout=3) + self.assertTrue(agent and agent.get_name() == aname) + + # setup subscription on agent + + rc = self.console.create_subscription(agent, + query, + "my-handle", + _blocking=False) + self.assertTrue(rc) + + # refresh after three subscribe indications, count all + # indications to verify refresh worked + r_count = 0 + sp = None + rp = None + while self.notifier.wait_for_work(4): + wi = self.console.get_next_workitem(timeout=0) + while wi is not None: + r_count += 1 + if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE: + self.assertTrue(wi.get_handle() == "my-handle") + sp = wi.get_params() + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + self.assertTrue(sp.succeeded()) + self.assertTrue(sp.get_error() == None) + else: + self.assertTrue(wi.get_type() == + WorkItem.SUBSCRIBE_INDICATION) + # sp better be set up by now! + self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams)) + reply = wi.get_params() + self.assertTrue(isinstance(reply, type([]))) + self.assertTrue(len(reply) == 1) + self.assertTrue(isinstance(reply[0], QmfData)) + self.assertTrue(reply[0].get_object_id() == "p2c1_key2") + sid = reply[0].get_schema_class_id() + self.assertTrue(isinstance(sid, SchemaClassId)) + self.assertTrue(sid.get_package_name() == "package2") + self.assertTrue(sid.get_class_name() == "class1") + self.assertTrue(wi.get_handle() == "my-handle") + + self.console.cancel_subscription(sp.get_subscription_id()) + + self.console.release_workitem(wi) + + wi = self.console.get_next_workitem(timeout=0) + + # for now, I expect 1 subscribe reply and 1 data_indication + self.assertTrue(r_count == 2) + + self.console.destroy(10) diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties index a60e3964ad..c76acbd8b9 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties @@ -32,6 +32,7 @@ queue.MyQueue = example.MyQueue # register some topics in JNDI using the form # topic.[jndiName] = [physicalName] topic.ibmStocks = stocks.nyse.ibm +topic.MyTopic = example.MyTopic # Register an AMQP destination in JNDI # NOTE: Qpid currently only supports direct,topics and headers diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 820f7ce036..96401f2178 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -273,6 +273,7 @@ + diff --git a/qpid/java/perftests/etc/scripts/drainBroker.sh b/qpid/java/perftests/etc/scripts/drainBroker.sh new file mode 100755 index 0000000000..eea7209f03 --- /dev/null +++ b/qpid/java/perftests/etc/scripts/drainBroker.sh @@ -0,0 +1,41 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if [ -z "$QPID_HOME" ]; then + export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0)))) + export PATH=${PATH}:${QPID_HOME}/bin +fi + +# Parse arguements taking all - prefixed args as JAVA_OPTS +for arg in "$@"; do + if [[ $arg == -java:* ]]; then + JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " + else + ARGS="${ARGS}$arg " + fi +done + +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-all.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java JAVA_VM=-server JAVA_MEM=-Xmx1024m QPID_CLASSPATH=$QPID_LIBS + +. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01 -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=0 consumeOnly=true consumeOnly=true${ARGS} diff --git a/qpid/java/perftests/etc/scripts/fillBroker.sh b/qpid/java/perftests/etc/scripts/fillBroker.sh new file mode 100755 index 0000000000..5b7de6f999 --- /dev/null +++ b/qpid/java/perftests/etc/scripts/fillBroker.sh @@ -0,0 +1,41 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if [ -z "$QPID_HOME" ]; then + export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0)))) + export PATH=${PATH}:${QPID_HOME}/bin +fi + +# Parse arguements taking all - prefixed args as JAVA_OPTS +for arg in "$@"; do + if [[ $arg == -java:* ]]; then + JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " + else + ARGS="${ARGS}$arg " + fi +done + +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-all.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java JAVA_VM=-server JAVA_MEM=-Xmx1024m QPID_CLASSPATH=$QPID_LIBS + +. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01 -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=0 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=0 ${ARGS} diff --git a/qpid/java/perftests/etc/scripts/testWithPreFill.sh b/qpid/java/perftests/etc/scripts/testWithPreFill.sh new file mode 100755 index 0000000000..721ecf6ecc --- /dev/null +++ b/qpid/java/perftests/etc/scripts/testWithPreFill.sh @@ -0,0 +1,41 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if [ -z "$QPID_HOME" ]; then + export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0)))) + export PATH=${PATH}:${QPID_HOME}/bin +fi + +# Parse arguements taking all - prefixed args as JAVA_OPTS +for arg in "$@"; do + if [[ $arg == -java:* ]]; then + JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " + else + ARGS="${ARGS}$arg " + fi +done + +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-all.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java JAVA_VM=-server JAVA_MEM=-Xmx1024m QPID_CLASSPATH=$QPID_LIBS + +. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01 -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=0 preFill=1000 delayBeforeConsume=1000 ${ARGS} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java index 89fc805a34..d8fea85477 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -133,8 +133,11 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA { // _logger.debug("public void testAsyncPingOk(int numPings): called"); + // get prefill count to update the expected count + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + // Ensure that at least one ping was requeusted. - if (numPings == 0) + if (numPings + preFill == 0) { _logger.error("Number of pings requested was zero."); fail("Number of pings requested was zero."); @@ -149,16 +152,24 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // String messageCorrelationId = perThreadSetup._correlationId; // _logger.debug("messageCorrelationId = " + messageCorrelationId); + // Initialize the count and timing controller for the new correlation id. PerCorrelationId perCorrelationId = new PerCorrelationId(); TimingController tc = getTimingController().getControllerForCurrentThread(); perCorrelationId._tc = tc; - perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill); perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); + // Start the client that will have been paused due to preFill requirement. + // or if we have not yet started client because messages are sitting on broker. + if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)) + { + pingClient.start(); + } + // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); + int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId); // Check that all the replies were received and log a fail if they were not. if (numReplies < perCorrelationId._expectedCount) diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index 0afec83b19..dcfc67d4fc 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -104,4 +104,9 @@ public class PingClient extends PingPongProducer return _pingClientCount * _noOfConsumers; } } + + public int getClientCount() + { + return _pingClientCount; + } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java index 94b8ea662e..4c5df0a471 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -141,9 +141,65 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware perThreadSetup._pingClient = new PingClient(testParameters); perThreadSetup._pingClient.establishConnection(true, true); } - // Start the client connection - perThreadSetup._pingClient.start(); + // Prefill the broker unless we are in consume only mode. + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0) + { + // Manually set the correlation ID to 1. This is not ideal but it is the + // value that the main test loop will use. + perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, String.valueOf(perThreadSetup._pingClient.getClientCount())); + + // Note with a large preFill and non-tx session the messages will be + // rapidly pushed in to the mina buffers. OOM's are a real risk here. + // Should perhaps consider using a TX session for the prefill. + + long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME); + + // Only delay if we have consumers and a delayBeforeConsume + if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0) + && delayBeforeConsume > 0) + { + + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); + // Only do logging if in verbose mode. + if (verbose) + { + if (delayBeforeConsume > 60000) + { + long minutes = delayBeforeConsume / 60000; + long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000; + long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000); + _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test."); + } + else + { + _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test."); + } + } + + Thread.sleep(delayBeforeConsume); + + if (verbose) + { + _logger.info("Starting Test."); + } + } + + // We can't start the client's here as the test client has not yet been configured to receieve messages. + // only when the test method is executed will the correlationID map be set up and ready to consume + // the messages we have sent here. + } + else //Only start the consumer if we are not preFilling. + { + // Only start the consumer if we don't have messages waiting to be received. + // we need to set up the correlationID mapping first + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)) + { + // Start the client connection + perThreadSetup._pingClient.start(); + } + } // Attach the per-thread set to the thread. threadSetup.set(perThreadSetup); } @@ -157,7 +213,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware * Performs test fixture clean */ public void threadTearDown() - { + { _logger.debug("public void threadTearDown(): called"); try diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index f994cd138e..e3769e415e 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -324,6 +324,25 @@ public class PingPongProducer implements Runnable, ExceptionListener /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; + /** Holds the name of the property to get the number of message to prefill the broker with before starting the main test. */ + public static final String PREFILL_PROPNAME = "preFill"; + + /** Defines the default value for the number of messages to prefill. 0,default, no messages. */ + public static final int PREFILL_DEFAULT = 0; + + /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */ + public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume"; + + /** Defines the default value for delay in ms to wait before starting thet test run. 0,default, no delay. */ + public static final long DELAY_BEFORE_CONSUME = 0; + + /** Holds the name of the property to get when no messasges should be sent. */ + public static final String CONSUME_ONLY_PROPNAME = "consumeOnly"; + + /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */ + public static final boolean CONSUME_ONLY_DEFAULT = false; + + /** Holds the default configuration properties. */ public static ParsedProperties defaults = new ParsedProperties(); @@ -360,6 +379,9 @@ public class PingPongProducer implements Runnable, ExceptionListener defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); + defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT); + defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME); + defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT); } /** Allows setting of client ID on the connection, rather than through the connection URL. */ @@ -455,6 +477,24 @@ public class PingPongProducer implements Runnable, ExceptionListener */ protected int _maxPendingSize; + /** + * Holds the number of messages to send during the setup phase, before the clients start consuming. + */ + private Integer _preFill; + + /** + * Holds the time in ms to wait after preFilling before starting thet test. + */ + private Long _delayBeforeConsume; + + /** + * Holds a boolean value of wither this test should just consume, i.e. skips + * sending messages, but still expects to receive the specified number. + * Use in conjuction with numConsumers=0 to fill the broker. + */ + private boolean _consumeOnly; + + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); @@ -588,6 +628,9 @@ public class PingPongProducer implements Runnable, ExceptionListener _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); + _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME); + _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME); + _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME); // Check that one or more destinations were specified. if (_noOfDestinations < 1) @@ -638,7 +681,10 @@ public class PingPongProducer implements Runnable, ExceptionListener } // Create the destinations to send pings to and receive replies from. - _replyDestination = _consumerSession[0].createTemporaryQueue(); + if (_noOfConsumers > 0) + { + _replyDestination = _consumerSession[0].createTemporaryQueue(); + } createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); // Create the message producer only if instructed to. @@ -871,6 +917,14 @@ public class PingPongProducer implements Runnable, ExceptionListener { _consumer = new MessageConsumer[_noOfConsumers]; + // If we don't have consumers then ensure we have created the + // destination. + if (_noOfConsumers == 0) + { + _producerSession.createConsumer(destination, selector, + NO_LOCAL_DEFAULT).close(); + } + for (int i = 0; i < _noOfConsumers; i++) { // Create a consumer for the destination and set this pinger to listen to its messages. @@ -980,6 +1034,11 @@ public class PingPongProducer implements Runnable, ExceptionListener // When running in client ack mode, an ack is done instead of a commit, on the commit batch // size boundaries. long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); + // _noOfConsumers can be set to 0 on the command line but we will not get here to + // divide by 0 as this is executed by the onMessage code when a message is recevied. + // no consumers means no message reception. + + // log.debug("commitCount = " + commitCount); if ((commitCount % _txBatchSize) == 0) @@ -1014,6 +1073,7 @@ public class PingPongProducer implements Runnable, ExceptionListener else { log.warn("Got unexpected message with correlationId: " + correlationID); + log.warn("Map contains:" + perCorrelationIds.entrySet()); } } else @@ -1037,13 +1097,18 @@ public class PingPongProducer implements Runnable, ExceptionListener * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify * the correlation id. * + * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination + * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings. + * + * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur. + * * @param message The message to send. If this is null, one is generated. * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. * @param messageCorrelationId The message correlation id. If this is null, one is generated. * * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait - * for all prematurely. + * for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent. * * @throws JMSException All underlying JMSExceptions are allowed to fall through. * @throws InterruptedException When interrupted by a timeout @@ -1051,6 +1116,16 @@ public class PingPongProducer implements Runnable, ExceptionListener public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) throws JMSException, InterruptedException { + return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId); + } + + public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId) + throws JMSException, InterruptedException + { + + // If we are runnning a consumeOnly test then don't send any messages + + /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ @@ -1071,29 +1146,41 @@ public class PingPongProducer implements Runnable, ExceptionListener // countdown needs to be done before the chained listener can be called. PerCorrelationId perCorrelationId = new PerCorrelationId(); - perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); + int totalPingsRequested = numPings + preFill; + perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1); perCorrelationIds.put(messageCorrelationId, perCorrelationId); // Set up the current time as the start time for pinging on the correlation id. This is used to determine // timeouts. perCorrelationId.timeOutStart = System.nanoTime(); - // Send the specifed number of messages. + // Send the specifed number of messages for this test pingNoWaitForReply(message, numPings, messageCorrelationId); boolean timedOut; boolean allMessagesReceived; int numReplies; + // We don't have a consumer so don't try and wait for the messages. + // this does mean that if the producerSession is !TXed then we may + // get to exit before all msgs have been received. + // + // Return the number of requested messages, this will let the test + // report a pass. + if (_noOfConsumers == 0) + { + return totalPingsRequested; + } + do { // Block the current thread until replies to all the messages are received, or it times out. perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); // Work out how many replies were receieved. - numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); + numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount(); - allMessagesReceived = numReplies == getExpectedNumPings(numPings); + allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested); // log.debug("numReplies = " + numReplies); // log.debug("allMessagesReceived = " + allMessagesReceived); @@ -1108,7 +1195,7 @@ public class PingPongProducer implements Runnable, ExceptionListener } while (!timedOut && !allMessagesReceived); - if ((numReplies < getExpectedNumPings(numPings)) && _verbose) + if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose) { log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); } @@ -1146,6 +1233,12 @@ public class PingPongProducer implements Runnable, ExceptionListener /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + // If we are runnning a consumeOnly test then don't send any messages + if (_consumeOnly) + { + return; + } + if (message == null) { message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); @@ -1667,6 +1760,10 @@ public class PingPongProducer implements Runnable, ExceptionListener /** * Calculates how many pings are expected to be received for the given number sent. * + * Note : that if you have set noConsumers to 0 then this will also return 0 + * in the case of PubSub testing. This is correct as without consumers there + * will be no-one to receive the sent messages so they will be unable to respond. + * * @param numpings The number of pings that will be sent. * * @return The number that should be received, for the test to pass. diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java index c73959676d..970b08f629 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -28,12 +28,16 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TopicSubscriber; import junit.framework.Assert; import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQQueue; import org.apache.qpid.test.utils.QpidTestCase; import java.util.concurrent.locks.ReentrantLock; @@ -154,6 +158,7 @@ public class TimeToLiveTest extends QpidTestCase { Message send = producerSession.createTextMessage("Message " + msg); send.setBooleanProperty("first", first); + send.setStringProperty("testprop", "TimeToLiveTest"); send.setLongProperty("TTL", producer.getTimeToLive()); return send; } @@ -206,5 +211,160 @@ public class TimeToLiveTest extends QpidTestCase producerSession.close(); producerConnection.close(); } + + public void testPassiveTTLwithDurableSubscription() throws Exception + { + //Create Client 1 + Connection clientConnection = getConnection(); + + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create and close the durable subscriber + AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName()); + TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false); + durableSubscriber.close(); + + //Create Producer + Connection producerConnection = getConnection(); + + producerConnection.start(); + + // Move to a Transacted session to ensure that all messages have been delivered to broker before + // we start waiting for TTL + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer producer = producerSession.createProducer(topic); + + //Set TTL + int msg = 0; + producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); + + producer.setTimeToLive(TIME_TO_LIVE); + + for (; msg < MSG_COUNT - 2; msg++) + { + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + } + + //Reset TTL + producer.setTimeToLive(0L); + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); + + producerSession.commit(); + + //resubscribe + durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName()); + + // Ensure we sleep the required amount of time. + ReentrantLock waitLock = new ReentrantLock(); + Condition wait = waitLock.newCondition(); + final long MILLIS = 1000000L; + + long waitTime = TIME_TO_LIVE * MILLIS; + while (waitTime > 0) + { + try + { + waitLock.lock(); + + waitTime = wait.awaitNanos(waitTime); + } + catch (InterruptedException e) + { + //Stop if we are interrupted + fail(e.getMessage()); + } + finally + { + waitLock.unlock(); + } + + } + + clientConnection.start(); + + //Receive Message 0 + // Set 5s receive time for messages we expect to receive. + Message receivedFirst = durableSubscriber.receive(5000); + Message receivedSecond = durableSubscriber.receive(5000); + Message receivedThird = durableSubscriber.receive(1000); + + // Log the messages to help diagnosis incase of failure + _logger.info("First:"+receivedFirst); + _logger.info("Second:"+receivedSecond); + _logger.info("Third:"+receivedThird); + + // Only first and last messages sent should survive expiry + Assert.assertNull("More messages received", receivedThird); + + Assert.assertNotNull("First message not received", receivedFirst); + Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first")); + Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL")); + + Assert.assertNotNull("Final message not received", receivedSecond); + Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first")); + Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL")); + + clientSession.unsubscribe(getTestQueueName()); + clientConnection.close(); + + producerConnection.close(); + } + + public void testActiveTTLwithDurableSubscription() throws Exception + { + //Create Client 1 + Connection clientConnection = getConnection(); + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create and close the durable subscriber + AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName()); + TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false); + durableSubscriber.close(); + + //Create Producer + Connection producerConnection = getConnection(); + AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(topic); + producer.setTimeToLive(1000L); + + // send Messages + for(int i = 0; i < MSG_COUNT; i++) + { + producer.send(producerSession.createTextMessage("Message: "+i)); + } + long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + + // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms. + long messageCount = MSG_COUNT; + long lastPass; + AMQQueue subcriptionQueue = new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription"); + do + { + lastPass = messageCount; + Thread.sleep(100); + messageCount = producerSession.getQueueDepth((AMQDestination) subcriptionQueue); + + // If we have received messages in the last loop then extend the timeout time. + // if we get messages stuck that are not expiring then the failureTime will occur + // failing the test. This will help with the scenario when the broker does not + // have enough CPU cycles to process the TTLs. + if (lastPass != messageCount) + { + failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT; + } + } + while(messageCount > 0L && System.currentTimeMillis() < failureTime); + + assertEquals("Messages not automatically expired: ", 0L, messageCount); + + producer.close(); + producerSession.close(); + producerConnection.close(); + + clientSession.unsubscribe("MyDurableTTLSubscription"); + clientSession.close(); + clientConnection.close(); + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java index 19b73fcc7c..cf815fbc05 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java @@ -38,7 +38,7 @@ public class DurableSubscriberTest extends QpidTestCase */ public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception { - if (!isBroker08()) + if (isBrokerStorePersistent() || !isBroker08()) { TopicConnectionFactory factory = getConnectionFactory(); Topic topic = (Topic) getInitialContext().lookup(_topicName); @@ -102,7 +102,7 @@ public class DurableSubscriberTest extends QpidTestCase */ public void testDurSubRestoresMessageSelector() throws Exception { - if (!isBroker08()) + if (isBrokerStorePersistent() || !isBroker08()) { TopicConnectionFactory factory = getConnectionFactory(); Topic topic = (Topic) getInitialContext().lookup(_topicName); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index a907e6a29d..91bb5d2529 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -115,10 +115,26 @@ public class DurableSubscriptionTest extends QpidTestCase _logger.info("Close connection"); con.close(); } + + public void testDurabilityNOACK() throws Exception + { + durabilityImpl(AMQSession.NO_ACKNOWLEDGE, false); + } public void testDurabilityAUTOACK() throws Exception { - durabilityImpl(Session.AUTO_ACKNOWLEDGE); + durabilityImpl(Session.AUTO_ACKNOWLEDGE, false); + } + + public void testDurabilityAUTOACKwithRestartIfPersistent() throws Exception + { + if(!isBrokerStorePersistent()) + { + System.out.println("The broker store is not persistent, skipping this test."); + return; + } + + durabilityImpl(Session.AUTO_ACKNOWLEDGE, true); } public void testDurabilityNOACKSessionPerConnection() throws Exception @@ -131,56 +147,85 @@ public class DurableSubscriptionTest extends QpidTestCase durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE); } - private void durabilityImpl(int ackMode) throws Exception - { + private void durabilityImpl(int ackMode, boolean restartBroker) throws Exception + { AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con, "MyTopic"); Session session1 = con.createSession(false, ackMode); MessageConsumer consumer1 = session1.createConsumer(topic); - Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + Session sessionProd = con.createSession(false, ackMode); MessageProducer producer = sessionProd.createProducer(topic); - Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + Session session2 = con.createSession(false, ackMode); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); con.start(); + //send message A and check both consumers receive producer.send(session1.createTextMessage("A")); Message msg; + _logger.info("Receive message on consumer 1 :expecting A"); msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); + _logger.info("Receive message on consumer 2 :expecting A"); msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("A", ((TextMessage) msg).getText()); msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); - consumer2.close(); - session2.close(); - + //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK) producer.send(session1.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(500); assertNotNull("Consumer 1 should get message 'B'.", msg); - assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText()); + assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(500); assertNull("There should be no more messages for consumption on consumer1.", msg); + consumer2.close(); + session2.close(); + + //Send message C, then connect consumer 3 to durable subscription and get + //message B if not using NO_ACK, then receive C with consumer 1 and 3 + producer.send(session1.createTextMessage("C")); + Session session3 = con.createSession(false, ackMode); MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); - _logger.info("Receive message on consumer 3 :expecting B"); + if(ackMode == AMQSession.NO_ACKNOWLEDGE) + { + //Do nothing if NO_ACK was used, as prefetch means the message was dropped + //when we didn't call receive() to get it before closing consumer 2 + } + else + { + _logger.info("Receive message on consumer 3 :expecting B"); + msg = consumer3.receive(500); + assertNotNull("Consumer 3 should get message 'B'.", msg); + assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText()); + } + + _logger.info("Receive message on consumer 1 :expecting C"); + msg = consumer1.receive(500); + assertNotNull("Consumer 1 should get message 'C'.", msg); + assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); + msg = consumer1.receive(500); + assertNull("There should be no more messages for consumption on consumer1.", msg); + + _logger.info("Receive message on consumer 3 :expecting C"); msg = consumer3.receive(500); - assertNotNull("Consumer 3 should get message 'B'.", msg); - assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText()); + assertNotNull("Consumer 3 should get message 'C'.", msg); + assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); msg = consumer3.receive(500); assertNull("There should be no more messages for consumption on consumer3.", msg); @@ -191,6 +236,18 @@ public class DurableSubscriptionTest extends QpidTestCase session3.unsubscribe("MySubscription"); con.close(); + + if(restartBroker) + { + try + { + restartBroker(); + } + catch (Exception e) + { + fail("Error restarting the broker"); + } + } } private void durabilityImplSessionPerConnection(int ackMode) throws Exception @@ -211,7 +268,7 @@ public class DurableSubscriptionTest extends QpidTestCase con1.start(); Session session1 = con1.createSession(false, ackMode); - MessageConsumer consumer1 = session0.createConsumer(topic); + MessageConsumer consumer1 = session1.createConsumer(topic); // Create consumer 2. AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); @@ -232,37 +289,60 @@ public class DurableSubscriptionTest extends QpidTestCase msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT); assertNotNull("Message should have been received",msg); assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); - msg = consumer2.receive(500); + msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("There should be no more messages for consumption on consumer2.", msg); + // Send message and receive on consumer 1. + producer.send(session0.createTextMessage("B")); + + _logger.info("Receive message on consumer 1 :expecting B"); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertEquals("B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting null"); + msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); + assertEquals(null, msg); + // Detach the durable subscriber. consumer2.close(); session2.close(); con2.close(); + + // Send message C and receive on consumer 1 + producer.send(session0.createTextMessage("C")); - // Send message and receive on open consumer. - producer.send(session0.createTextMessage("B")); - - _logger.info("Receive message on consumer 1 :expecting B"); - msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); - assertEquals("B", ((TextMessage) msg).getText()); + _logger.info("Receive message on consumer 1 :expecting C"); + msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT); + assertEquals("C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT); assertEquals(null, msg); - // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed. + // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK) + // and also gets message C sent after it was disconnected. AMQConnection con3 = (AMQConnection) getConnection("guest", "guest"); con3.start(); Session session3 = con3.createSession(false, ackMode); TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); - _logger.info("Receive message on consumer 3 :expecting B"); - msg = consumer3.receive(500); - assertNotNull("Consumer 3 should get message 'B'.", msg); - assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText()); + if(ackMode == AMQSession.NO_ACKNOWLEDGE) + { + //Do nothing if NO_ACK was used, as prefetch means the message was dropped + //when we didn't call receive() to get it before closing consumer 2 + } + else + { + _logger.info("Receive message on consumer 3 :expecting B"); + msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); + assertEquals("B", ((TextMessage) msg).getText()); + } + + _logger.info("Receive message on consumer 3 :expecting C"); + msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT); + assertNotNull("Consumer 3 should get message 'C'.", msg); + assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); - msg = consumer3.receive(500); + msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT); assertNull("There should be no more messages for consumption on consumer3.", msg); consumer1.close(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java index 84ff7055c5..d693c7f6c1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -166,6 +166,7 @@ public class QpidTestCase extends TestCase private static final String TEST_OUTPUT = "test.output"; private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave"; private static final String BROKER_LOG_PREFIX = "broker.log.prefix"; + private static final String BROKER_PERSITENT = "broker.persistent"; // values protected static final String JAVA = "java"; @@ -187,6 +188,7 @@ public class QpidTestCase extends TestCase private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS); private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); private String _output = System.getProperty(TEST_OUTPUT); + protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT); private static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: "); protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE); @@ -929,6 +931,11 @@ public class QpidTestCase extends TestCase { return !_broker.equals("vm"); } + + protected boolean isBrokerStorePersistent() + { + return _brokerPersistent; + } public void restartBroker() throws Exception { diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes index 6f3898384d..184a6f9e78 100644 --- a/qpid/java/test-profiles/JavaExcludes +++ b/qpid/java/test-profiles/JavaExcludes @@ -1,4 +1,3 @@ -org.apache.qpid.test.unit.ct.DurableSubscriberTests#* // Those tests are not finished org.apache.qpid.test.testcases.TTLTest#* org.apache.qpid.test.testcases.FailoverTest#* @@ -18,3 +17,5 @@ org.apache.qpid.client.SessionCreateTest#* org.apache.qpid.management.jmx.ManagementActorLoggingTest#* org.apache.qpid.server.queue.ModelTest#* +//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support causes exclusivity mismatch after restart +org.apache.qpid.test.unit.ct.DurableSubscriberTest#* diff --git a/qpid/java/test-profiles/JavaStandaloneExcludes b/qpid/java/test-profiles/JavaStandaloneExcludes index ed12973498..a9e2a058f8 100644 --- a/qpid/java/test-profiles/JavaStandaloneExcludes +++ b/qpid/java/test-profiles/JavaStandaloneExcludes @@ -1,4 +1,3 @@ -org.apache.qpid.test.unit.ct.DurableSubscriberTests#* // Those tests are not finished org.apache.qpid.test.testcases.TTLTest#* org.apache.qpid.test.testcases.FailoverTest#* diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index f81e9c213c..b4e5282228 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -1 +1,3 @@ +//These tests require a persistent store org.apache.qpid.server.store.PersistentStoreTest#* +org.apache.qpid.test.unit.ct.DurableSubscriberTest#* diff --git a/qpid/java/test-profiles/java-derby.testprofile b/qpid/java/test-profiles/java-derby.testprofile index 1238a44c84..618dd5b45d 100644 --- a/qpid/java/test-profiles/java-derby.testprofile +++ b/qpid/java/test-profiles/java-derby.testprofile @@ -7,3 +7,4 @@ broker.config=${project.root}/build/etc/config-systests-derby.xml qpid.amqp.version=0-9 profile.excludes=JavaStandaloneExcludes broker.clean.between.tests=true +broker.persistent=true diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test index f7955a4a9d..a47f633565 100755 --- a/qpid/python/qpid-python-test +++ b/qpid/python/qpid-python-test @@ -20,7 +20,7 @@ # TODO: summarize, test harness preconditions (e.g. broker is alive) -import logging, optparse, os, struct, sys, traceback, types +import logging, optparse, os, struct, sys, time, traceback, types from fnmatch import fnmatchcase as match from getopt import GetoptError from logging import getLogger, StreamHandler, Formatter, Filter, \ @@ -60,6 +60,8 @@ parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append", help="ignore tests matching patterns in IFILE") parser.add_option("-H", "--halt-on-error", action="store_true", default=False, dest="hoe", help="halt if an error is encountered") +parser.add_option("-t", "--time", action="store_true", default=False, + help="report timing information on test run") parser.add_option("-D", "--define", metavar="DEFINE", dest="defines", action="append", default=[], help="define test parameters") @@ -165,7 +167,9 @@ KEYWORDS = {"pass": (32,), "start": (34,), "total": (34,), "ignored": (33,), - "selected": (34,)} + "selected": (34,), + "elapsed": (34,), + "average": (34,)} COLORIZE = is_smart() @@ -525,6 +529,7 @@ total = len(filtered) + len(ignored) passed = 0 failed = 0 skipped = 0 +start = time.time() for t in filtered: if list_only: print t.name() @@ -538,6 +543,7 @@ for t in filtered: failed += 1 if opts.hoe: break +end = time.time() run = passed + failed @@ -558,16 +564,22 @@ if not list_only: skip = "skip" else: skip = "pass" - print colorize("Totals:", 1), \ - colorize_word("total", "%s tests" % total) + ",", \ - colorize_word(_pass, "%s passed" % passed) + ",", \ - colorize_word(skip, "%s skipped" % skipped) + ",", \ - colorize_word(ign, "%s ignored" % len(ignored)) + ",", \ - colorize_word(outcome, "%s failed" % failed), + print colorize("Totals:", 1), + totals = [colorize_word("total", "%s tests" % total), + colorize_word(_pass, "%s passed" % passed), + colorize_word(skip, "%s skipped" % skipped), + colorize_word(ign, "%s ignored" % len(ignored)), + colorize_word(outcome, "%s failed" % failed)] + print ", ".join(totals), if opts.hoe and failed > 0: print " -- (halted after %s)" % run else: print + if opts.time and run > 0: + print colorize("Timing:", 1), + timing = [colorize_word("elapsed", "%.2fs elapsed" % (end - start)), + colorize_word("average", "%.2fs average" % ((end - start)/run))] + print ", ".join(timing) if failed or skipped: sys.exit(1) diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py index a09686badd..3712bdd221 100644 --- a/qpid/python/qpid/messaging/driver.py +++ b/qpid/python/qpid/messaging/driver.py @@ -283,6 +283,11 @@ EMPTY_MP = MessageProperties() SUBJECT = "qpid.subject" TO = "qpid.to" +CLOSED = "CLOSED" +READ_ONLY = "READ_ONLY" +WRITE_ONLY = "WRITE_ONLY" +OPEN = "OPEN" + class Driver: def __init__(self, connection): @@ -290,24 +295,158 @@ class Driver: self.log_id = "%x" % id(self.connection) self._lock = self.connection._lock - self._in = LinkIn() - self._out = LinkOut() - self._selector = Selector.default() self._attempts = 0 self._hosts = [(self.connection.host, self.connection.port)] + \ self.connection.backups self._host = 0 self._retrying = False + self._socket = None + + self._timeout = None + + self.engine = None + + @synchronized + def wakeup(self): + self.dispatch() + self._selector.wakeup() + + def start(self): + self._selector.register(self) + + def fileno(self): + return self._socket.fileno() - self.reset() + @synchronized + def reading(self): + return self._socket is not None - def reset(self): - self._opening = False + @synchronized + def writing(self): + return self._socket is not None and self.engine.pending() + + @synchronized + def timing(self): + return self._timeout + + @synchronized + def readable(self): + try: + data = self._socket.recv(64*1024) + if data: + rawlog.debug("READ[%s]: %r", self.log_id, data) + self.engine.write(data) + else: + self.close_engine() + except socket.error, e: + self.close_engine(e) + + self.update_status() + + self.connection._waiter.notifyAll() + + def close_engine(self, e=None): + if e is None: + e = "connection aborted" + + if (self.connection.reconnect and + (self.connection.reconnect_limit is None or + self.connection.reconnect_limit <= 0 or + self._attempts <= self.connection.reconnect_limit)): + if self._host > 0: + delay = 0 + else: + delay = self.connection.reconnect_delay + self._timeout = time.time() + delay + log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) + if delay > 0: + log.warn("sleeping %s seconds" % delay) + self._retrying = True + self.engine.close() + else: + self.engine.close(e) + + def update_status(self): + status = self.engine.status() + return getattr(self, "st_%s" % status.lower())() + + def st_closed(self): + # XXX: this log statement seems to sometimes hit when the socket is not connected + # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) + self._socket.close() + self._socket = None + self.engine = None + return True + + def st_open(self): + return False + + @synchronized + def writeable(self): + notify = False + try: + n = self._socket.send(self.engine.peek()) + sent = self.engine.read(n) + rawlog.debug("SENT[%s]: %r", self.log_id, sent) + except socket.error, e: + self.close_engine(e) + notify = True + + if self.update_status() or notify: + self.connection._waiter.notifyAll() + + @synchronized + def timeout(self): + self.dispatch() + self.connection._waiter.notifyAll() + + def dispatch(self): + try: + if self._socket is None: + if self.connection._connected: + self.connect() + else: + self.engine.dispatch() + except: + # XXX: Does socket get leaked if this occurs? + msg = compat.format_exc() + self.connection.error = (msg,) + + def connect(self): + try: + # XXX: should make this non blocking + if self._host == 0: + self._attempts += 1 + host, port = self._hosts[self._host] + if self._retrying: + log.warn("trying: %s:%s", host, port) + self.engine = Engine(self.connection) + self.engine.open() + rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) + self._socket = connect(host, port) + if self._retrying: + log.warn("reconnect succeeded: %s:%s", host, port) + self._timeout = None + self._attempts = 0 + self._host = 0 + self._retrying = False + except socket.error, e: + self._host = (self._host + 1) % len(self._hosts) + self.close_engine(e) + +class Engine: + + def __init__(self, connection): + self.connection = connection + self.log_id = "%x" % id(self.connection) self._closing = False self._connected = False self._attachments = {} + self._in = LinkIn() + self._out = LinkOut() + self._channel_max = 65536 self._channels = 0 self._sessions = {} @@ -316,7 +455,7 @@ class Driver: self.address_cache = Cache(options.get("address_ttl", 60)) - self._socket = None + self._status = CLOSED self._buf = "" self._hdr = "" self._op_enc = OpEncoder() @@ -325,7 +464,6 @@ class Driver: self._frame_dec = FrameDecoder() self._seg_dec = SegmentDecoder() self._op_dec = OpDecoder() - self._timeout = None self._sasl = sasl.Client() if self.connection.username: @@ -343,6 +481,9 @@ class Driver: self._sasl_encode = False self._sasl_decode = False + def _reset(self): + self.connection._transport_connected = False + for ssn in self.connection.sessions.values(): for m in ssn.acked + ssn.unacked + ssn.incoming: m._transfer_id = None @@ -352,76 +493,40 @@ class Driver: rcv.impending = rcv.received rcv.linked = False - @synchronized - def wakeup(self): - self.dispatch() - self._selector.wakeup() - - def start(self): - self._selector.register(self) - - def fileno(self): - return self._socket.fileno() - - @synchronized - def reading(self): - return self._socket is not None - - @synchronized - def writing(self): - return self._socket is not None and self._buf - - @synchronized - def timing(self): - return self._timeout + def status(self): + return self._status - @synchronized - def readable(self): - error = None - recoverable = False + def write(self, data): try: - data = self._socket.recv(64*1024) - if data: - rawlog.debug("READ[%s]: %r", self.log_id, data) - if self._sasl_decode: - data = self._sasl.decode(data) - else: - rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername()) - error = "connection aborted" - recoverable = True - except socket.error, e: - error = e - recoverable = True - - if not error: - try: - if len(self._hdr) < 8: - r = 8 - len(self._hdr) - self._hdr += data[:r] - data = data[r:] - - if len(self._hdr) == 8: - self.do_header(self._hdr) - - self._frame_dec.write(data) - self._seg_dec.write(*self._frame_dec.read()) - self._op_dec.write(*self._seg_dec.read()) - for op in self._op_dec.read(): - self.assign_id(op) - opslog.debug("RCVD[%s]: %r", self.log_id, op) - op.dispatch(self) - except VersionError, e: - error = e - except: - msg = compat.format_exc() - error = msg - - if error: - self._error(error, recoverable) - else: + if self._sasl_decode: + data = self._sasl.decode(data) + + if len(self._hdr) < 8: + r = 8 - len(self._hdr) + self._hdr += data[:r] + data = data[r:] + + if len(self._hdr) == 8: + self.do_header(self._hdr) + + self._frame_dec.write(data) + self._seg_dec.write(*self._frame_dec.read()) + self._op_dec.write(*self._seg_dec.read()) + for op in self._op_dec.read(): + self.assign_id(op) + opslog.debug("RCVD[%s]: %r", self.log_id, op) + op.dispatch(self) self.dispatch() + except VersionError, e: + self.close(e) + except: + self.close(compat.format_exc()) - self.connection._waiter.notifyAll() + def close(self, e=None): + self._reset() + if e: + self.connection.error = (e,) + self._status = CLOSED def assign_id(self, op): if isinstance(op, Command): @@ -429,40 +534,16 @@ class Driver: op.id = sst.received sst.received += 1 - @synchronized - def writeable(self): - try: - n = self._socket.send(self._buf) - rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n]) - self._buf = self._buf[n:] - except socket.error, e: - self._error(e, True) - self.connection._waiter.notifyAll() + def pending(self): + return len(self._buf) - @synchronized - def timeout(self): - self.dispatch() - self.connection._waiter.notifyAll() + def read(self, n): + result = self._buf[:n] + self._buf = self._buf[n:] + return result - def _error(self, err, recoverable): - if self._socket is not None: - self._socket.close() - self.reset() - if (recoverable and self.connection.reconnect and - (self.connection.reconnect_limit is None or - self.connection.reconnect_limit <= 0 or - self._attempts <= self.connection.reconnect_limit)): - if self._host > 0: - delay = 0 - else: - delay = self.connection.reconnect_delay - self._timeout = time.time() + delay - log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err)) - if delay > 0: - log.warn("sleeping %s seconds" % delay) - self._retrying = True - else: - self.connection.error = (err,) + def peek(self): + return self._buf def write_op(self, op): opslog.debug("SENT[%s]: %r", self.log_id, op) @@ -507,6 +588,7 @@ class Driver: def do_connection_open_ok(self, open_ok): self._connected = True self._sasl_decode = True + self.connection._transport_connected = True def connection_heartbeat(self, hrt): self.write_op(ConnectionHeartbeat()) @@ -522,8 +604,7 @@ class Driver: # probably the right thing to do def do_connection_close_ok(self, close_ok): - self._socket.close() - self.reset() + self.close() def do_session_attached(self, atc): pass @@ -576,40 +657,18 @@ class Driver: sst.session.error = (ex,) def dispatch(self): - try: - if self._socket is None and self.connection._connected and not self._opening: - self.connect() - elif self._socket is not None and not self.connection._connected and not self._closing: - self.disconnect() - - if self._connected and not self._closing: - for ssn in self.connection.sessions.values(): - self.attach(ssn) - self.process(ssn) - except: - msg = compat.format_exc() - self.connection.error = (msg,) + if not self.connection._connected and not self._closing and self._status != CLOSED: + self.disconnect() - def connect(self): - try: - # XXX: should make this non blocking - if self._host == 0: - self._attempts += 1 - host, port = self._hosts[self._host] - if self._retrying: - log.warn("trying: %s:%s", host, port) - self._socket = connect(host, port) - if self._retrying: - log.warn("reconnect succeeded: %s:%s", host, port) - self._timeout = None - self._attempts = 0 - self._host = 0 - self._retrying = False - self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) - self._opening = True - except socket.error, e: - self._host = (self._host + 1) % len(self._hosts) - self._error(e, True) + if self._connected and not self._closing: + for ssn in self.connection.sessions.values(): + self.attach(ssn) + self.process(ssn) + + def open(self): + self._reset() + self._status = OPEN + self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) def disconnect(self): self.write_op(ConnectionClose(close_code.normal)) @@ -1023,7 +1082,6 @@ class Driver: rcv.received += 1 log.debug("RCVD[%s]: %s", ssn.log_id, msg) ssn.incoming.append(msg) - self.connection._waiter.notifyAll() def _decode(self, xfr): dp = EMPTY_DP diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py index 596866de66..004cee5f88 100644 --- a/qpid/python/qpid/messaging/endpoints.py +++ b/qpid/python/qpid/messaging/endpoints.py @@ -94,6 +94,7 @@ class Connection: self.session_counter = 0 self.sessions = {} self._connected = False + self._transport_connected = False self._lock = RLock() self._condition = Condition(self._lock) self._waiter = Waiter(self._condition) @@ -157,7 +158,7 @@ class Connection: """ self._connected = True self._wakeup() - self._ewait(lambda: self._driver._connected and not self._unlinked(), + self._ewait(lambda: self._transport_connected and not self._unlinked(), exc=ConnectError) def _unlinked(self): @@ -173,7 +174,7 @@ class Connection: """ self._connected = False self._wakeup() - self._ewait(lambda: not self._driver._connected) + self._ewait(lambda: not self._transport_connected) @synchronized def connected(self): -- cgit v1.2.1