summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-02-27 00:38:13 +0000
committerTed Ross <tross@apache.org>2010-02-27 00:38:13 +0000
commitacf3a1931ec404d1b02a2e115ef18e531d3924e4 (patch)
tree2a0b998795a676bae4ddc53cdacc82197885f771
parent3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b (diff)
downloadqpid-python-acf3a1931ec404d1b02a2e115ef18e531d3924e4.tar.gz
Rebased the wmf branch to the trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@916887 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qpid/Exception.h2
-rw-r--r--qpid/cpp/include/qpid/sys/posix/check.h1
-rwxr-xr-xqpid/cpp/include/qpid/sys/windows/check.h1
-rw-r--r--qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb1
-rwxr-xr-xqpid/cpp/rubygen/framing.0-10/constants.rb2
-rw-r--r--qpid/cpp/src/CMakeLists.txt5
-rw-r--r--qpid/cpp/src/qmf/engine/EventImpl.cpp2
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp1
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.cpp1
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterTimer.h6
-rw-r--r--qpid/cpp/src/qpid/cluster/Cpg.cpp1
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.cpp7
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.h6
-rw-r--r--qpid/cpp/src/qpid/console/Value.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/Array.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/BodyHandler.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/FieldTable.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/FieldValue.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/List.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceSet.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/Uuid.cpp1
-rw-r--r--qpid/cpp/src/qpid/messaging/Variant.cpp1
-rw-r--r--qpid/cpp/src/qpid/sys/AggregateOutput.cpp4
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Shlib.cpp1
-rw-r--r--qpid/cpp/src/qpid/sys/ssl/check.h2
-rw-r--r--qpid/cpp/src/tests/Makefile.am1
-rw-r--r--qpid/cpp/src/tests/TxMocks.h1
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py54
-rw-r--r--qpid/cpp/src/tests/failover_soak.cpp159
-rwxr-xr-xqpid/cpp/src/tests/run_failover_soak4
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in3
-rwxr-xr-x[-rw-r--r--]qpid/cpp/src/tests/verify_cluster_objects34
-rw-r--r--qpid/extras/qmf/src/py/qmf2/agent.py687
-rw-r--r--qpid/extras/qmf/src/py/qmf2/common.py83
-rw-r--r--qpid/extras/qmf/src/py/qmf2/console.py589
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/__init__.py1
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py4
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/async_method.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/async_query.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/basic_method.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/basic_query.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/events.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/multi_response.py4
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py2
-rw-r--r--qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py808
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties1
-rw-r--r--qpid/java/module.xml1
-rwxr-xr-xqpid/java/perftests/etc/scripts/drainBroker.sh41
-rwxr-xr-xqpid/java/perftests/etc/scripts/fillBroker.sh41
-rwxr-xr-xqpid/java/perftests/etc/scripts/testWithPreFill.sh41
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java17
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java5
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java62
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java111
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java160
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java132
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java7
-rw-r--r--qpid/java/test-profiles/JavaExcludes3
-rw-r--r--qpid/java/test-profiles/JavaStandaloneExcludes1
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes2
-rw-r--r--qpid/java/test-profiles/java-derby.testprofile1
-rwxr-xr-xqpid/python/qpid-python-test28
-rw-r--r--qpid/python/qpid/messaging/driver.py342
-rw-r--r--qpid/python/qpid/messaging/endpoints.py5
68 files changed, 2858 insertions, 655 deletions
diff --git a/qpid/cpp/include/qpid/Exception.h b/qpid/cpp/include/qpid/Exception.h
index 7b937c242a..fa7111160c 100644
--- a/qpid/cpp/include/qpid/Exception.h
+++ b/qpid/cpp/include/qpid/Exception.h
@@ -26,9 +26,7 @@
#include "qpid/framing/constants.h"
#include "qpid/framing/enum.h"
#include "qpid/sys/StrError.h"
-#include "qpid/Msg.h"
#include "qpid/CommonImportExport.h"
-#include <memory>
#include <string>
#include <errno.h>
diff --git a/qpid/cpp/include/qpid/sys/posix/check.h b/qpid/cpp/include/qpid/sys/posix/check.h
index bbc66d389b..1bfe5d6d78 100644
--- a/qpid/cpp/include/qpid/sys/posix/check.h
+++ b/qpid/cpp/include/qpid/sys/posix/check.h
@@ -23,6 +23,7 @@
*/
#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include <cerrno>
#include <assert.h>
diff --git a/qpid/cpp/include/qpid/sys/windows/check.h b/qpid/cpp/include/qpid/sys/windows/check.h
index aba38814b2..2a8e439bed 100755
--- a/qpid/cpp/include/qpid/sys/windows/check.h
+++ b/qpid/cpp/include/qpid/sys/windows/check.h
@@ -23,6 +23,7 @@
*/
#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include "qpid/sys/StrError.h"
#define QPID_WINDOWS_ERROR(ERRVAL) qpid::Exception(QPID_MSG(qpid::sys::strError(ERRVAL)))
diff --git a/qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb b/qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb
index 95c79fd1a4..28a5d94e32 100644
--- a/qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb
+++ b/qpid/cpp/rubygen/framing.0-10/MethodBodyFactory.rb
@@ -35,6 +35,7 @@ class MethodBodyFactoryGen < CppGen
include "qpid/framing/BodyFactory"
@amqp.methods_.each { |m| include "qpid/framing/#{m.body_name}" }
include "qpid/Exception.h"
+ include "qpid/Msg.h"
genl
namespace(@namespace) {
scope("boost::intrusive_ptr<AMQMethodBody> #{@classname}::create(ClassId c, MethodId m) {") {
diff --git a/qpid/cpp/rubygen/framing.0-10/constants.rb b/qpid/cpp/rubygen/framing.0-10/constants.rb
index 5c1c1047f7..85bfb96ac0 100755
--- a/qpid/cpp/rubygen/framing.0-10/constants.rb
+++ b/qpid/cpp/rubygen/framing.0-10/constants.rb
@@ -78,6 +78,7 @@ EOS
cpp_file(path) {
include(path);
include("qpid/Exception.h")
+ include("qpid/Msg.h")
include("<ostream>")
namespace(@namespace) {
scope("const char* typeName(TypeCode t) {") {
@@ -181,6 +182,7 @@ EOS
def reply_exceptions_cpp()
cpp_file("#{@dir}/reply_exceptions") {
include "#{@dir}/reply_exceptions"
+ include "qpid/Msg.h"
include "<sstream>"
include "<assert.h>"
namespace("qpid::framing") {
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 62bab239be..a7078bea47 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -855,7 +855,6 @@ set (qmfengine_SOURCES
qmf/engine/Protocol.h
qmf/engine/QueryImpl.cpp
qmf/engine/QueryImpl.h
- qmf/engine/ResilientConnection.cpp
qmf/engine/SequenceManager.cpp
qmf/engine/SequenceManager.h
qmf/engine/SchemaImpl.cpp
@@ -863,6 +862,10 @@ set (qmfengine_SOURCES
qmf/engine/ValueImpl.cpp
qmf/engine/ValueImpl.h
)
+if (NOT WIN32)
+ list(APPEND qmfengine_SOURCES qmf/engine/ResilientConnection.cpp)
+endif (NOT WIN32)
+
add_library (qmfengine SHARED ${qmfengine_SOURCES})
target_link_libraries (qmfengine qpidclient)
set_target_properties (qmfengine PROPERTIES
diff --git a/qpid/cpp/src/qmf/engine/EventImpl.cpp b/qpid/cpp/src/qmf/engine/EventImpl.cpp
index 79d5a491fc..27501cc396 100644
--- a/qpid/cpp/src/qmf/engine/EventImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/EventImpl.cpp
@@ -20,6 +20,8 @@
#include <qmf/engine/EventImpl.h>
#include <qmf/engine/ValueImpl.h>
+#include <sstream>
+
using namespace std;
using namespace qmf::engine;
using qpid::framing::Buffer;
diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
index 670ee385a3..9216f7bac0 100644
--- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
@@ -19,6 +19,7 @@
#include "qmf/engine/ObjectIdImpl.h"
#include <stdlib.h>
+#include <sstream>
using namespace std;
using namespace qmf::engine;
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
index a4f56464a7..e276df8cec 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -23,6 +23,7 @@
#include <string>
#include <vector>
#include <assert.h>
+#include <sstream>
using namespace std;
using namespace qmf::engine;
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h
index d78c921c67..71a10559cf 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h
@@ -25,6 +25,7 @@
#include <string>
#include <vector>
#include <exception>
+#include <memory>
namespace qmf {
namespace engine {
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index e718819f48..f49fbb03a5 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -285,6 +285,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
}
Cluster::~Cluster() {
+ broker.setClusterTimer(std::auto_ptr<sys::Timer>(0)); // Delete cluster timer
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
}
@@ -914,6 +915,12 @@ void Cluster::memberUpdate(Lock& l) {
size_t size = urls.size();
failoverExchange->updateUrls(urls);
+ if (store.hasStore()) {
+ // Mark store clean if I am the only broker, dirty otherwise.
+ if (size == 1) store.clean(Uuid(true));
+ else store.dirty(clusterId);
+ }
+
if (size == 1 && lastSize > 1 && state >= CATCHUP) {
QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
@@ -996,7 +1003,6 @@ void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNu
}
void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
- QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name)
timer->deliverWakeup(name);
}
diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.h b/qpid/cpp/src/qpid/cluster/ClusterTimer.h
index 395e505451..69f6c622e4 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterTimer.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.h
@@ -30,6 +30,12 @@ namespace cluster {
class Cluster;
+/**
+ * Timer implementation that executes tasks consistently in the
+ * deliver thread across a cluster. Task is not executed when timer
+ * fires, instead the elder multicasts a wakeup. The task is executed
+ * when the wakeup is delivered.
+ */
class ClusterTimer : public sys::Timer {
public:
ClusterTimer(Cluster&);
diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp
index 3ae0c970c7..0856bcd824 100644
--- a/qpid/cpp/src/qpid/cluster/Cpg.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp
@@ -54,7 +54,6 @@ void Cpg::callCpg ( CpgOp & c ) {
unsigned int snooze = 10;
for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) {
if ( CPG_OK == (result = c.op(handle, & group))) {
- QPID_LOG(info, c.opName << " successful.");
break;
}
else if ( result == CPG_ERR_TRY_AGAIN ) {
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
index cf75cd3b5f..648fcfbbd5 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -20,6 +20,7 @@
*/
#include "StoreStatus.h"
#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/filesystem/operations.hpp>
@@ -113,7 +114,12 @@ void StoreStatus::save() {
}
}
+bool StoreStatus::hasStore() const {
+ return state != framing::cluster::STORE_STATE_NO_STORE;
+}
+
void StoreStatus::dirty(const Uuid& clusterId_) {
+ if (!hasStore()) return;
assert(clusterId_);
clusterId = clusterId_;
shutdownId = Uuid();
@@ -122,6 +128,7 @@ void StoreStatus::dirty(const Uuid& clusterId_) {
}
void StoreStatus::clean(const Uuid& shutdownId_) {
+ if (!hasStore()) return;
assert(shutdownId_);
state = STORE_STATE_CLEAN_STORE;
shutdownId = shutdownId_;
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h
index 911b3a2ba2..2371f0424e 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.h
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h
@@ -46,14 +46,14 @@ class StoreStatus
const Uuid& getShutdownId() const { return shutdownId; }
framing::SequenceNumber getConfigSeq() const { return configSeq; }
- void dirty(const Uuid& start); // Start using the store.
- void clean(const Uuid& stop); // Stop using the store.
+ void dirty(const Uuid& clusterId); // Mark the store in use by clusterId.
+ void clean(const Uuid& shutdownId); // Mark the store clean at shutdownId
void setConfigSeq(framing::SequenceNumber seq); // Update the config seq number.
void load();
void save();
- bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
+ bool hasStore() const;
private:
framing::cluster::StoreState state;
diff --git a/qpid/cpp/src/qpid/console/Value.cpp b/qpid/cpp/src/qpid/console/Value.cpp
index c30660f1dc..47c6a4ce57 100644
--- a/qpid/cpp/src/qpid/console/Value.cpp
+++ b/qpid/cpp/src/qpid/console/Value.cpp
@@ -22,6 +22,8 @@
#include "qpid/console/Value.h"
#include "qpid/framing/Buffer.h"
+#include <sstream>
+
using namespace qpid;
using namespace qpid::console;
using namespace std;
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
index 5c5920d786..d863970ece 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
@@ -24,6 +24,8 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/BodyFactory.h"
#include "qpid/framing/MethodBodyFactory.h"
+#include "qpid/Msg.h"
+
#include <boost/format.hpp>
#include <iostream>
diff --git a/qpid/cpp/src/qpid/framing/Array.cpp b/qpid/cpp/src/qpid/framing/Array.cpp
index d95e0d167d..454e8e298f 100644
--- a/qpid/cpp/src/qpid/framing/Array.cpp
+++ b/qpid/cpp/src/qpid/framing/Array.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
#include <assert.h>
namespace qpid {
diff --git a/qpid/cpp/src/qpid/framing/BodyHandler.cpp b/qpid/cpp/src/qpid/framing/BodyHandler.cpp
index e2128596ed..db302b1e4c 100644
--- a/qpid/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/qpid/cpp/src/qpid/framing/BodyHandler.cpp
@@ -25,6 +25,7 @@
#include "qpid/framing/AMQHeartbeatBody.h"
#include <boost/cast.hpp>
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
using namespace qpid::framing;
using namespace boost;
diff --git a/qpid/cpp/src/qpid/framing/FieldTable.cpp b/qpid/cpp/src/qpid/framing/FieldTable.cpp
index e2e91e450a..023e4af819 100644
--- a/qpid/cpp/src/qpid/framing/FieldTable.cpp
+++ b/qpid/cpp/src/qpid/framing/FieldTable.cpp
@@ -25,6 +25,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
#include <assert.h>
namespace qpid {
diff --git a/qpid/cpp/src/qpid/framing/FieldValue.cpp b/qpid/cpp/src/qpid/framing/FieldValue.cpp
index 0b49748de8..fd911645f4 100644
--- a/qpid/cpp/src/qpid/framing/FieldValue.cpp
+++ b/qpid/cpp/src/qpid/framing/FieldValue.cpp
@@ -24,6 +24,7 @@
#include "qpid/framing/Endian.h"
#include "qpid/framing/List.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
namespace qpid {
namespace framing {
diff --git a/qpid/cpp/src/qpid/framing/List.cpp b/qpid/cpp/src/qpid/framing/List.cpp
index bde7dabbac..963ebc206b 100644
--- a/qpid/cpp/src/qpid/framing/List.cpp
+++ b/qpid/cpp/src/qpid/framing/List.cpp
@@ -23,6 +23,7 @@
#include "qpid/framing/FieldValue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
namespace qpid {
namespace framing {
diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp
index dcfb4689b6..72fcd8a9e2 100644
--- a/qpid/cpp/src/qpid/framing/SequenceSet.cpp
+++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp
@@ -22,6 +22,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
using namespace qpid::framing;
using std::max;
diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp
index 432c7ab94e..67ca96d53f 100644
--- a/qpid/cpp/src/qpid/framing/Uuid.cpp
+++ b/qpid/cpp/src/qpid/framing/Uuid.cpp
@@ -22,6 +22,7 @@
#include "qpid/Exception.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/Msg.h"
namespace qpid {
namespace framing {
diff --git a/qpid/cpp/src/qpid/messaging/Variant.cpp b/qpid/cpp/src/qpid/messaging/Variant.cpp
index 116018f797..ba93f160ec 100644
--- a/qpid/cpp/src/qpid/messaging/Variant.cpp
+++ b/qpid/cpp/src/qpid/messaging/Variant.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/messaging/Variant.h"
+#include "qpid/Msg.h"
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
#include <algorithm>
diff --git a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
index 4f0a4fa5af..fc95f46fb9 100644
--- a/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
+++ b/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
@@ -34,6 +34,7 @@ void AggregateOutput::activateOutput() { control.activateOutput(); }
void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
+namespace {
// Clear the busy flag and notify waiting threads in destructor.
struct ScopedBusy {
bool& flag;
@@ -41,7 +42,8 @@ struct ScopedBusy {
ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; }
~ScopedBusy() { flag = false; monitor.notifyAll(); }
};
-
+}
+
bool AggregateOutput::doOutput() {
Mutex::ScopedLock l(lock);
ScopedBusy sb(busy, lock);
diff --git a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
index 299331103c..3fb685d5b8 100644
--- a/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
@@ -20,6 +20,7 @@
#include "qpid/sys/Shlib.h"
#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include <dlfcn.h>
diff --git a/qpid/cpp/src/qpid/sys/ssl/check.h b/qpid/cpp/src/qpid/sys/ssl/check.h
index 984c338a18..94db120afa 100644
--- a/qpid/cpp/src/qpid/sys/ssl/check.h
+++ b/qpid/cpp/src/qpid/sys/ssl/check.h
@@ -21,6 +21,8 @@
* under the License.
*
*/
+#include "qpid/Msg.h"
+
#include <iostream>
#include <string>
#include <nspr.h>
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 4d65803ac1..4a931ab680 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -298,7 +298,6 @@ TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
LIBTOOL="$(LIBTOOL)" \
QPID_DATA_DIR= \
- BOOST_TEST_SHOW_PROGRESS=yes \
$(srcdir)/run_test
system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest
diff --git a/qpid/cpp/src/tests/TxMocks.h b/qpid/cpp/src/tests/TxMocks.h
index a34d864bae..72cb50cd21 100644
--- a/qpid/cpp/src/tests/TxMocks.h
+++ b/qpid/cpp/src/tests/TxMocks.h
@@ -23,6 +23,7 @@
#include "qpid/Exception.h"
+#include "qpid/Msg.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/broker/TxOp.h"
#include <iostream>
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index b3274b1b1e..22b7c8f5b8 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -305,21 +305,47 @@ class StoreTests(BrokerTest):
self.assertRaises(Exception, lambda: a.ready())
self.assertRaises(Exception, lambda: b.ready())
- def test_total_failure(self):
- # Verify we abort with sutiable error message if no clean stores.
- cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=True)
- a.kill()
- b.kill()
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
- b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
- self.assertRaises(Exception, lambda: a.ready())
- self.assertRaises(Exception, lambda: b.ready())
+ def assert_dirty_store(self, broker):
+ self.assertRaises(Exception, lambda: broker.ready())
msg = re.compile("critical.*no clean store")
- assert msg.search(readfile(a.log))
- assert msg.search(readfile(b.log))
+ assert msg.search(readfile(broker.log))
+
+ def test_solo_store_clean(self):
+ # A single node cluster should always leave a clean store.
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+ a.send_message("q", Message("x", durable=True))
+ a.kill()
+ a = cluster.start("a")
+ self.assertEqual(a.get_message("q").content, "x")
+
+ def test_last_store_clean(self):
+
+ # Verify that only the last node in a cluster to shut down has
+ # a clean store. Start with cluster of 3, reduce to 1 then
+ # increase again to ensure that a node that was once alone but
+ # finally did not finish as the last node does not get a clean
+ # store.
+ cluster = self.cluster(0, self.args())
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL)
+ b = cluster.start("b", expect=EXPECT_EXIT_FAIL)
+ c = cluster.start("c", expect=EXPECT_EXIT_FAIL)
+ a.send_message("q", Message("x", durable=True))
+ a.kill()
+ b.kill() # c is last man
+ time.sleep(0.1) # pause for c to find out hes last.
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL) # c no longer last man
+ c.kill() # a is now last man
+ time.sleep(0.1) # pause for a to find out hes last.
+ a.kill() # really last
+ # b & c should be dirty
+ b = cluster.start("b", wait=False, expect=EXPECT_EXIT_OK)
+ self.assert_dirty_store(b)
+ c = cluster.start("c", wait=False, expect=EXPECT_EXIT_OK)
+ self.assert_dirty_store(c)
+ # a should be clean
+ a = cluster.start("a")
+ self.assertEqual(a.get_message("q").content, "x")
- # FIXME aconway 2009-12-03: verify manual restore procedure
diff --git a/qpid/cpp/src/tests/failover_soak.cpp b/qpid/cpp/src/tests/failover_soak.cpp
index ed5c9cbee5..8bf6eca9e6 100644
--- a/qpid/cpp/src/tests/failover_soak.cpp
+++ b/qpid/cpp/src/tests/failover_soak.cpp
@@ -54,6 +54,8 @@ using namespace qpid::client;
namespace qpid {
namespace tests {
+vector<pid_t> pids;
+
typedef vector<ForkedBroker *> brokerVector;
typedef enum
@@ -184,17 +186,29 @@ struct children : public vector<child *>
int
checkChildren ( )
{
- vector<child *>::iterator i;
- for ( i = begin(); i != end(); ++ i )
- if ( (COMPLETED == (*i)->status) && (0 != (*i)->retval) )
- {
- cerr << "checkChildren: error on child of type "
- << (*i)->type
- << endl;
- return (*i)->retval;
- }
+ for ( unsigned int i = 0; i < pids.size(); ++ i )
+ {
+ int pid = pids[i];
+ int returned_pid;
+ int status;
- return 0;
+ child * kid = get ( pid );
+
+ if ( kid->status != COMPLETED )
+ {
+ returned_pid = waitpid ( pid, &status, WNOHANG );
+
+ if ( returned_pid == pid )
+ {
+ int exit_status = WEXITSTATUS(status);
+ exited ( pid, exit_status );
+ if ( exit_status ) // this is a child error.
+ return exit_status;
+ }
+ }
+ }
+
+ return 0;
}
@@ -323,6 +337,7 @@ startNewBroker ( brokerVector & brokers,
int verbosity,
int durable )
{
+ // ("--log-enable=notice+")
static int brokerId = 0;
stringstream path, prefix;
prefix << "soak-" << brokerId;
@@ -516,6 +531,7 @@ startReceivingClient ( brokerVector brokers,
argv.push_back ( 0 );
pid_t pid = fork();
+ pids.push_back ( pid );
if ( ! pid ) {
execv ( receiverPath, const_cast<char * const *>(&argv[0]) );
@@ -571,6 +587,7 @@ startSendingClient ( brokerVector brokers,
argv.push_back ( 0 );
pid_t pid = fork();
+ pids.push_back ( pid );
if ( ! pid ) {
execv ( senderPath, const_cast<char * const *>(&argv[0]) );
@@ -602,6 +619,7 @@ using namespace qpid::tests;
int
main ( int argc, char const ** argv )
{
+ int brokerKills = 0;
if ( argc != 11 ) {
cerr << "Usage: "
<< argv[0]
@@ -625,7 +643,6 @@ main ( int argc, char const ** argv )
int n_brokers = atoi(argv[i++]);
char const * host = "127.0.0.1";
- int maxBrokers = 50;
allMyChildren.verbosity = verbosity;
@@ -722,104 +739,86 @@ main ( int argc, char const ** argv )
int minSleep = 2,
- maxSleep = 4;
+ maxSleep = 6;
+ int totalBrokers = n_brokers;
- for ( int totalBrokers = n_brokers;
- totalBrokers < maxBrokers;
- ++ totalBrokers
- )
+ int loop = 0;
+
+ while ( 1 )
{
+ ++ loop;
+
+ /*
+ if ( verbosity > 1 )
+ std::cerr << "------- loop " << loop << " --------\n";
+
if ( verbosity > 0 )
cout << totalBrokers << " brokers have been added to the cluster.\n\n\n";
+ */
// Sleep for a while. -------------------------
int sleepyTime = mrand ( minSleep, maxSleep );
- if ( verbosity > 0 )
- cout << "Sleeping for " << sleepyTime << " seconds.\n";
sleep ( sleepyTime );
- // Kill the oldest broker. --------------------------
- if ( ! killFrontBroker ( brokers, verbosity ) )
+ int bullet = mrand ( 100 );
+ if ( bullet >= 95 )
+ {
+ fprintf ( stderr, "Killing oldest broker...\n" );
+
+ // Kill the oldest broker. --------------------------
+ if ( ! killFrontBroker ( brokers, verbosity ) )
+ {
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers, 5 );
+ std::cerr << "END_OF_TEST ERROR_BROKER\n";
+ return ERROR_KILLING_BROKER;
+ }
+ ++ brokerKills;
+
+ // Start a new broker. --------------------------
+ if ( verbosity > 0 )
+ cout << "Starting new broker.\n\n";
+
+ startNewBroker ( brokers,
+ moduleOrDir,
+ clusterName,
+ verbosity,
+ durable );
+ ++ totalBrokers;
+ printBrokers ( brokers );
+ cerr << brokerKills << " brokers have been killed.\n\n\n";
+ }
+
+ int retval = allMyChildren.checkChildren();
+ if ( retval )
{
- allMyChildren.killEverybody();
- killAllBrokers ( brokers, 5 );
- std::cerr << "END_OF_TEST ERROR_BROKER\n";
- return ERROR_KILLING_BROKER;
+ std::cerr << "END_OF_TEST ERROR_CLIENT\n";
+ allMyChildren.killEverybody();
+ killAllBrokers ( brokers, 5 );
+ return ERROR_ON_CHILD;
}
- // Sleep for a while. -------------------------
- sleepyTime = mrand ( minSleep, maxSleep );
- if ( verbosity > 1 )
- cerr << "Sleeping for " << sleepyTime << " seconds.\n";
- sleep ( sleepyTime );
-
- // Start a new broker. --------------------------
- if ( verbosity > 0 )
- cout << "Starting new broker.\n\n";
-
- startNewBroker ( brokers,
- moduleOrDir,
- clusterName,
- verbosity,
- durable );
-
- if ( verbosity > 1 )
- printBrokers ( brokers );
-
// If all children have exited, quit.
int unfinished = allMyChildren.unfinished();
- if ( ! unfinished ) {
+ if ( unfinished == 0 ) {
killAllBrokers ( brokers, 5 );
if ( verbosity > 1 )
cout << "failoverSoak: all children have exited.\n";
- int retval = allMyChildren.checkChildren();
- if ( verbosity > 1 )
- std::cerr << "failoverSoak: checkChildren: " << retval << endl;
-
- if ( retval )
- {
- std::cerr << "END_OF_TEST ERROR_CLIENT\n";
- return ERROR_ON_CHILD;
- }
- else
- {
- std::cerr << "END_OF_TEST SUCCESSFUL\n";
- return HUNKY_DORY;
- }
- }
- // Even if some are still running, if there's an error, quit.
- if ( allMyChildren.checkChildren() )
- {
- if ( verbosity > 0 )
- cout << "failoverSoak: error on child.\n";
- allMyChildren.killEverybody();
- killAllBrokers ( brokers, 5 );
- std::cerr << "END_OF_TEST ERROR_CLIENT\n";
- return ERROR_ON_CHILD;
+ std::cerr << "END_OF_TEST SUCCESSFUL\n";
+ return HUNKY_DORY;
}
- if ( verbosity > 1 ) {
- std::cerr << "------- next kill-broker loop --------\n";
- allMyChildren.print();
- }
}
- retval = allMyChildren.checkChildren();
- if ( verbosity > 1 )
- std::cerr << "failoverSoak: checkChildren: " << retval << endl;
-
- if ( verbosity > 1 )
- cout << "failoverSoak: maxBrokers reached.\n";
-
allMyChildren.killEverybody();
killAllBrokers ( brokers, 5 );
std::cerr << "END_OF_TEST SUCCESSFUL\n";
- return retval ? ERROR_ON_CHILD : HUNKY_DORY;
+ return HUNKY_DORY;
}
diff --git a/qpid/cpp/src/tests/run_failover_soak b/qpid/cpp/src/tests/run_failover_soak
index 69551a51c2..c276e9cc2f 100755
--- a/qpid/cpp/src/tests/run_failover_soak
+++ b/qpid/cpp/src/tests/run_failover_soak
@@ -26,12 +26,12 @@ host=127.0.0.1
unset QPID_NO_MODULE_DIR # failover_soak uses --module-dir, dont want clash
MODULES=${MODULES:-$moduledir}
-MESSAGES=${MESSAGES:-1000000}
+MESSAGES=${MESSAGES:-500000}
REPORT_FREQUENCY=${REPORT_FREQUENCY:-20000}
VERBOSITY=${VERBOSITY:-10}
DURABILITY=${DURABILITY:-0}
N_QUEUES=${N_QUEUES:-1}
-N_BROKERS=${N_BROKERS:-3}
+N_BROKERS=${N_BROKERS:-4}
rm -f soak-*.log
exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY $N_QUEUES $N_BROKERS
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 87fbbd128b..07bd4b2bee 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -73,3 +73,6 @@ exportmodule XML_LIB xml.so
export QPID_NO_MODULE_DIR=1 # Don't accidentally load installed modules
export QPID_DATA_DIR= # Default to no data dir, not ~/.qpidd
+# Options for boost test framework
+export BOOST_TEST_SHOW_PROGRESS=yes
+export BOOST_TEST_CATCH_SYSTEM_ERRORS=no
diff --git a/qpid/cpp/src/tests/verify_cluster_objects b/qpid/cpp/src/tests/verify_cluster_objects
index cea875662f..664b88cb3b 100644..100755
--- a/qpid/cpp/src/tests/verify_cluster_objects
+++ b/qpid/cpp/src/tests/verify_cluster_objects
@@ -75,6 +75,12 @@ class IpAddr:
bestAddr = addrPort
return bestAddr
+class ObjectId:
+ """Object identity, use for dictionaries by object id"""
+ def __init__(self, object): self.object = object
+ def __eq__(self, other): return self.object is other.object
+ def __hash__(self): return hash(id(self.object))
+
class Broker(object):
def __init__(self, qmf, broker):
self.broker = broker
@@ -94,6 +100,7 @@ class Broker(object):
self.uptime = 0
self.tablesByName = {}
self.package = "org.apache.qpid.broker"
+ self.id_cache = {} # Cache for getAbstractId
def getUrl(self):
return self.broker.getUrl()
@@ -114,13 +121,14 @@ class Broker(object):
#
def getAbstractId(self, object):
""" return a string the of the hierarchical name """
+ if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)]
global _debug_recursion
result = u""
valstr = u""
_debug_recursion += 1
debug_prefix = _debug_recursion
if (_verbose > 9):
- print debug_prefix, " enter gai: props ", self._properties
+ print debug_prefix, " enter gai: props ", object._properties
for property, value in object._properties:
# we want to recurse on things which are refs. we tell by
@@ -138,6 +146,7 @@ class Broker(object):
if property.name == "systemRef":
_debug_recursion -= 1
+ self.id_cache[ObjectId(object)] = ""
return ""
if property.index:
@@ -176,6 +185,7 @@ class Broker(object):
if (_verbose > 9):
print debug_prefix, " id ", self, " -> ", result
_debug_recursion -= 1
+ self.id_cache[ObjectId(object)] = result
return result
def loadTable(self, cls):
@@ -196,13 +206,12 @@ class Broker(object):
# error (ie, the name-generation code is busted) if we do
key = self.getAbstractId(obj)
if key in self.tablesByName[cls.getClassName()]:
- print "internal error: collision for %s on key %s\n" % (obj, key)
- sys.exit(1)
+ raise Exception("internal error: collision for %s on key %s\n"
+ % (obj, key))
- self.tablesByName[cls.getClassName()][self.getAbstractId(obj)] = obj
-# sys.exit(1)
+ self.tablesByName[cls.getClassName()][key] = obj
if _verbose > 1:
- print " ", obj.getObjectId(), " ", obj.getIndex(), " ", self.getAbstractId(obj)
+ print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key
class BrokerManager:
@@ -253,9 +262,10 @@ class BrokerManager:
raise Exception("Invalid URL 2")
addrList.append((tokens[1], tokens[2]))
- # Find the address in the list that is most likely to be in the same subnet as the address
- # with which we made the original QMF connection. This increases the probability that we will
- # be able to reach the cluster member.
+ # Find the address in the list that is most likely to be
+ # in the same subnet as the address with which we made the
+ # original QMF connection. This increases the probability
+ # that we will be able to reach the cluster member.
best = hostAddr.bestAddr(addrList)
bestUrl = best[0] + ":" + best[1]
@@ -284,8 +294,7 @@ class BrokerManager:
if _verbose > 0:
print " ", b
else:
- print "Failed - Not a cluster"
- sys.exit(1)
+ raise Exception("Failed - Not a cluster")
failures = []
@@ -348,11 +357,10 @@ class BrokerManager:
print "Failures:"
for failure in failures:
print " %s" % failure
- sys.exit(1)
+ raise Exception("Failures")
if _verbose > 0:
print "Success"
- sys.exit(0)
##
## Main Program
diff --git a/qpid/extras/qmf/src/py/qmf2/agent.py b/qpid/extras/qmf/src/py/qmf2/agent.py
index a6b3c39ad1..d884325071 100644
--- a/qpid/extras/qmf/src/py/qmf2/agent.py
+++ b/qpid/extras/qmf/src/py/qmf2/agent.py
@@ -21,18 +21,19 @@ import logging
import datetime
import time
import Queue
-from threading import Thread, Lock, currentThread, Event
+from threading import Thread, RLock, currentThread, Event
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
-from common import (make_subject, parse_subject, OpCode, QmfQuery,
- SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, WorkItem, SchemaMethod,
- timedelta_to_secs)
+from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass,
+ QmfData, QmfAddress, SchemaClass, SchemaClassId, WorkItem,
+ SchemaMethod, timedelta_to_secs, QMF_APP_ID)
# global flag that indicates which thread (if any) is
# running the agent notifier callback
_callback_thread=None
+
+
##==============================================================================
## METHOD CALL
##==============================================================================
@@ -78,14 +79,73 @@ class MethodCallParams(object):
return self._user_id
+ ##==============================================================================
+ ## SUBSCRIPTIONS
+ ##==============================================================================
+
+class _ConsoleHandle(object):
+ """
+ """
+ def __init__(self, handle, reply_to):
+ self.console_handle = handle
+ self.reply_to = reply_to
+
+class SubscriptionParams(object):
+ """
+ """
+ def __init__(self, console_handle, query, interval, duration, user_id):
+ self._console_handle = console_handle
+ self._query = query
+ self._interval = interval
+ self._duration = duration
+ self._user_id = user_id
+
+ def get_console_handle(self):
+ return self._console_handle
+
+ def get_query(self):
+ return self._query
+
+ def get_interval(self):
+ return self._interval
+
+ def get_duration(self):
+ return self._duration
+
+ def get_user_id(self):
+ return self._user_id
+
+class _SubscriptionState(object):
+ """
+ An internally-managed subscription.
+ """
+ def __init__(self, reply_to, cid, query, interval, duration):
+ self.reply_to = reply_to
+ self.correlation_id = cid
+ self.query = query
+ self.interval = interval
+ self.duration = duration
+ now = datetime.datetime.utcnow()
+ self.next_update = now # do an immediate update
+ self.expiration = now + datetime.timedelta(seconds=duration)
+ self.id = 0
+
+ def resubscribe(self, now, _duration=None):
+ if _duration is not None:
+ self.duration = _duration
+ self.expiration = now + datetime.timedelta(seconds=self.duration)
+
+ def reset_interval(self, now):
+ self.next_update = now + datetime.timedelta(seconds=self.interval)
+
+
##==============================================================================
## AGENT
##==============================================================================
class Agent(Thread):
- def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30,
- _max_msg_size=0, _capacity=10):
+ def __init__(self, name, _domain=None, _notifier=None, **options):
Thread.__init__(self)
self._running = False
self._ready = Event()
@@ -94,11 +154,20 @@ class Agent(Thread):
self._domain = _domain
self._address = QmfAddress.direct(self.name, self._domain)
self._notifier = _notifier
- self._heartbeat_interval = _heartbeat_interval
+
+ # configurable parameters
+ #
+ self._heartbeat_interval = options.get("heartbeat_interval", 30)
+ self._capacity = options.get("capacity", 10)
+ self._default_duration = options.get("default_duration", 300)
+ self._max_duration = options.get("max_duration", 3600)
+ self._min_duration = options.get("min_duration", 10)
+ self._default_interval = options.get("default_interval", 30)
+ self._min_interval = options.get("min_interval", 5)
+
# @todo: currently, max # of objects in a single reply message, would
# be better if it were max bytesize of per-msg content...
- self._max_msg_size = _max_msg_size
- self._capacity = _capacity
+ self._max_msg_size = options.get("max_msg_size", 0)
self._conn = None
self._session = None
@@ -107,7 +176,7 @@ class Agent(Thread):
self._direct_sender = None
self._topic_sender = None
- self._lock = Lock()
+ self._lock = RLock()
self._packages = {}
self._schema_timestamp = long(0)
self._schema = {}
@@ -119,6 +188,10 @@ class Agent(Thread):
self._undescribed_data = {}
self._work_q = Queue.Queue()
self._work_q_put = False
+ # subscriptions
+ self._subscription_id = long(time.time())
+ self._subscriptions = {}
+ self._next_subscribe_event = None
def destroy(self, timeout=None):
@@ -192,10 +265,11 @@ class Agent(Thread):
if self.isAlive():
# kick my thread to wake it up
try:
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
+ msg = Message(id=QMF_APP_ID,
subject=self.name,
- content={"noop":"noop"})
+ properties={ "method":"request",
+ "qmf.opcode":OpCode.noop},
+ content={})
# TRACE
#logging.error("!!! sending wakeup to myself: %s" % msg)
@@ -258,13 +332,14 @@ class Agent(Thread):
raise Exception("No connection available")
# @todo: should we validate against the schema?
- _map = {"_name": self.get_name(),
- "_event": qmfEvent.map_encode()}
- msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
+ msg = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
qmfEvent.get_severity() + "." + self.name,
- properties={"method":"response",
- "qmf.subject":make_subject(OpCode.event_ind)},
- content={MsgKey.event:_map})
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content": ContentType.event,
+ "qmf.agent":self.name},
+ content=[qmfEvent.map_encode()])
# TRACE
# logging.error("!!! Agent %s sending Event (%s)" %
# (self.name, str(msg)))
@@ -330,9 +405,10 @@ class Agent(Thread):
raise TypeError("Invalid type for error - must be QmfData")
_map[SchemaMethod.KEY_ERROR] = _error.map_encode()
- msg = Message( properties={"method":"response",
- "qmf.subject":make_subject(OpCode.response)},
- content={MsgKey.method:_map})
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.method_rsp},
+ content=_map)
msg.correlation_id = handle.correlation_id
self._send_reply(msg, handle.reply_to)
@@ -370,25 +446,10 @@ class Agent(Thread):
while self._running:
- now = datetime.datetime.utcnow()
- # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
- if now >= next_heartbeat:
- ind = self._makeAgentIndMsg()
- ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT
- # TRACE
- #logging.error("!!! Agent %s sending Heartbeat (%s)" %
- # (self.name, str(ind)))
- self._topic_sender.send(ind)
- logging.debug("Agent Indication Sent")
- next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
-
- timeout = timedelta_to_secs(next_heartbeat - now)
- # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
- try:
- self._session.next_receiver(timeout=timeout)
- except Empty:
- continue
-
+ #
+ # Process inbound messages
+ #
+ logging.debug("%s processing inbound messages..." % self.name)
for i in range(batch_limit):
try:
msg = self._topic_receiver.fetch(timeout=0)
@@ -409,7 +470,71 @@ class Agent(Thread):
# (self.name, self._direct_receiver.source, msg))
self._dispatch(msg, _direct=True)
+ #
+ # Send Heartbeat Notification
+ #
+ now = datetime.datetime.utcnow()
+ if now >= next_heartbeat:
+ logging.debug("%s sending heartbeat..." % self.name)
+ ind = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT,
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.agent_heartbeat_ind,
+ "qmf.agent":self.name},
+ content=self._makeAgentInfoBody())
+ # TRACE
+ #logging.error("!!! Agent %s sending Heartbeat (%s)" %
+ # (self.name, str(ind)))
+ self._topic_sender.send(ind)
+ logging.debug("Agent Indication Sent")
+ next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
+
+
+
+ #
+ # Monitor Subscriptions
+ #
+ if (self._next_subscribe_event is None or
+ now >= self._next_subscribe_event):
+
+ logging.debug("%s polling subscriptions..." % self.name)
+ self._next_subscribe_event = now + datetime.timedelta(seconds=
+ self._max_duration)
+ self._lock.acquire()
+ try:
+ dead_ss = []
+ for sid,ss in self._subscriptions.iteritems():
+ if now >= ss.expiration:
+ dead_ss.append(sid)
+ continue
+ if now >= ss.next_update:
+ response = []
+ objs = self._queryData(ss.query)
+ if objs:
+ for obj in objs:
+ response.append(obj.map_encode())
+ logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id))
+ self._send_query_response( ContentType.data,
+ ss.correlation_id,
+ ss.reply_to,
+ response)
+ ss.reset_interval(now)
+
+ next_timeout = min(ss.expiration, ss.next_update)
+ if next_timeout < self._next_subscribe_event:
+ self._next_subscribe_event = next_timeout
+
+ for sid in dead_ss:
+ del self._subscriptions[sid]
+ finally:
+ self._lock.release()
+
+ #
+ # notify application of pending WorkItems
+ #
+
if self._work_q_put and self._notifier:
+ logging.debug("%s notifying application..." % self.name)
# new stuff on work queue, kick the the application...
self._work_q_put = False
_callback_thread = currentThread()
@@ -417,19 +542,33 @@ class Agent(Thread):
self._notifier.indication()
_callback_thread = None
+ #
+ # Sleep until messages arrive or something times out
+ #
+ next_timeout = min(next_heartbeat, self._next_subscribe_event)
+ timeout = timedelta_to_secs(next_timeout -
+ datetime.datetime.utcnow())
+ if timeout > 0.0:
+ logging.debug("%s sleeping %s seconds..." % (self.name,
+ timeout))
+ try:
+ self._session.next_receiver(timeout=timeout)
+ except Empty:
+ pass
+
+
+
+
#
# Private:
#
- def _makeAgentIndMsg(self):
+ def _makeAgentInfoBody(self):
"""
- Create an agent indication message identifying this agent
+ Create an agent indication message body identifying this agent
"""
- _map = {"_name": self.get_name(),
+ return {"_name": self.get_name(),
"_schema_timestamp": self._schema_timestamp}
- return Message(properties={"method":"response",
- "qmf.subject":make_subject(OpCode.agent_ind)},
- content={MsgKey.agent_info: _map})
def _send_reply(self, msg, reply_to):
"""
@@ -458,7 +597,7 @@ class Agent(Thread):
except SendError, e:
logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
- def _send_query_response(self, subject, msgkey, cid, reply_to, objects):
+ def _send_query_response(self, content_type, cid, reply_to, objects):
"""
Send a response to a query, breaking the result into multiple
messages based on the agent's _max_msg_size config parameter
@@ -472,24 +611,28 @@ class Agent(Thread):
start = 0
end = min(total, max_count)
- while end <= total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
+ # send partial response if too many objects present
+ while end < total:
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "partial":None,
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
correlation_id = cid,
- content={msgkey:objects[start:end]})
+ content=objects[start:end])
self._send_reply(m, reply_to)
- if end == total:
- break;
start = end
end = min(total, end + max_count)
- # response terminator - last message has empty object array
- if total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
- correlation_id = cid,
- content={msgkey: []} )
- self._send_reply(m, reply_to)
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
+ correlation_id = cid,
+ content=objects[start:end])
+ self._send_reply(m, reply_to)
def _dispatch(self, msg, _direct=False):
"""
@@ -497,33 +640,32 @@ class Agent(Thread):
@param _direct: True if msg directly addressed to this agent.
"""
- logging.debug( "Message received from Console! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- except:
- logging.warning("Ignoring unrecognized message '%s'" % msg.subject)
- return
+ # logging.debug( "Message received from Console! [%s]" % msg )
+ # logging.error( "%s Message received from Console! [%s]" % (self.name, msg) )
+ opcode = msg.properties.get("qmf.opcode")
+ if not opcode:
+ logging.warning("Ignoring unrecognized message '%s'" % msg)
+ return
+ version = 2 # @todo: fix me
cmap = {}; props={}
if msg.content_type == "amqp/map":
cmap = msg.content
if msg.properties:
props = msg.properties
- if opcode == OpCode.agent_locate:
+ if opcode == OpCode.agent_locate_req:
self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
- elif opcode == OpCode.get_query:
+ elif opcode == OpCode.query_req:
self._handleQueryMsg( msg, cmap, props, version, _direct )
elif opcode == OpCode.method_req:
self._handleMethodReqMsg(msg, cmap, props, version, _direct)
- elif opcode == OpCode.cancel_subscription:
- logging.warning("!!! CANCEL_SUB TBD !!!")
- elif opcode == OpCode.create_subscription:
- logging.warning("!!! CREATE_SUB TBD !!!")
- elif opcode == OpCode.renew_subscription:
- logging.warning("!!! RENEW_SUB TBD !!!")
- elif opcode == OpCode.schema_query:
- logging.warning("!!! SCHEMA_QUERY TBD !!!")
+ elif opcode == OpCode.subscribe_req:
+ self._handleSubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_refresh_req:
+ self._handleResubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_cancel_ind:
+ self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.noop:
logging.debug("No-op msg received.")
else:
@@ -536,18 +678,28 @@ class Agent(Thread):
"""
logging.debug("_handleAgentLocateMsg")
- reply = True
- if "method" in props and props["method"] == "request":
- query = cmap.get(MsgKey.query)
- if query is not None:
- # fake a QmfData containing my identifier for the query compare
- tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
- self.get_name()},
- _object_id="my-name")
- reply = QmfQuery.from_map(query).evaluate(tmpData)
+ reply = False
+ if props.get("method") == "request":
+ # if the message is addressed to me or wildcard, process it
+ if (msg.subject == "console.ind" or
+ msg.subject == "console.ind.locate" or
+ msg.subject == "console.ind.locate." + self.name):
+ pred = msg.content
+ if not pred:
+ reply = True
+ elif isinstance(pred, type([])):
+ # fake a QmfData containing my identifier for the query compare
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, pred)
+ tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
+ self.get_name()},
+ _object_id="my-name")
+ reply = query.evaluate(tmpData)
if reply:
- m = self._makeAgentIndMsg()
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.agent_locate_rsp},
+ content=self._makeAgentInfoBody())
m.correlation_id = msg.correlation_id
self._send_reply(m, msg.reply_to)
else:
@@ -561,22 +713,25 @@ class Agent(Thread):
logging.debug("_handleQueryMsg")
if "method" in props and props["method"] == "request":
- qmap = cmap.get(MsgKey.query)
- if qmap:
- query = QmfQuery.from_map(qmap)
+ if cmap:
+ try:
+ query = QmfQuery.from_map(cmap)
+ except TypeError:
+ logging.error("Invalid Query format: '%s'" % str(cmap))
+ return
target = query.get_target()
if target == QmfQuery.TARGET_PACKAGES:
- self._queryPackages( msg, query )
+ self._queryPackagesReply( msg, query )
elif target == QmfQuery.TARGET_SCHEMA_ID:
- self._querySchema( msg, query, _idOnly=True )
+ self._querySchemaReply( msg, query, _idOnly=True )
elif target == QmfQuery.TARGET_SCHEMA:
- self._querySchema( msg, query)
+ self._querySchemaReply( msg, query)
elif target == QmfQuery.TARGET_AGENT:
logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
elif target == QmfQuery.TARGET_OBJECT_ID:
- self._queryData(msg, query, _idOnly=True)
+ self._queryDataReply(msg, query, _idOnly=True)
elif target == QmfQuery.TARGET_OBJECT:
- self._queryData(msg, query)
+ self._queryDataReply(msg, query)
else:
logging.warning("Unrecognized query target: '%s'" % str(target))
@@ -634,7 +789,173 @@ class Agent(Thread):
self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
self._work_q_put = True
- def _queryPackages(self, msg, query):
+ def _handleSubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Subscription Request
+ """
+ if "method" in props and props["method"] == "request":
+ query_map = cmap.get("_query")
+ interval = cmap.get("_interval")
+ duration = cmap.get("_duration")
+
+ try:
+ query = QmfQuery.from_map(query_map)
+ except TypeError:
+ logging.warning("Invalid query for subscription: %s" %
+ str(query_map))
+ return
+
+ if isinstance(self, AgentExternal):
+ # param = SubscriptionParams(_ConsoleHandle(console_handle,
+ # msg.reply_to),
+ # query,
+ # interval,
+ # duration,
+ # msg.user_id)
+ # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST,
+ # msg.correlation_id, param))
+ # self._work_q_put = True
+ logging.error("External Subscription TBD")
+ return
+
+ # validate the query - only specific objects, or
+ # objects wildcard, are currently supported.
+ if (query.get_target() != QmfQuery.TARGET_OBJECT or
+ (query.get_selector() == QmfQuery.PREDICATE and
+ query.get_predicate())):
+ logging.error("Subscriptions only support (wildcard) Object"
+ " Queries.")
+ err = QmfData.create(
+ {"reason": "Unsupported Query type for subscription.",
+ "query": str(query.map_encode())})
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content={"_error": err.map_encode()})
+ self._send_reply(m, msg.reply_to)
+ return
+
+ if duration is None:
+ duration = self._default_duration
+ else:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ logging.warning("Bad duration value: %s" % str(msg))
+ duration = self._default_duration
+
+ if interval is None:
+ interval = self._default_interval
+ else:
+ try:
+ interval = float(interval)
+ if interval < self._min_interval:
+ interval = self._min_interval
+ except:
+ logging.warning("Bad interval value: %s" % str(msg))
+ interval = self._default_interval
+
+ ss = _SubscriptionState(msg.reply_to,
+ msg.correlation_id,
+ query,
+ interval,
+ duration)
+ self._lock.acquire()
+ try:
+ sid = self._subscription_id
+ self._subscription_id += 1
+ ss.id = sid
+ self._subscriptions[sid] = ss
+ self._next_subscribe_event = None
+ finally:
+ self._lock.release()
+
+ sr_map = {"_subscription_id": sid,
+ "_interval": interval,
+ "_duration": duration}
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content=sr_map)
+ self._send_reply(m, msg.reply_to)
+
+
+
+ def _handleResubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Renew Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ logging.error("Invalid subscription refresh msg: %s" %
+ str(msg))
+ return
+
+ self._lock.acquire()
+ try:
+ ss = self._subscriptions.get(sid)
+ if not ss:
+ logging.error("Ignoring unknown subscription: %s" %
+ str(sid))
+ return
+ duration = cmap.get("_duration")
+ if duration is not None:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ logging.error("Bad duration value: %s" % str(msg))
+ duration = None # use existing duration
+
+ ss.resubscribe(datetime.datetime.utcnow(), duration)
+
+ new_duration = ss.duration
+ new_interval = ss.interval
+
+ finally:
+ self._lock.release()
+
+
+ sr_map = {"_subscription_id": sid,
+ "_interval": new_interval,
+ "_duration": new_duration}
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_refresh_rsp},
+ correlation_id = msg.correlation_id,
+ content=sr_map)
+ self._send_reply(m, msg.reply_to)
+
+
+ def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Cancel Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ logging.warning("No subscription id supplied: %s" % msg)
+ return
+
+ self._lock.acquire()
+ try:
+ if sid in self._subscriptions:
+ del self._subscriptions[sid]
+ finally:
+ self._lock.release()
+
+
+ def _queryPackagesReply(self, msg, query):
"""
Run a query against the list of known packages
"""
@@ -646,58 +967,83 @@ class Agent(Thread):
_object_id="_package")
if query.evaluate(qmfData):
pnames.append(name)
+
+ self._send_query_response(ContentType.schema_package,
+ msg.correlation_id,
+ msg.reply_to,
+ pnames)
finally:
self._lock.release()
- self._send_query_response(make_subject(OpCode.data_ind),
- MsgKey.package_info,
- msg.correlation_id,
- msg.reply_to,
- pnames)
- def _querySchema( self, msg, query, _idOnly=False ):
+ def _querySchemaReply( self, msg, query, _idOnly=False ):
"""
"""
schemas = []
- # if querying for a specific schema, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- found = None
- self._lock.acquire()
- try:
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific schema, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
found = self._schema.get(query.get_id())
- finally:
- self._lock.release()
- if found:
- if _idOnly:
- schemas.append(query.get_id().map_encode())
- else:
- schemas.append(found.map_encode())
- else: # otherwise, evaluate all schema
- self._lock.acquire()
- try:
+ if found:
+ if _idOnly:
+ schemas.append(query.get_id().map_encode())
+ else:
+ schemas.append(found.map_encode())
+ else: # otherwise, evaluate all schema
for sid,val in self._schema.iteritems():
if query.evaluate(val):
if _idOnly:
schemas.append(sid.map_encode())
else:
schemas.append(val.map_encode())
- finally:
- self._lock.release()
+ if _idOnly:
+ msgkey = ContentType.schema_id
+ else:
+ msgkey = ContentType.schema_class
- if _idOnly:
- msgkey = MsgKey.schema_id
- else:
- msgkey = MsgKey.schema
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ schemas)
+ finally:
+ self._lock.release()
+
+
+ def _queryDataReply( self, msg, query, _idOnly=False ):
+ """
+ """
+ # hold the (recursive) lock for the duration so the Agent
+ # won't send data that is currently being modified by the
+ # app.
+ self._lock.acquire()
+ try:
+ response = []
+ data_objs = self._queryData(query)
+ if _idOnly:
+ for obj in data_objs:
+ response.append(obj.get_object_id())
+ else:
+ for obj in data_objs:
+ response.append(obj.map_encode())
+
+ if _idOnly:
+ msgkey = ContentType.object_id
+ else:
+ msgkey = ContentType.data
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- schemas)
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ response)
+ finally:
+ self._lock.release()
- def _queryData( self, msg, query, _idOnly=False ):
+ def _queryData(self, query):
"""
+ Return a list of QmfData objects that match a given query
"""
data_objs = []
# extract optional schema_id from target params
@@ -705,12 +1051,12 @@ class Agent(Thread):
t_params = query.get_target_param()
if t_params:
sid = t_params.get(QmfData.KEY_SCHEMA_ID)
- # if querying for a specific object, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- oid = query.get_id()
- found = None
- self._lock.acquire()
- try:
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific object, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ oid = query.get_id()
if sid and not sid.get_hash_string():
# wildcard schema_id match, check each schema
for name,db in self._described_data.iteritems():
@@ -718,11 +1064,9 @@ class Agent(Thread):
and name.get_package_name() == sid.get_package_name()):
found = db.get(oid)
if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
+ data_objs.append(found)
else:
+ found = None
if sid:
db = self._described_data.get(sid)
if db:
@@ -730,15 +1074,9 @@ class Agent(Thread):
else:
found = self._undescribed_data.get(oid)
if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
- finally:
- self._lock.release()
- else: # otherwise, evaluate all data
- self._lock.acquire()
- try:
+ data_objs.append(found)
+
+ else: # otherwise, evaluate all data
if sid and not sid.get_hash_string():
# wildcard schema_id match, check each schema
for name,db in self._described_data.iteritems():
@@ -746,10 +1084,7 @@ class Agent(Thread):
and name.get_package_name() == sid.get_package_name()):
for oid,data in db.iteritems():
if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
+ data_objs.append(data)
else:
if sid:
db = self._described_data.get(sid)
@@ -759,23 +1094,28 @@ class Agent(Thread):
if db:
for oid,data in db.iteritems():
if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
- finally:
- self._lock.release()
+ data_objs.append(data)
+ finally:
+ self._lock.release()
- if _idOnly:
- msgkey = MsgKey.object_id
- else:
- msgkey = MsgKey.data_obj
+ return data_objs
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- data_objs)
+
+
+ ##==============================================================================
+ ## EXTERNAL DATABASE AGENT
+ ##==============================================================================
+
+class AgentExternal(Agent):
+ """
+ An Agent which uses an external management database.
+ """
+ def __init__(self, name, _domain=None, _notifier=None,
+ _heartbeat_interval=30, _max_msg_size=0, _capacity=10):
+ super(AgentExternal, self).__init__(name, _domain, _notifier,
+ _heartbeat_interval,
+ _max_msg_size, _capacity)
+ logging.error("AgentExternal TBD")
@@ -823,9 +1163,11 @@ class QmfAgentData(QmfData):
_schema_id=schema_id, _const=False)
self._agent = agent
self._validated = False
+ self._modified = True
def destroy(self):
self._dtime = long(time.time() * 1000)
+ self._touch()
# @todo: publish change
def is_deleted(self):
@@ -833,6 +1175,7 @@ class QmfAgentData(QmfData):
def set_value(self, _name, _value, _subType=None):
super(QmfAgentData, self).set_value(_name, _value, _subType)
+ self._touch()
# @todo: publish change
def inc_value(self, name, delta=1):
@@ -849,6 +1192,7 @@ class QmfAgentData(QmfData):
""" subtract the delta from the property """
# @todo: need to take write-lock
logging.error(" TBD!!!")
+ self._touch()
def validate(self):
"""
@@ -868,6 +1212,13 @@ class QmfAgentData(QmfData):
raise Exception("Required property '%s' not present." % name)
self._validated = True
+ def _touch(self):
+ """
+ Mark this object as modified. Used to force a publish of this object
+ if on subscription.
+ """
+ self._modified = True
+
################################################################################
diff --git a/qpid/extras/qmf/src/py/qmf2/common.py b/qpid/extras/qmf/src/py/qmf2/common.py
index 8107b86666..8070add806 100644
--- a/qpid/extras/qmf/src/py/qmf2/common.py
+++ b/qpid/extras/qmf/src/py/qmf2/common.py
@@ -34,61 +34,44 @@ log_query = getLogger("qmf.query")
## Constants
##
-AMQP_QMF_SUBJECT = "qmf"
-AMQP_QMF_VERSION = 4
-AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
-
-class MsgKey(object):
- agent_info = "agent_info"
- query = "query"
- package_info = "package_info"
- schema_id = "schema_id"
- schema = "schema"
- object_id="object_id"
- data_obj="object"
- method="method"
- event="event"
+QMF_APP_ID="qmf2"
-class OpCode(object):
- noop = "noop"
-
- # codes sent by a console and processed by the agent
- agent_locate = "agent-locate"
- cancel_subscription = "cancel-subscription"
- create_subscription = "create-subscription"
- get_query = "get-query"
- method_req = "method"
- renew_subscription = "renew-subscription"
- schema_query = "schema-query" # @todo: deprecate
-
- # codes sent by the agent to a console
- agent_ind = "agent"
- data_ind = "data"
- event_ind = "event"
- managed_object = "managed-object"
- object_ind = "object"
- response = "response"
- schema_ind="schema" # @todo: deprecate
+class ContentType(object):
+ """ Values for the 'qmf.content' message header
+ """
+ schema_package = "_schema_package"
+ schema_id = "_schema_id"
+ schema_class = "_schema_class"
+ object_id = "_object_id"
+ data = "_data"
+ event = "_event"
+class OpCode(object):
+ """ Values for the 'qmf.opcode' message header.
+ """
+ noop = "_noop"
+ # codes sent by a console and processed by the agent
+ agent_locate_req = "_agent_locate_request"
+ subscribe_req = "_subscribe_request"
+ subscribe_cancel_ind = "_subscribe_cancel_indication"
+ subscribe_refresh_req = "_subscribe_refresh_indication"
+ query_req = "_query_request"
+ method_req = "_method_request"
-def make_subject(_code):
- """
- Create a message subject field value.
- """
- return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code)
+ # codes sent by the agent to a console
+ agent_locate_rsp = "_agent_locate_response"
+ agent_heartbeat_ind = "_agent_heartbeat_indication"
+ query_rsp = "_query_response"
+ subscribe_rsp = "_subscribe_response"
+ subscribe_refresh_rsp = "_subscribe_refresh_response"
+ data_ind = "_data_indication"
+ method_rsp = "_method_response"
-def parse_subject(_sub):
- """
- Deconstruct a subject field, return version,opcode values
- """
- if _sub[:3] != "qmf":
- raise Exception("Non-QMF message received")
- return _sub[3:].split('.', 1)
def timedelta_to_secs(td):
"""
@@ -133,11 +116,15 @@ class WorkItem(object):
AGENT_HEARTBEAT=8
QUERY_COMPLETE=9
METHOD_RESPONSE=10
+ SUBSCRIBE_RESPONSE=11
+ SUBSCRIBE_INDICATION=12
+ RESUBSCRIBE_RESPONSE=13
# Enumeration of the types of WorkItems produced on the Agent
METHOD_CALL=1000
QUERY=1001
- SUBSCRIBE=1002
- UNSUBSCRIBE=1003
+ SUBSCRIBE_REQUEST=1002
+ RESUBSCRIBE_REQUEST=1003
+ UNSUBSCRIBE_REQUEST=1004
def __init__(self, kind, handle, _params=None):
"""
diff --git a/qpid/extras/qmf/src/py/qmf2/console.py b/qpid/extras/qmf/src/py/qmf2/console.py
index c13cf70755..afd20c3655 100644
--- a/qpid/extras/qmf/src/py/qmf2/console.py
+++ b/qpid/extras/qmf/src/py/qmf2/console.py
@@ -24,14 +24,14 @@ import time
import datetime
import Queue
from threading import Thread, Event
-from threading import Lock
+from threading import RLock
from threading import currentThread
from threading import Condition
from qpid.messaging import Connection, Message, Empty, SendError
-from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier,
- MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId,
+from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType,
+ QmfData, QmfAddress, SchemaClass, SchemaClassId,
SchemaEventClass, SchemaObjectClass, WorkItem,
SchemaMethod, QmfEvent, timedelta_to_secs)
@@ -141,6 +141,7 @@ class _AsyncMailbox(_Mailbox):
console._lock.acquire()
try:
console._async_mboxes[self.cid] = self
+ console._next_mbox_expire = None
finally:
console._lock.release()
@@ -150,6 +151,24 @@ class _AsyncMailbox(_Mailbox):
console._wake_thread()
+ def reset_timeout(self, _timeout=None):
+ """ Reset the expiration date for this mailbox.
+ """
+ if _timeout is None:
+ _timeout = self.console._reply_timeout
+ self.console._lock.acquire()
+ try:
+ self.expiration_date = (datetime.datetime.utcnow() +
+ datetime.timedelta(seconds=_timeout))
+ self.console._next_mbox_expire = None
+ finally:
+ self.console._lock.release()
+
+ # wake the console mgmt thread so it will learn about the mbox
+ # expiration date (and adjust its idle sleep period correctly)
+
+ self.console._wake_thread()
+
def deliver(self, msg):
"""
"""
@@ -177,7 +196,7 @@ class _QueryMailbox(_AsyncMailbox):
def __init__(self, console,
agent_name,
context,
- target, msgkey,
+ target,
_timeout=None):
"""
Invoked by application thread.
@@ -186,7 +205,6 @@ class _QueryMailbox(_AsyncMailbox):
_timeout)
self.agent_name = agent_name
self.target = target
- self.msgkey = msgkey
self.context = context
self.result = []
@@ -195,11 +213,8 @@ class _QueryMailbox(_AsyncMailbox):
Process query response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
- done = False
- objects = reply.content.get(self.msgkey)
- if not objects:
- done = True
- else:
+ objects = reply.content
+ if isinstance(objects, type([])):
# convert from map to native types if needed
if self.target == QmfQuery.TARGET_SCHEMA_ID:
for sid_map in objects:
@@ -237,8 +252,7 @@ class _QueryMailbox(_AsyncMailbox):
# no conversion needed.
self.result += objects
- if done:
- # create workitem
+ if not "partial" in reply.properties:
# logging.error("QUERY COMPLETE for %s" % str(self.context))
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
@@ -278,8 +292,8 @@ class _SchemaPrefetchMailbox(_AsyncMailbox):
Process schema response messages.
"""
done = False
- schemas = reply.content.get(MsgKey.schema)
- if schemas:
+ schemas = reply.content
+ if schemas and isinstance(schemas, type([])):
for schema_map in schemas:
# extract schema id, convert based on schema type
sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
@@ -319,8 +333,8 @@ class _MethodMailbox(_AsyncMailbox):
Invoked by Console Management thread only.
"""
- _map = reply.content.get(MsgKey.method)
- if not _map:
+ _map = reply.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
result = None
else:
@@ -355,6 +369,207 @@ class _MethodMailbox(_AsyncMailbox):
+class _SubscriptionMailbox(_AsyncMailbox):
+ """
+    A Mailbox for a single subscription.  Allows only synchronous "subscribe"
+ and "refresh" requests.
+ """
+ def __init__(self, console, context, agent, duration, interval):
+ """
+ Invoked by application thread.
+ """
+ super(_SubscriptionMailbox, self).__init__(console, duration)
+ self.cv = Condition()
+ self.data = []
+ self.result = []
+ self.context = context
+ self.duration = duration
+ self.interval = interval
+ self.agent_name = agent.get_name()
+ self.agent_subscription_id = None # from agent
+
+ def subscribe(self, query):
+ agent = self.console.get_agent(self.agent_name)
+ if not agent:
+ logging.warning("subscribed failed - unknown agent '%s'" %
+ self.agent_name)
+ return False
+ try:
+ logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
+ agent._send_subscribe_req(query, self.get_address(), self.interval,
+ self.duration)
+ except SendError, e:
+ logging.error(str(e))
+ return False
+ return True
+
+ def resubscribe(self, duration):
+ agent = self.console.get_agent(self.agent_name)
+ if not agent:
+ logging.warning("resubscribed failed - unknown agent '%s'" %
+ self.agent_name)
+ return False
+ try:
+ logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
+ agent._send_resubscribe_req(self.get_address(),
+ self.agent_subscription_id, duration)
+ except SendError, e:
+ logging.error(str(e))
+ return False
+ return True
+
+ def deliver(self, msg):
+ """
+ """
+ opcode = msg.properties.get("qmf.opcode")
+ if (opcode == OpCode.subscribe_rsp or
+ opcode == OpCode.subscribe_refresh_rsp):
+
+ error = msg.content.get("_error")
+ if error:
+ try:
+ e_map = QmfData.from_map(error)
+ except TypeError:
+ logging.warning("Invalid QmfData map received: '%s'"
+ % str(error))
+ e_map = QmfData.create({"error":"Unknown error"})
+ sp = SubscribeParams(None, None, None, e_map)
+ else:
+ self.agent_subscription_id = msg.content.get("_subscription_id")
+ self.duration = msg.content.get("_duration", self.duration)
+ self.interval = msg.content.get("_interval", self.interval)
+ self.reset_timeout(self.duration)
+ sp = SubscribeParams(self.get_address(),
+ self.interval,
+ self.duration,
+ None)
+ self.cv.acquire()
+ try:
+ self.data.append(sp)
+                # if it was empty, notify waiters
+ if len(self.data) == 1:
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ return
+
+ # else: data indication
+ agent_name = msg.properties.get("qmf.agent")
+ if not agent_name:
+ logging.warning("Ignoring data_ind - no agent name given: %s" %
+ msg)
+ return
+ agent = self.console.get_agent(agent_name)
+ if not agent:
+ logging.warning("Ignoring data_ind - unknown agent '%s'" %
+ agent_name)
+ return
+
+ objects = msg.content
+ for obj_map in objects:
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ # start fetch of schema if not known
+ sid = obj.get_schema_class_id()
+ if sid:
+ self.console._prefetch_schema(sid, agent)
+ self.result.append(obj)
+
+ if not "partial" in msg.properties:
+ wi = WorkItem(WorkItem.SUBSCRIBE_INDICATION, self.context, self.result)
+ self.result = []
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ def fetch(self, timeout=None):
+ """
+ Get one data item from a mailbox, with timeout.
+ Invoked by application thread.
+ """
+ self.cv.acquire()
+ try:
+ if len(self.data) == 0:
+ self.cv.wait(timeout)
+ if len(self.data):
+ return self.data.pop(0)
+ return None
+ finally:
+ self.cv.release()
+
+ def expire(self):
+ """ The subscription expired.
+ """
+ self.destroy()
+
+
+
+
+class _AsyncSubscriptionMailbox(_SubscriptionMailbox):
+ """
+    A Mailbox for a single subscription.  Allows only asynchronous "subscribe"
+ and "refresh" requests.
+ """
+ def __init__(self, console, context, agent, duration, interval):
+ """
+ Invoked by application thread.
+ """
+ super(_AsyncSubscriptionMailbox, self).__init__(console, context,
+ agent, duration,
+ interval)
+ self.subscribe_pending = False
+ self.resubscribe_pending = False
+
+
+ def subscribe(self, query, reply_timeout):
+ if super(_AsyncSubscriptionMailbox, self).subscribe(query):
+ self.subscribe_pending = True
+ self.reset_timeout(reply_timeout)
+ return True
+ return False
+
+ def resubscribe(self, duration, reply_timeout):
+ if super(_AsyncSubscriptionMailbox, self).resubscribe(duration):
+ self.resubscribe_pending = True
+ self.reset_timeout(reply_timeout)
+ return True
+ return False
+
+ def deliver(self, msg):
+ """
+ """
+ super(_AsyncSubscriptionMailbox, self).deliver(msg)
+ sp = self.fetch(0)
+ if sp:
+ # if the message was a reply to a subscribe or
+ # re-subscribe, then we get here.
+ if self.subscribe_pending:
+ wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+ self.context, sp)
+ else:
+ wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+ self.context, sp)
+
+ self.subscribe_pending = False
+ self.resubscribe_pending = False
+
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ if not sp.succeeded():
+ self.destroy()
+
+
+ def expire(self):
+        """ Either the subscription expired, or a request timed out.
+ """
+ if self.subscribe_pending:
+ wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+ self.context, None)
+ elif self.resubscribe_pending:
+ wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+ self.context, None)
+ self.destroy()
+
+
##==============================================================================
## DATA MODEL
##==============================================================================
@@ -481,8 +696,8 @@ class QmfConsoleData(QmfData):
logging.debug("Agent method req wait timed-out.")
return None
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
+ _map = replyMsg.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
return None
@@ -650,8 +865,8 @@ class Agent(object):
logging.debug("Agent method req wait timed-out.")
return None
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
+ _map = replyMsg.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
return None
@@ -676,20 +891,66 @@ class Agent(object):
def _send_query(self, query, correlation_id=None):
"""
"""
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.get_query)},
- content={MsgKey.query: query.map_encode()})
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.query_req},
+ content=query.map_encode())
self._send_msg( msg, correlation_id )
def _send_method_req(self, mr_map, correlation_id=None):
"""
"""
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.method_req)},
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.method_req},
content=mr_map)
self._send_msg( msg, correlation_id )
+ def _send_subscribe_req(self, query, correlation_id, _interval=None,
+ _lifetime=None):
+ """
+ """
+ sr_map = {"_query":query.map_encode()}
+ if _interval is not None:
+ sr_map["_interval"] = _interval
+ if _lifetime is not None:
+ sr_map["_duration"] = _lifetime
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_req},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
+
+ def _send_resubscribe_req(self, correlation_id,
+ subscription_id,
+ _lifetime=None):
+ """
+ """
+ sr_map = {"_subscription_id":subscription_id}
+ if _lifetime is not None:
+ sr_map["_duration"] = _lifetime
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_refresh_req},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
+
+ def _send_unsubscribe_ind(self, correlation_id, subscription_id):
+ """
+ """
+ sr_map = {"_subscription_id":subscription_id}
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_cancel_ind},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
##==============================================================================
## METHOD CALL
@@ -716,6 +977,36 @@ class MethodResult(object):
return arg
+
+ ##==============================================================================
+ ## SUBSCRIPTION
+ ##==============================================================================
+
+class SubscribeParams(object):
+ """ Represents a standing subscription for this console.
+ """
+ def __init__(self, sid, interval, duration, _error=None):
+ self._sid = sid
+ self._interval = interval
+ self._duration = duration
+ self._error = _error
+
+ def succeeded(self):
+ return self._error is None
+
+ def get_error(self):
+ return self._error
+
+ def get_subscription_id(self):
+ return self._sid
+
+ def get_publish_interval(self):
+ return self._interval
+
+ def get_duration(self):
+ return self._duration
+
+
##==============================================================================
## CONSOLE
##==============================================================================
@@ -753,7 +1044,7 @@ class Console(Thread):
self._domain = _domain
self._address = QmfAddress.direct(self._name, self._domain)
self._notifier = notifier
- self._lock = Lock()
+ self._lock = RLock()
self._conn = None
self._session = None
# dict of "agent-direct-address":class Agent entries
@@ -766,6 +1057,7 @@ class Console(Thread):
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
+ self._subscribe_timeout = 300 # @todo: parameterize
self._next_agent_expire = None
self._next_mbox_expire = None
# for passing WorkItems to the application
@@ -776,18 +1068,6 @@ class Console(Thread):
self._post_office = {} # indexed by cid
self._async_mboxes = {} # indexed by cid, used to expire them
- ## Old stuff below???
- #self._broker_list = []
- #self.impl = qmfengine.Console()
- #self._event = qmfengine.ConsoleEvent()
- ##self._cv = Condition()
- ##self._sync_count = 0
- ##self._sync_result = None
- ##self._select = {}
- ##self._cb_cond = Condition()
-
-
-
def destroy(self, timeout=None):
"""
Must be called before the Console is deleted.
@@ -801,8 +1081,6 @@ class Console(Thread):
self.remove_connection(self._conn, timeout)
logging.debug("Console Destroyed")
-
-
def add_connection(self, conn):
"""
Add a AMQP connection to the console. The console will setup a session over the
@@ -934,10 +1212,11 @@ class Console(Thread):
cid = mbox.get_address()
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
- msg = Message(subject="console.ind.locate." + name,
+ msg = Message(id=QMF_APP_ID,
+ subject="console.ind.locate." + name,
properties={"method":"request",
- "qmf.subject":make_subject(OpCode.agent_locate)},
- content={MsgKey.query: query.map_encode()})
+ "qmf.opcode":OpCode.agent_locate_req},
+ content=query._predicate)
msg.reply_to = str(self._address)
msg.correlation_id = str(cid)
logging.debug("Sending Agent Locate (%s)" % time.time())
@@ -995,23 +1274,13 @@ class Console(Thread):
def do_query(self, agent, query, _reply_handle=None, _timeout=None ):
"""
"""
- query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
- QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
- QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
- QmfQuery.TARGET_SCHEMA: MsgKey.schema,
- QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
- QmfQuery.TARGET_AGENT: MsgKey.agent_info}
-
target = query.get_target()
- msgkey = query_keymap.get(target)
- if not msgkey:
- raise Exception("Invalid target for query: %s" % str(query))
if _reply_handle is not None:
mbox = _QueryMailbox(self,
agent.get_name(),
_reply_handle,
- target, msgkey,
+ target,
_timeout)
else:
mbox = _SyncMailbox(self)
@@ -1045,9 +1314,8 @@ class Console(Thread):
logging.debug("Query wait timed-out.")
break
- objects = reply.content.get(msgkey)
- if not objects:
- # last response is empty
+ objects = reply.content
+ if not objects or not isinstance(objects, type([])):
break
# convert from map to native types if needed
@@ -1081,21 +1349,116 @@ class Console(Thread):
# no conversion needed.
response += objects
+ if not "partial" in reply.properties:
+ # reply not broken up over multiple msgs
+ break
+
now = datetime.datetime.utcnow()
mbox.destroy()
return response
+
+ def create_subscription(self, agent, query, console_handle,
+ _interval=None, _duration=None,
+ _blocking=True, _timeout=None):
+ if not _duration:
+ _duration = self._subscribe_timeout
+
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ if not _blocking:
+ mbox = _AsyncSubscriptionMailbox(self, console_handle, agent,
+ _duration, _interval)
+ if not mbox.subscribe(query, _timeout):
+ mbox.destroy()
+ return False
+ return True
+ else:
+ mbox = _SubscriptionMailbox(self, console_handle, agent, _duration,
+ _interval)
+
+ if not mbox.subscribe(query):
+ mbox.destroy()
+ return None
+
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ # @todo: what if mbox expires here?
+ sp = mbox.fetch(_timeout)
+
+ if not sp:
+ logging.debug("Subscription request wait timed-out.")
+ mbox.destroy()
+ return None
+
+ if not sp.succeeded():
+ mbox.destroy()
+
+ return sp
+
+ def refresh_subscription(self, subscription_id,
+ _duration=None,
+ _timeout=None):
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ mbox = self._get_mailbox(subscription_id)
+ if not mbox:
+ logging.warning("Subscription %s not found." % subscription_id)
+ return None
+
+ if isinstance(mbox, _AsyncSubscriptionMailbox):
+ return mbox.resubscribe(_duration, _timeout)
+ else:
+ # synchronous - wait for reply
+ if not mbox.resubscribe(_duration):
+ # @todo ???? mbox.destroy()
+ return None
+
+ # wait for reply
+
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ sp = mbox.fetch(_timeout)
+
+ if not sp:
+ logging.debug("re-subscribe request wait timed-out.")
+ # @todo???? mbox.destroy()
+ return None
+
+ return sp
+
+
+ def cancel_subscription(self, subscription_id):
+ """
+ """
+ mbox = self._get_mailbox(subscription_id)
+ if not mbox:
+ return
+
+ agent = self.get_agent(mbox.agent_name)
+ if agent:
+ try:
+ logging.debug("Sending UnSubscribe to Agent (%s)" % time.time())
+ agent._send_unsubscribe_ind(subscription_id,
+ mbox.agent_subscription_id)
+ except SendError, e:
+ logging.error(str(e))
+
+ mbox.destroy()
+
+
def _wake_thread(self):
"""
Make the console management thread loop wakeup from its next_receiver
sleep.
"""
logging.debug("Sending noop to wake up [%s]" % self._address)
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
+ msg = Message(id=QMF_APP_ID,
subject=self._name,
- content={"noop":"noop"})
+ properties={"method":"request",
+ "qmf.opcode":OpCode.noop},
+ content={})
try:
self._direct_sender.send( msg, sync=True )
except SendError, e:
@@ -1152,9 +1515,17 @@ class Console(Thread):
# to expire, or a mailbox requrest to time out
now = datetime.datetime.utcnow()
next_expire = self._next_agent_expire
- if (self._next_mbox_expire and
- self._next_mbox_expire < next_expire):
- next_expire = self._next_mbox_expire
+
+ # the mailbox expire flag may be cleared by the
+ # app thread(s)
+ self._lock.acquire()
+ try:
+ if (self._next_mbox_expire and
+ self._next_mbox_expire < next_expire):
+ next_expire = self._next_mbox_expire
+ finally:
+ self._lock.release()
+
if next_expire > now:
timeout = timedelta_to_secs(next_expire - now)
try:
@@ -1268,13 +1639,14 @@ class Console(Thread):
"""
PRIVATE: Process a message received from an Agent
"""
- logging.debug( "Message received from Agent! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- # @todo: deal with version mismatch!!!
- except:
+ #logging.debug( "Message received from Agent! [%s]" % msg )
+ #logging.error( "Message received from Agent! [%s]" % msg )
+
+ opcode = msg.properties.get("qmf.opcode")
+ if not opcode:
logging.error("Ignoring unrecognized message '%s'" % msg)
return
+ version = 2 # @todo: fix me
cmap = {}; props = {}
if msg.content_type == "amqp/map":
@@ -1282,20 +1654,23 @@ class Console(Thread):
if msg.properties:
props = msg.properties
- if opcode == OpCode.agent_ind:
+ if opcode == OpCode.agent_heartbeat_ind:
self._handle_agent_ind_msg( msg, cmap, version, _direct )
- elif opcode == OpCode.data_ind:
- self._handle_data_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.event_ind:
- self._handle_event_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.managed_object:
- logging.warning("!!! managed_object TBD !!!")
- elif opcode == OpCode.object_ind:
- logging.warning("!!! object_ind TBD !!!")
- elif opcode == OpCode.response:
+ elif opcode == OpCode.agent_locate_rsp:
+ self._handle_agent_ind_msg( msg, cmap, version, _direct )
+ elif opcode == OpCode.query_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.subscribe_rsp:
self._handle_response_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.schema_ind:
- logging.warning("!!! schema_ind TBD !!!")
+ elif opcode == OpCode.subscribe_refresh_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.method_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.data_ind:
+ if msg.correlation_id:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ else:
+ self._handle_indication_msg(msg, cmap, version, _direct)
elif opcode == OpCode.noop:
logging.debug("No-op msg received.")
else:
@@ -1309,7 +1684,7 @@ class Console(Thread):
"""
logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
- ai_map = cmap.get(MsgKey.agent_info)
+ ai_map = msg.content
if not ai_map or not isinstance(ai_map, type({})):
logging.warning("Bad agent-ind message received: '%s'" % msg)
return
@@ -1359,29 +1734,10 @@ class Console(Thread):
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
- def _handle_data_ind_msg(self, msg, cmap, version, direct):
- """
- Process a received data-ind message.
- """
- logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
-
- mbox = self._get_mailbox(msg.correlation_id)
- if not mbox:
- logging.debug("Data indicate received with unknown correlation_id"
- " msg='%s'" % str(msg))
- return
-
- # wake up all waiters
- logging.debug("waking waiters for correlation id %s" %
- msg.correlation_id)
- mbox.deliver(msg)
-
-
def _handle_response_msg(self, msg, cmap, version, direct):
"""
Process a received data-ind message.
"""
- # @todo code replication - clean me.
logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
mbox = self._get_mailbox(msg.correlation_id)
@@ -1394,19 +1750,22 @@ class Console(Thread):
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
- def _handle_event_ind_msg(self, msg, cmap, version, _direct):
- ei_map = cmap.get(MsgKey.event)
- if not ei_map or not isinstance(ei_map, type({})):
- logging.warning("Bad event indication message received: '%s'" % msg)
- return
+ def _handle_indication_msg(self, msg, cmap, version, _direct):
- aname = ei_map.get("_name")
- emap = ei_map.get("_event")
+ aname = msg.properties.get("qmf.agent")
if not aname:
- logging.debug("No '_name' field in event indication message.")
+ logging.debug("No agent name field in indication message.")
return
- if not emap:
- logging.debug("No '_event' field in event indication message.")
+
+ content_type = msg.properties.get("qmf.content")
+ if (content_type != ContentType.event or
+ not isinstance(msg.content, type([]))):
+ logging.warning("Bad event indication message received: '%s'" % msg)
+ return
+
+ emap = msg.content[0]
+ if not isinstance(emap, type({})):
+ logging.debug("Invalid event body in indication message: '%s'" % msg)
return
agent = None
@@ -1439,13 +1798,13 @@ class Console(Thread):
"""
Check all async mailboxes for outstanding requests that have expired.
"""
- now = datetime.datetime.utcnow()
- if self._next_mbox_expire and now < self._next_mbox_expire:
- return
- expired_mboxes = []
- self._next_mbox_expire = None
self._lock.acquire()
try:
+ now = datetime.datetime.utcnow()
+ if self._next_mbox_expire and now < self._next_mbox_expire:
+ return
+ expired_mboxes = []
+ self._next_mbox_expire = None
for mbox in self._async_mboxes.itervalues():
if now >= mbox.expiration_date:
expired_mboxes.append(mbox)
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/__init__.py b/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
index 186f09349e..eff9357e1f 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
@@ -27,3 +27,4 @@ import events
import multi_response
import async_query
import async_method
+import subscriptions
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py b/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
index 59b65221e0..0e5e595695 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
@@ -52,8 +52,8 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.notifier = _testNotifier()
self.agent = qmf2.agent.Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ _notifier=self.notifier,
+ heartbeat_interval=heartbeat)
# No database needed for this test
self.running = False
self.ready = Event()
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/async_method.py b/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
index 556b62756f..965a254f26 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
@@ -53,7 +53,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/async_query.py b/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
index 3a9a767bf0..7c7b22fdaf 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
@@ -53,7 +53,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py b/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
index be2bdff9ab..22accb7cfc 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
@@ -53,7 +53,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py b/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
index dd321cb4bb..be67c36d87 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
@@ -53,7 +53,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/events.py b/qpid/extras/qmf/src/py/qmf2/tests/events.py
index e55dc8572e..bc6465f25b 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/events.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/events.py
@@ -58,7 +58,7 @@ class _agentApp(Thread):
self.notifier = _testNotifier()
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py b/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
index d3d00a70c5..7c24435e79 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
@@ -60,8 +60,8 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat,
- _max_msg_size=_MAX_OBJS_PER_MSG)
+ heartbeat_interval=heartbeat,
+ max_msg_size=_MAX_OBJS_PER_MSG)
# Dynamically construct a management database
for i in range(self.schema_count):
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py b/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
index 5b1446bb3a..466457d670 100644
--- a/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
+++ b/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
@@ -54,7 +54,7 @@ class _agentApp(Thread):
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Management Database
# - two different schema packages,
diff --git a/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
new file mode 100644
index 0000000000..750952df46
--- /dev/null
+++ b/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
@@ -0,0 +1,808 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+import datetime
+import time
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, WorkItem)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, broker_url, heartbeat):
+ Thread.__init__(self)
+ self.notifier = _testNotifier()
+ self.broker_url = broker_url
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ heartbeat_interval=heartbeat,
+ max_duration=10,
+ default_duration=7,
+ min_duration=5,
+ min_interval=1,
+ default_interval=2)
+
+ # Management Database
+ # - two different schema packages,
+ # - two classes within one schema package
+ # - multiple objects per schema package+class
+ # - two "undescribed" objects
+
+ # "package1/class1"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class1"),
+ _desc="A test data schema - one",
+ _object_id_names=["key"] )
+
+ _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p1c1_key1"},
+ _schema=_schema)
+ _obj.set_value("count1", 0)
+ _obj.set_value("count2", 0)
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p1c1_key2"},
+ _schema=_schema )
+ _obj.set_value("count1", 9)
+ _obj.set_value("count2", 10)
+ self.agent.add_object( _obj )
+
+ # "package1/class2"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package1", "class2"),
+ _desc="A test data schema - two",
+ _object_id_names=["name"] )
+ # add properties
+ _schema.add_property( "name", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "string1", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent,
+ _values={"name":"p1c2_name1"},
+ _schema=_schema )
+ _obj.set_value("string1", "a data string")
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent,
+ _values={"name":"p1c2_name2"},
+ _schema=_schema )
+ _obj.set_value("string1", "another data string")
+ self.agent.add_object( _obj )
+
+
+ # "package2/class1"
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("package2", "class1"),
+ _desc="A test data schema - second package",
+ _object_id_names=["key"] )
+
+ _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "counter", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ self.agent.register_object_class(_schema)
+
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p2c1_key1"},
+ _schema=_schema )
+ _obj.set_value("counter", 0)
+ self.agent.add_object( _obj )
+
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p2c1_key2"},
+ _schema=_schema )
+ _obj.set_value("counter", 2112)
+ self.agent.add_object( _obj )
+
+
+ # add two "unstructured" objects to the Agent
+
+ _obj = QmfAgentData(self.agent, _object_id="undesc-1")
+ _obj.set_value("field1", "a value")
+ _obj.set_value("field2", 2)
+ _obj.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj.set_value("field4", ["a", "list", "value"])
+ self.agent.add_object(_obj)
+
+
+ _obj = QmfAgentData(self.agent, _object_id="undesc-2")
+ _obj.set_value("key-1", "a value")
+ _obj.set_value("key-2", 2)
+ self.agent.add_object(_obj)
+
+ self.running = False
+ self.ready = Event()
+
+ def start_app(self):
+ self.running = True
+ self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
+
+ def stop_app(self):
+ self.running = False
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(10)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+ self.ready.set()
+
+ while self.running:
+ self.notifier.wait_for_work(None)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+
+ if self.conn:
+ self.agent.remove_connection(10)
+ self.agent.destroy(10)
+
+
+class BaseTest(unittest.TestCase):
+ agent_count = 5
+
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ self.agents = []
+ for i in range(self.agent_count):
+ agent = _agentApp("agent-" + str(i), self.broker, 1)
+ agent.start_app()
+ self.agents.append(agent)
+ #print("!!!! STARTING TEST: %s" % datetime.datetime.utcnow())
+
+ def tearDown(self):
+ #print("!!!! STOPPING TEST: %s" % datetime.datetime.utcnow())
+ for agent in self.agents:
+ if agent is not None:
+ agent.stop_app()
+
+
+ def test_sync_by_schema(self):
+ # create console
+ # find all agents
+ # subscribe to changes to any object in package1/class1
+ # should succeed
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match all objects in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+ _target_params=t_params)
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now subscribe to agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ index)
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ self.assertTrue(sp.get_duration() == 10)
+ self.assertTrue(sp.get_publish_interval() == 2)
+
+ subscriptions.append([sp, 0])
+ index += 1
+
+ # now wait for the (2 * interval) and count the updates
+ r_count = 0
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 2)
+ for obj in reply:
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2" or
+ obj.get_object_id() == "p1c1_key1")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package1")
+ self.assertTrue(sid.get_class_name() == "class1")
+
+ self.assertTrue(wi.get_handle() < len(subscriptions))
+ subscriptions[wi.get_handle()][1] += 1
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publishes per subscription
+ self.assertTrue(r_count == 5 * len(subscriptions))
+ for ii in range(len(subscriptions)):
+ self.assertTrue(subscriptions[ii][1] == 5)
+
+ self.console.destroy(10)
+
+
+ def test_sync_by_obj_id(self):
+ # create console
+ # find all agents
+ # subscribe to changes to any object in package1/class1
+ # should succeed
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match all objects in schema package1/class1
+ # sid = SchemaClassId.create("package1", "class1")
+ # t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_id_object("undesc-2")
+
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now subscribe to agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ index)
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+
+ subscriptions.append([sp, 0])
+ index += 1
+
+ # now wait for all subscriptions to expire (2x interval w/o
+ # indications)
+ r_count = 0
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "undesc-2")
+ # print("!!! get_params() = %s" % wi.get_params())
+ self.assertTrue(wi.get_handle() < len(subscriptions))
+ subscriptions[wi.get_handle()][1] += 1
+ # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
+ # self.assertTrue(reply.succeeded())
+ # self.assertTrue(reply.get_argument("cookie") ==
+ # wi.get_handle())
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publish per subscription
+ self.assertTrue(r_count == 5 * len(subscriptions))
+ #for ii in range(len(subscriptions)):
+ # self.assertTrue(subscriptions[ii][1] == 5)
+
+ self.console.destroy(10)
+
+
+ def test_sync_by_obj_id_schema(self):
+ # create console
+ # find all agents
+ # subscribe to changes to any object in package1/class1
+ # should succeed
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now subscribe to agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ index)
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+
+ subscriptions.append([sp, 0])
+ index += 1
+
+ # now wait for all subscriptions to expire (2x interval w/o
+ # indications)
+ r_count = 0
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() < len(subscriptions))
+ subscriptions[wi.get_handle()][1] += 1
+ # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
+ # self.assertTrue(reply.succeeded())
+ # self.assertTrue(reply.get_argument("cookie") ==
+ # wi.get_handle())
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publish per subscription
+ self.assertTrue(r_count == 5 * len(subscriptions))
+ #for ii in range(len(subscriptions)):
+ # self.assertTrue(subscriptions[ii][1] == 5)
+
+ self.console.destroy(10)
+
+
+
+ def test_sync_refresh(self):
+ # create console
+ # find one agent
+ # subscribe to changes to any object in package1/class1
+ # after 3 data indications, refresh
+ # verify > 5 more data indications received
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ "my-handle")
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+
+ # refresh after three subscribe indications, count all
+ # indications to verify refresh worked
+ r_count = 0
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ self.console.release_workitem(wi)
+
+ if r_count == 3:
+ rp = self.console.refresh_subscription(sp.get_subscription_id())
+ self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams))
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publish per subscription
+ self.assertTrue(r_count > 5)
+ # print("!!! total r_count=%d" % r_count)
+ #for ii in range(len(subscriptions)):
+ # self.assertTrue(subscriptions[ii][1] == 5)
+
+ self.console.destroy(10)
+
+
+
+ def test_sync_cancel(self):
+ # create console
+ # find one agent
+ # subscribe to changes to any object in package1/class1
+ # after 2 data indications, cancel subscription
+ # verify < 5 data indications received
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ "my-handle")
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+
+ # refresh after three subscribe indications, count all
+ # indications to verify refresh worked
+ r_count = 0
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ self.console.release_workitem(wi)
+
+ if r_count == 3:
+ self.console.cancel_subscription(sp.get_subscription_id())
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publishes per subscription over the full duration
+ self.assertTrue(r_count < 5)
+ #for ii in range(len(subscriptions)):
+ # self.assertTrue(subscriptions[ii][1] == 5)
+
+ self.console.destroy(10)
+
+
+ def test_async_by_obj_id_schema(self):
+ # create console
+ # find one agent
+ # async subscribe to changes to any object in package1/class1
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ rc = self.console.create_subscription(agent,
+ query,
+ "my-handle",
+ _blocking=False)
+ self.assertTrue(rc)
+
+ r_count = 0
+ sp = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE:
+ self.assertTrue(wi.get_handle() == "my-handle")
+ sp = wi.get_params()
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ else:
+ self.assertTrue(wi.get_type() ==
+ WorkItem.SUBSCRIBE_INDICATION)
+ # sp better be set up by now!
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publish per subscription
+ self.assertTrue(r_count == 6)
+
+ self.console.destroy(10)
+
+ def test_async_refresh(self):
+ # create console
+ # find one agent
+ # async subscribe to changes to any object in package1/class1
+ # refresh after third data indication
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ rc = self.console.create_subscription(agent,
+ query,
+ "my-handle",
+ _blocking=False)
+ self.assertTrue(rc)
+
+ # refresh after three subscribe indications, count all
+ # indications to verify refresh worked
+ r_count = 0
+ sp = None
+ rp = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE:
+ self.assertTrue(wi.get_handle() == "my-handle")
+ sp = wi.get_params()
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ elif wi.get_type() == WorkItem.RESUBSCRIBE_RESPONSE:
+ self.assertTrue(wi.get_handle() == "my-handle")
+ rp = wi.get_params()
+ self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams))
+ self.assertTrue(rp.succeeded())
+ self.assertTrue(rp.get_error() == None)
+ else:
+ self.assertTrue(wi.get_type() ==
+ WorkItem.SUBSCRIBE_INDICATION)
+ # sp better be set up by now!
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ if r_count == 4: # + 1 for subscribe reply
+ rp = self.console.refresh_subscription(sp.get_subscription_id())
+ self.assertTrue(rp)
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publishes per subscription, plus 2 replies
+ self.assertTrue(r_count > 7)
+
+ self.console.destroy(10)
+
+
+ def test_async_cancel(self):
+ # create console
+ # find one agent
+ # async subscribe to changes to any object in package1/class1
+ # cancel after first data indication
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ rc = self.console.create_subscription(agent,
+ query,
+ "my-handle",
+ _blocking=False)
+ self.assertTrue(rc)
+
+ # refresh after three subscribe indications, count all
+ # indications to verify refresh worked
+ r_count = 0
+ sp = None
+ rp = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE:
+ self.assertTrue(wi.get_handle() == "my-handle")
+ sp = wi.get_params()
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ else:
+ self.assertTrue(wi.get_type() ==
+ WorkItem.SUBSCRIBE_INDICATION)
+ # sp better be set up by now!
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ self.console.cancel_subscription(sp.get_subscription_id())
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 1 subscribe reply and 1 data_indication
+ self.assertTrue(r_count == 2)
+
+ self.console.destroy(10)
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
index a60e3964ad..c76acbd8b9 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
@@ -32,6 +32,7 @@ queue.MyQueue = example.MyQueue
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.ibmStocks = stocks.nyse.ibm
+topic.MyTopic = example.MyTopic
# Register an AMQP destination in JNDI
# NOTE: Qpid currently only supports direct,topics and headers
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 820f7ce036..96401f2178 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -273,6 +273,7 @@
<sysproperty key="broker" value="${broker}"/>
<sysproperty key="broker.clean" value="${broker.clean}"/>
<sysproperty key="broker.clean.between.tests" value="${broker.clean.between.tests}"/>
+ <sysproperty key="broker.persistent" value="${broker.persistent}"/>
<sysproperty key="broker.version" value="${broker.version}"/>
<sysproperty key="broker.ready" value="${broker.ready}" />
<sysproperty key="broker.stopped" value="${broker.stopped}" />
diff --git a/qpid/java/perftests/etc/scripts/drainBroker.sh b/qpid/java/perftests/etc/scripts/drainBroker.sh
new file mode 100755
index 0000000000..eea7209f03
--- /dev/null
+++ b/qpid/java/perftests/etc/scripts/drainBroker.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ -z "$QPID_HOME" ]; then
+ export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0))))
+ export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
+# Parse arguments, taking all '-' prefixed args as JAVA_OPTS
+for arg in "$@"; do
+ if [[ $arg == -java:* ]]; then
+ JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
+ else
+ ARGS="${ARGS}$arg "
+ fi
+done
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java JAVA_VM=-server JAVA_MEM=-Xmx1024m QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01 -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=0 consumeOnly=true consumeOnly=true${ARGS}
diff --git a/qpid/java/perftests/etc/scripts/fillBroker.sh b/qpid/java/perftests/etc/scripts/fillBroker.sh
new file mode 100755
index 0000000000..5b7de6f999
--- /dev/null
+++ b/qpid/java/perftests/etc/scripts/fillBroker.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ -z "$QPID_HOME" ]; then
+ export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0))))
+ export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
+# Parse arguments, taking all '-' prefixed args as JAVA_OPTS
+for arg in "$@"; do
+ if [[ $arg == -java:* ]]; then
+ JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
+ else
+ ARGS="${ARGS}$arg "
+ fi
+done
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java JAVA_VM=-server JAVA_MEM=-Xmx1024m QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01 -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=0 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=0 ${ARGS}
diff --git a/qpid/java/perftests/etc/scripts/testWithPreFill.sh b/qpid/java/perftests/etc/scripts/testWithPreFill.sh
new file mode 100755
index 0000000000..721ecf6ecc
--- /dev/null
+++ b/qpid/java/perftests/etc/scripts/testWithPreFill.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if [ -z "$QPID_HOME" ]; then
+ export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0))))
+ export PATH=${PATH}:${QPID_HOME}/bin
+fi
+
+# Parse arguments taking all - prefixed args as JAVA_OPTS
+for arg in "$@"; do
+ if [[ $arg == -java:* ]]; then
+ JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` "
+ else
+ ARGS="${ARGS}$arg "
+ fi
+done
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java JAVA_VM=-server JAVA_MEM=-Xmx1024m QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01 -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=0 preFill=1000 delayBeforeConsume=1000 ${ARGS}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index 89fc805a34..d8fea85477 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -133,8 +133,11 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
{
// _logger.debug("public void testAsyncPingOk(int numPings): called");
+ // get prefill count to update the expected count
+ int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+
// Ensure that at least one ping was requeusted.
- if (numPings == 0)
+ if (numPings + preFill == 0)
{
_logger.error("Number of pings requested was zero.");
fail("Number of pings requested was zero.");
@@ -149,16 +152,24 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
// String messageCorrelationId = perThreadSetup._correlationId;
// _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
// Initialize the count and timing controller for the new correlation id.
PerCorrelationId perCorrelationId = new PerCorrelationId();
TimingController tc = getTimingController().getControllerForCurrentThread();
perCorrelationId._tc = tc;
- perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
+ perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill);
perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
+ // Start the client that will have been paused due to preFill requirement.
+ // or if we have not yet started client because messages are sitting on broker.
+ if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+ {
+ pingClient.start();
+ }
+
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
+ int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < perCorrelationId._expectedCount)
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
index 0afec83b19..dcfc67d4fc 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
@@ -104,4 +104,9 @@ public class PingClient extends PingPongProducer
return _pingClientCount * _noOfConsumers;
}
}
+
+ public int getClientCount()
+ {
+ return _pingClientCount;
+ }
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
index 94b8ea662e..4c5df0a471 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
@@ -141,9 +141,65 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
perThreadSetup._pingClient = new PingClient(testParameters);
perThreadSetup._pingClient.establishConnection(true, true);
}
- // Start the client connection
- perThreadSetup._pingClient.start();
+ // Prefill the broker unless we are in consume only mode.
+ int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+ if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0)
+ {
+ // Manually set the correlation ID to 1. This is not ideal but it is the
+ // value that the main test loop will use.
+ perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, String.valueOf(perThreadSetup._pingClient.getClientCount()));
+
+ // Note with a large preFill and non-tx session the messages will be
+ // rapidly pushed in to the mina buffers. OOM's are a real risk here.
+ // Should perhaps consider using a TX session for the prefill.
+
+ long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME);
+
+ // Only delay if we have consumers and a delayBeforeConsume
+ if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0)
+ && delayBeforeConsume > 0)
+ {
+
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
+ // Only do logging if in verbose mode.
+ if (verbose)
+ {
+ if (delayBeforeConsume > 60000)
+ {
+ long minutes = delayBeforeConsume / 60000;
+ long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000;
+ long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000);
+ _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test.");
+ }
+ else
+ {
+ _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test.");
+ }
+ }
+
+ Thread.sleep(delayBeforeConsume);
+
+ if (verbose)
+ {
+ _logger.info("Starting Test.");
+ }
+ }
+
+ // We can't start the clients here as the test client has not yet been configured to receive messages.
+ // only when the test method is executed will the correlationID map be set up and ready to consume
+ // the messages we have sent here.
+ }
+ else //Only start the consumer if we are not preFilling.
+ {
+ // Only start the consumer if we don't have messages waiting to be received.
+ // we need to set up the correlationID mapping first
+ if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+ {
+ // Start the client connection
+ perThreadSetup._pingClient.start();
+ }
+ }
// Attach the per-thread set to the thread.
threadSetup.set(perThreadSetup);
}
@@ -157,7 +213,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
* Performs test fixture clean
*/
public void threadTearDown()
- {
+ {
_logger.debug("public void threadTearDown(): called");
try
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index f994cd138e..e3769e415e 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -324,6 +324,25 @@ public class PingPongProducer implements Runnable, ExceptionListener
/** Holds the name of the property to store nanosecond timestamps in ping messages with. */
public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
+ /** Holds the name of the property to get the number of messages to prefill the broker with before starting the main test. */
+ public static final String PREFILL_PROPNAME = "preFill";
+
+ /** Defines the default value for the number of messages to prefill. 0 (default) means no messages. */
+ public static final int PREFILL_DEFAULT = 0;
+
+ /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */
+ public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume";
+
+ /** Defines the default value for the delay in ms to wait before starting the test run. 0 (default) means no delay. */
+ public static final long DELAY_BEFORE_CONSUME = 0;
+
+ /** Holds the name of the property indicating that no messages should be sent. */
+ public static final String CONSUME_ONLY_PROPNAME = "consumeOnly";
+
+ /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
+ public static final boolean CONSUME_ONLY_DEFAULT = false;
+
+
/** Holds the default configuration properties. */
public static ParsedProperties defaults = new ParsedProperties();
@@ -360,6 +379,9 @@ public class PingPongProducer implements Runnable, ExceptionListener
defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
+ defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT);
+ defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME);
+ defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);
}
/** Allows setting of client ID on the connection, rather than through the connection URL. */
@@ -455,6 +477,24 @@ public class PingPongProducer implements Runnable, ExceptionListener
*/
protected int _maxPendingSize;
+ /**
+ * Holds the number of messages to send during the setup phase, before the clients start consuming.
+ */
+ private Integer _preFill;
+
+ /**
+ * Holds the time in ms to wait after preFilling before starting the test.
+ */
+ private Long _delayBeforeConsume;
+
+ /**
+ * Holds a boolean value of whether this test should just consume, i.e. skips
+ * sending messages, but still expects to receive the specified number.
+ * Use in conjunction with numConsumers=0 to fill the broker.
+ */
+ private boolean _consumeOnly;
+
+
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -588,6 +628,9 @@ public class PingPongProducer implements Runnable, ExceptionListener
_ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
_consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
_maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
+ _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME);
+ _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME);
+ _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME);
// Check that one or more destinations were specified.
if (_noOfDestinations < 1)
@@ -638,7 +681,10 @@ public class PingPongProducer implements Runnable, ExceptionListener
}
// Create the destinations to send pings to and receive replies from.
- _replyDestination = _consumerSession[0].createTemporaryQueue();
+ if (_noOfConsumers > 0)
+ {
+ _replyDestination = _consumerSession[0].createTemporaryQueue();
+ }
createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
// Create the message producer only if instructed to.
@@ -871,6 +917,14 @@ public class PingPongProducer implements Runnable, ExceptionListener
{
_consumer = new MessageConsumer[_noOfConsumers];
+ // If we don't have consumers then ensure we have created the
+ // destination.
+ if (_noOfConsumers == 0)
+ {
+ _producerSession.createConsumer(destination, selector,
+ NO_LOCAL_DEFAULT).close();
+ }
+
for (int i = 0; i < _noOfConsumers; i++)
{
// Create a consumer for the destination and set this pinger to listen to its messages.
@@ -980,6 +1034,11 @@ public class PingPongProducer implements Runnable, ExceptionListener
// When running in client ack mode, an ack is done instead of a commit, on the commit batch
// size boundaries.
long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
+ // _noOfConsumers can be set to 0 on the command line but we will not get here to
+ // divide by 0 as this is executed by the onMessage code when a message is received.
+ // no consumers means no message reception.
+
+
// log.debug("commitCount = " + commitCount);
if ((commitCount % _txBatchSize) == 0)
@@ -1014,6 +1073,7 @@ public class PingPongProducer implements Runnable, ExceptionListener
else
{
log.warn("Got unexpected message with correlationId: " + correlationID);
+ log.warn("Map contains:" + perCorrelationIds.entrySet());
}
}
else
@@ -1037,13 +1097,18 @@ public class PingPongProducer implements Runnable, ExceptionListener
* before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
* the correlation id.
*
+ * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination
+ * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings.
+ *
+ * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur.
+ *
* @param message The message to send. If this is null, one is generated.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
* @param messageCorrelationId The message correlation id. If this is null, one is generated.
*
* @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
- * for all prematurely.
+ * for all prematurely. If we are running with noConsumers=0 (send-only mode) then it will return the number of messages sent.
*
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
* @throws InterruptedException When interrupted by a timeout
@@ -1051,6 +1116,16 @@ public class PingPongProducer implements Runnable, ExceptionListener
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
throws JMSException, InterruptedException
{
+ return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId);
+ }
+
+ public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+
+ // If we are running a consumeOnly test then don't send any messages
+
+
/*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
@@ -1071,29 +1146,41 @@ public class PingPongProducer implements Runnable, ExceptionListener
// countdown needs to be done before the chained listener can be called.
PerCorrelationId perCorrelationId = new PerCorrelationId();
- perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
+ int totalPingsRequested = numPings + preFill;
+ perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1);
perCorrelationIds.put(messageCorrelationId, perCorrelationId);
// Set up the current time as the start time for pinging on the correlation id. This is used to determine
// timeouts.
perCorrelationId.timeOutStart = System.nanoTime();
- // Send the specifed number of messages.
+ // Send the specifed number of messages for this test
pingNoWaitForReply(message, numPings, messageCorrelationId);
boolean timedOut;
boolean allMessagesReceived;
int numReplies;
+ // We don't have a consumer so don't try and wait for the messages.
+ // this does mean that if the producerSession is !TXed then we may
+ // get to exit before all msgs have been received.
+ //
+ // Return the number of requested messages, this will let the test
+ // report a pass.
+ if (_noOfConsumers == 0)
+ {
+ return totalPingsRequested;
+ }
+
do
{
// Block the current thread until replies to all the messages are received, or it times out.
perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
// Work out how many replies were receieved.
- numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
+ numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount();
- allMessagesReceived = numReplies == getExpectedNumPings(numPings);
+ allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested);
// log.debug("numReplies = " + numReplies);
// log.debug("allMessagesReceived = " + allMessagesReceived);
@@ -1108,7 +1195,7 @@ public class PingPongProducer implements Runnable, ExceptionListener
}
while (!timedOut && !allMessagesReceived);
- if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
+ if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose)
{
log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
@@ -1146,6 +1233,12 @@ public class PingPongProducer implements Runnable, ExceptionListener
/*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
+ // If we are running a consumeOnly test then don't send any messages
+ if (_consumeOnly)
+ {
+ return;
+ }
+
if (message == null)
{
message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
@@ -1667,6 +1760,10 @@ public class PingPongProducer implements Runnable, ExceptionListener
/**
* Calculates how many pings are expected to be received for the given number sent.
*
+ * Note: if you have set noConsumers to 0 then this will also return 0
+ * in the case of PubSub testing. This is correct as without consumers there
+ * will be no-one to receive the sent messages so they will be unable to respond.
+ *
* @param numpings The number of pings that will be sent.
*
* @return The number that should be received, for the test to pass.
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
index c73959676d..970b08f629 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -28,12 +28,16 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TopicSubscriber;
import junit.framework.Assert;
import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.test.utils.QpidTestCase;
import java.util.concurrent.locks.ReentrantLock;
@@ -154,6 +158,7 @@ public class TimeToLiveTest extends QpidTestCase
{
Message send = producerSession.createTextMessage("Message " + msg);
send.setBooleanProperty("first", first);
+ send.setStringProperty("testprop", "TimeToLiveTest");
send.setLongProperty("TTL", producer.getTimeToLive());
return send;
}
@@ -206,5 +211,160 @@ public class TimeToLiveTest extends QpidTestCase
producerSession.close();
producerConnection.close();
}
+
+ public void testPassiveTTLwithDurableSubscription() throws Exception
+ {
+ //Create Client 1
+ Connection clientConnection = getConnection();
+
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create and close the durable subscriber
+ AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+ TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
+ durableSubscriber.close();
+
+ //Create Producer
+ Connection producerConnection = getConnection();
+
+ producerConnection.start();
+
+ // Move to a Transacted session to ensure that all messages have been delivered to broker before
+ // we start waiting for TTL
+ Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer producer = producerSession.createProducer(topic);
+
+ //Set TTL
+ int msg = 0;
+ producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
+
+ producer.setTimeToLive(TIME_TO_LIVE);
+
+ for (; msg < MSG_COUNT - 2; msg++)
+ {
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+ }
+
+ //Reset TTL
+ producer.setTimeToLive(0L);
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
+
+ producerSession.commit();
+
+ //resubscribe
+ durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName());
+
+ // Ensure we sleep the required amount of time.
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ final long MILLIS = 1000000L;
+
+ long waitTime = TIME_TO_LIVE * MILLIS;
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ clientConnection.start();
+
+ //Receive Message 0
+ // Set 5s receive time for messages we expect to receive.
+ Message receivedFirst = durableSubscriber.receive(5000);
+ Message receivedSecond = durableSubscriber.receive(5000);
+ Message receivedThird = durableSubscriber.receive(1000);
+
+ // Log the messages to help diagnosis in case of failure
+ _logger.info("First:"+receivedFirst);
+ _logger.info("Second:"+receivedSecond);
+ _logger.info("Third:"+receivedThird);
+
+ // Only first and last messages sent should survive expiry
+ Assert.assertNull("More messages received", receivedThird);
+
+ Assert.assertNotNull("First message not received", receivedFirst);
+ Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
+ Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL"));
+
+ Assert.assertNotNull("Final message not received", receivedSecond);
+ Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
+ Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL"));
+
+ clientSession.unsubscribe(getTestQueueName());
+ clientConnection.close();
+
+ producerConnection.close();
+ }
+
+ public void testActiveTTLwithDurableSubscription() throws Exception
+ {
+ //Create Client 1
+ Connection clientConnection = getConnection();
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create and close the durable subscriber
+ AMQTopic topic = new AMQTopic((AMQConnection) clientConnection, getTestQueueName());
+ TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false);
+ durableSubscriber.close();
+
+ //Create Producer
+ Connection producerConnection = getConnection();
+ AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(topic);
+ producer.setTimeToLive(1000L);
+
+ // send Messages
+ for(int i = 0; i < MSG_COUNT; i++)
+ {
+ producer.send(producerSession.createTextMessage("Message: "+i));
+ }
+ long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+
+ // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms.
+ long messageCount = MSG_COUNT;
+ long lastPass;
+ AMQQueue subcriptionQueue = new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription");
+ do
+ {
+ lastPass = messageCount;
+ Thread.sleep(100);
+ messageCount = producerSession.getQueueDepth((AMQDestination) subcriptionQueue);
+
+ // If we have received messages in the last loop then extend the timeout time.
+ // if we get messages stuck that are not expiring then the failureTime will occur
+ // failing the test. This will help with the scenario when the broker does not
+ // have enough CPU cycles to process the TTLs.
+ if (lastPass != messageCount)
+ {
+ failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
+ }
+ }
+ while(messageCount > 0L && System.currentTimeMillis() < failureTime);
+
+ assertEquals("Messages not automatically expired: ", 0L, messageCount);
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ clientSession.unsubscribe("MyDurableTTLSubscription");
+ clientSession.close();
+ clientConnection.close();
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
index 19b73fcc7c..cf815fbc05 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
@@ -38,7 +38,7 @@ public class DurableSubscriberTest extends QpidTestCase
*/
public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception
{
- if (!isBroker08())
+ if (isBrokerStorePersistent() || !isBroker08())
{
TopicConnectionFactory factory = getConnectionFactory();
Topic topic = (Topic) getInitialContext().lookup(_topicName);
@@ -102,7 +102,7 @@ public class DurableSubscriberTest extends QpidTestCase
*/
public void testDurSubRestoresMessageSelector() throws Exception
{
- if (!isBroker08())
+ if (isBrokerStorePersistent() || !isBroker08())
{
TopicConnectionFactory factory = getConnectionFactory();
Topic topic = (Topic) getInitialContext().lookup(_topicName);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index a907e6a29d..91bb5d2529 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -115,10 +115,26 @@ public class DurableSubscriptionTest extends QpidTestCase
_logger.info("Close connection");
con.close();
}
+
+ public void testDurabilityNOACK() throws Exception
+ {
+ durabilityImpl(AMQSession.NO_ACKNOWLEDGE, false);
+ }
public void testDurabilityAUTOACK() throws Exception
{
- durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+ durabilityImpl(Session.AUTO_ACKNOWLEDGE, false);
+ }
+
+ public void testDurabilityAUTOACKwithRestartIfPersistent() throws Exception
+ {
+ if(!isBrokerStorePersistent())
+ {
+ System.out.println("The broker store is not persistent, skipping this test.");
+ return;
+ }
+
+ durabilityImpl(Session.AUTO_ACKNOWLEDGE, true);
}
public void testDurabilityNOACKSessionPerConnection() throws Exception
@@ -131,56 +147,85 @@ public class DurableSubscriptionTest extends QpidTestCase
durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
}
- private void durabilityImpl(int ackMode) throws Exception
- {
+ private void durabilityImpl(int ackMode, boolean restartBroker) throws Exception
+ {
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con, "MyTopic");
Session session1 = con.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
- Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session sessionProd = con.createSession(false, ackMode);
MessageProducer producer = sessionProd.createProducer(topic);
- Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session2 = con.createSession(false, ackMode);
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
con.start();
+ //send message A and check both consumers receive
producer.send(session1.createTextMessage("A"));
Message msg;
+ _logger.info("Receive message on consumer 1 :expecting A");
msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
+ _logger.info("Receive message on consumer 2 :expecting A");
msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should have been received",msg);
assertEquals("A", ((TextMessage) msg).getText());
msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- consumer2.close();
- session2.close();
-
+ //send message B, receive with consumer 1, and disconnect consumer 2 to leave the message behind (if not NO_ACK)
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(500);
assertNotNull("Consumer 1 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
+ assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(500);
assertNull("There should be no more messages for consumption on consumer1.", msg);
+ consumer2.close();
+ session2.close();
+
+ //Send message C, then connect consumer 3 to durable subscription and get
+ //message B if not using NO_ACK, then receive C with consumer 1 and 3
+ producer.send(session1.createTextMessage("C"));
+
Session session3 = con.createSession(false, ackMode);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
- _logger.info("Receive message on consumer 3 :expecting B");
+ if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+ {
+ //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+ //when we didn't call receive() to get it before closing consumer 2
+ }
+ else
+ {
+ _logger.info("Receive message on consumer 3 :expecting B");
+ msg = consumer3.receive(500);
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
+ }
+
+ _logger.info("Receive message on consumer 1 :expecting C");
+ msg = consumer1.receive(500);
+ assertNotNull("Consumer 1 should get message 'C'.", msg);
+ assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
+ msg = consumer1.receive(500);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+ _logger.info("Receive message on consumer 3 :expecting C");
msg = consumer3.receive(500);
- assertNotNull("Consumer 3 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+ assertNotNull("Consumer 3 should get message 'C'.", msg);
+ assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
msg = consumer3.receive(500);
assertNull("There should be no more messages for consumption on consumer3.", msg);
@@ -191,6 +236,18 @@ public class DurableSubscriptionTest extends QpidTestCase
session3.unsubscribe("MySubscription");
con.close();
+
+ if(restartBroker)
+ {
+ try
+ {
+ restartBroker();
+ }
+ catch (Exception e)
+ {
+ fail("Error restarting the broker");
+ }
+ }
}
private void durabilityImplSessionPerConnection(int ackMode) throws Exception
@@ -211,7 +268,7 @@ public class DurableSubscriptionTest extends QpidTestCase
con1.start();
Session session1 = con1.createSession(false, ackMode);
- MessageConsumer consumer1 = session0.createConsumer(topic);
+ MessageConsumer consumer1 = session1.createConsumer(topic);
// Create consumer 2.
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
@@ -232,37 +289,60 @@ public class DurableSubscriptionTest extends QpidTestCase
msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
assertNotNull("Message should have been received",msg);
assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
- msg = consumer2.receive(500);
+ msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull("There should be no more messages for consumption on consumer2.", msg);
+ // Send message and receive on consumer 1.
+ producer.send(session0.createTextMessage("B"));
+
+ _logger.info("Receive message on consumer 1 :expecting B");
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
+ msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertEquals(null, msg);
+
// Detach the durable subscriber.
consumer2.close();
session2.close();
con2.close();
+
+ // Send message C and receive on consumer 1
+ producer.send(session0.createTextMessage("C"));
- // Send message and receive on open consumer.
- producer.send(session0.createTextMessage("B"));
-
- _logger.info("Receive message on consumer 1 :expecting B");
- msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals("B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting C");
+ msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertEquals("C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+ // Re-attach a new consumer to the durable subscription, and check that it gets message B it left (if not NO_ACK)
+ // and also gets message C sent after it was disconnected.
AMQConnection con3 = (AMQConnection) getConnection("guest", "guest");
con3.start();
Session session3 = con3.createSession(false, ackMode);
TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
- _logger.info("Receive message on consumer 3 :expecting B");
- msg = consumer3.receive(500);
- assertNotNull("Consumer 3 should get message 'B'.", msg);
- assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+ if(ackMode == AMQSession.NO_ACKNOWLEDGE)
+ {
+ //Do nothing if NO_ACK was used, as prefetch means the message was dropped
+ //when we didn't call receive() to get it before closing consumer 2
+ }
+ else
+ {
+ _logger.info("Receive message on consumer 3 :expecting B");
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertEquals("B", ((TextMessage) msg).getText());
+ }
+
+ _logger.info("Receive message on consumer 3 :expecting C");
+ msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
+ assertNotNull("Consumer 3 should get message 'C'.", msg);
+ assertEquals("Incorrect Message recevied on consumer3.", "C", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(500);
+ msg = consumer3.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull("There should be no more messages for consumption on consumer3.", msg);
consumer1.close();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index 84ff7055c5..d693c7f6c1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -166,6 +166,7 @@ public class QpidTestCase extends TestCase
private static final String TEST_OUTPUT = "test.output";
private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave";
private static final String BROKER_LOG_PREFIX = "broker.log.prefix";
+ private static final String BROKER_PERSITENT = "broker.persistent";
// values
protected static final String JAVA = "java";
@@ -187,6 +188,7 @@ public class QpidTestCase extends TestCase
private Boolean _brokerCleanBetweenTests = Boolean.getBoolean(BROKER_CLEAN_BETWEEN_TESTS);
private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
private String _output = System.getProperty(TEST_OUTPUT);
+ protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT);
private static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: ");
protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE);
@@ -929,6 +931,11 @@ public class QpidTestCase extends TestCase
{
return !_broker.equals("vm");
}
+
+ protected boolean isBrokerStorePersistent()
+ {
+ return _brokerPersistent;
+ }
public void restartBroker() throws Exception
{
diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes
index 6f3898384d..184a6f9e78 100644
--- a/qpid/java/test-profiles/JavaExcludes
+++ b/qpid/java/test-profiles/JavaExcludes
@@ -1,4 +1,3 @@
-org.apache.qpid.test.unit.ct.DurableSubscriberTests#*
// Those tests are not finished
org.apache.qpid.test.testcases.TTLTest#*
org.apache.qpid.test.testcases.FailoverTest#*
@@ -18,3 +17,5 @@ org.apache.qpid.client.SessionCreateTest#*
org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
org.apache.qpid.server.queue.ModelTest#*
+//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support causes exclusivity mismatch after restart
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
diff --git a/qpid/java/test-profiles/JavaStandaloneExcludes b/qpid/java/test-profiles/JavaStandaloneExcludes
index ed12973498..a9e2a058f8 100644
--- a/qpid/java/test-profiles/JavaStandaloneExcludes
+++ b/qpid/java/test-profiles/JavaStandaloneExcludes
@@ -1,4 +1,3 @@
-org.apache.qpid.test.unit.ct.DurableSubscriberTests#*
// Those tests are not finished
org.apache.qpid.test.testcases.TTLTest#*
org.apache.qpid.test.testcases.FailoverTest#*
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index f81e9c213c..b4e5282228 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -1 +1,3 @@
+//These tests require a persistent store
org.apache.qpid.server.store.PersistentStoreTest#*
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
diff --git a/qpid/java/test-profiles/java-derby.testprofile b/qpid/java/test-profiles/java-derby.testprofile
index 1238a44c84..618dd5b45d 100644
--- a/qpid/java/test-profiles/java-derby.testprofile
+++ b/qpid/java/test-profiles/java-derby.testprofile
@@ -7,3 +7,4 @@ broker.config=${project.root}/build/etc/config-systests-derby.xml
qpid.amqp.version=0-9
profile.excludes=JavaStandaloneExcludes
broker.clean.between.tests=true
+broker.persistent=true
diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test
index f7955a4a9d..a47f633565 100755
--- a/qpid/python/qpid-python-test
+++ b/qpid/python/qpid-python-test
@@ -20,7 +20,7 @@
# TODO: summarize, test harness preconditions (e.g. broker is alive)
-import logging, optparse, os, struct, sys, traceback, types
+import logging, optparse, os, struct, sys, time, traceback, types
from fnmatch import fnmatchcase as match
from getopt import GetoptError
from logging import getLogger, StreamHandler, Formatter, Filter, \
@@ -60,6 +60,8 @@ parser.add_option("-I", "--ignore-file", metavar="IFILE", action="append",
help="ignore tests matching patterns in IFILE")
parser.add_option("-H", "--halt-on-error", action="store_true", default=False,
dest="hoe", help="halt if an error is encountered")
+parser.add_option("-t", "--time", action="store_true", default=False,
+ help="report timing information on test run")
parser.add_option("-D", "--define", metavar="DEFINE", dest="defines",
action="append", default=[], help="define test parameters")
@@ -165,7 +167,9 @@ KEYWORDS = {"pass": (32,),
"start": (34,),
"total": (34,),
"ignored": (33,),
- "selected": (34,)}
+ "selected": (34,),
+ "elapsed": (34,),
+ "average": (34,)}
COLORIZE = is_smart()
@@ -525,6 +529,7 @@ total = len(filtered) + len(ignored)
passed = 0
failed = 0
skipped = 0
+start = time.time()
for t in filtered:
if list_only:
print t.name()
@@ -538,6 +543,7 @@ for t in filtered:
failed += 1
if opts.hoe:
break
+end = time.time()
run = passed + failed
@@ -558,16 +564,22 @@ if not list_only:
skip = "skip"
else:
skip = "pass"
- print colorize("Totals:", 1), \
- colorize_word("total", "%s tests" % total) + ",", \
- colorize_word(_pass, "%s passed" % passed) + ",", \
- colorize_word(skip, "%s skipped" % skipped) + ",", \
- colorize_word(ign, "%s ignored" % len(ignored)) + ",", \
- colorize_word(outcome, "%s failed" % failed),
+ print colorize("Totals:", 1),
+ totals = [colorize_word("total", "%s tests" % total),
+ colorize_word(_pass, "%s passed" % passed),
+ colorize_word(skip, "%s skipped" % skipped),
+ colorize_word(ign, "%s ignored" % len(ignored)),
+ colorize_word(outcome, "%s failed" % failed)]
+ print ", ".join(totals),
if opts.hoe and failed > 0:
print " -- (halted after %s)" % run
else:
print
+ if opts.time and run > 0:
+ print colorize("Timing:", 1),
+ timing = [colorize_word("elapsed", "%.2fs elapsed" % (end - start)),
+ colorize_word("average", "%.2fs average" % ((end - start)/run))]
+ print ", ".join(timing)
if failed or skipped:
sys.exit(1)
diff --git a/qpid/python/qpid/messaging/driver.py b/qpid/python/qpid/messaging/driver.py
index a09686badd..3712bdd221 100644
--- a/qpid/python/qpid/messaging/driver.py
+++ b/qpid/python/qpid/messaging/driver.py
@@ -283,6 +283,11 @@ EMPTY_MP = MessageProperties()
SUBJECT = "qpid.subject"
TO = "qpid.to"
+CLOSED = "CLOSED"
+READ_ONLY = "READ_ONLY"
+WRITE_ONLY = "WRITE_ONLY"
+OPEN = "OPEN"
+
class Driver:
def __init__(self, connection):
@@ -290,24 +295,158 @@ class Driver:
self.log_id = "%x" % id(self.connection)
self._lock = self.connection._lock
- self._in = LinkIn()
- self._out = LinkOut()
-
self._selector = Selector.default()
self._attempts = 0
self._hosts = [(self.connection.host, self.connection.port)] + \
self.connection.backups
self._host = 0
self._retrying = False
+ self._socket = None
+
+ self._timeout = None
+
+ self.engine = None
+
+ @synchronized
+ def wakeup(self):
+ self.dispatch()
+ self._selector.wakeup()
+
+ def start(self):
+ self._selector.register(self)
+
+ def fileno(self):
+ return self._socket.fileno()
- self.reset()
+ @synchronized
+ def reading(self):
+ return self._socket is not None
- def reset(self):
- self._opening = False
+ @synchronized
+ def writing(self):
+ return self._socket is not None and self.engine.pending()
+
+ @synchronized
+ def timing(self):
+ return self._timeout
+
+ @synchronized
+ def readable(self):
+ try:
+ data = self._socket.recv(64*1024)
+ if data:
+ rawlog.debug("READ[%s]: %r", self.log_id, data)
+ self.engine.write(data)
+ else:
+ self.close_engine()
+ except socket.error, e:
+ self.close_engine(e)
+
+ self.update_status()
+
+ self.connection._waiter.notifyAll()
+
+ def close_engine(self, e=None):
+ if e is None:
+ e = "connection aborted"
+
+ if (self.connection.reconnect and
+ (self.connection.reconnect_limit is None or
+ self.connection.reconnect_limit <= 0 or
+ self._attempts <= self.connection.reconnect_limit)):
+ if self._host > 0:
+ delay = 0
+ else:
+ delay = self.connection.reconnect_delay
+ self._timeout = time.time() + delay
+ log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
+ if delay > 0:
+ log.warn("sleeping %s seconds" % delay)
+ self._retrying = True
+ self.engine.close()
+ else:
+ self.engine.close(e)
+
+ def update_status(self):
+ status = self.engine.status()
+ return getattr(self, "st_%s" % status.lower())()
+
+ def st_closed(self):
+ # XXX: this log statement seems to sometimes hit when the socket is not connected
+ # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
+ self._socket.close()
+ self._socket = None
+ self.engine = None
+ return True
+
+ def st_open(self):
+ return False
+
+ @synchronized
+ def writeable(self):
+ notify = False
+ try:
+ n = self._socket.send(self.engine.peek())
+ sent = self.engine.read(n)
+ rawlog.debug("SENT[%s]: %r", self.log_id, sent)
+ except socket.error, e:
+ self.close_engine(e)
+ notify = True
+
+ if self.update_status() or notify:
+ self.connection._waiter.notifyAll()
+
+ @synchronized
+ def timeout(self):
+ self.dispatch()
+ self.connection._waiter.notifyAll()
+
+ def dispatch(self):
+ try:
+ if self._socket is None:
+ if self.connection._connected:
+ self.connect()
+ else:
+ self.engine.dispatch()
+ except:
+ # XXX: Does socket get leaked if this occurs?
+ msg = compat.format_exc()
+ self.connection.error = (msg,)
+
+ def connect(self):
+ try:
+ # XXX: should make this non blocking
+ if self._host == 0:
+ self._attempts += 1
+ host, port = self._hosts[self._host]
+ if self._retrying:
+ log.warn("trying: %s:%s", host, port)
+ self.engine = Engine(self.connection)
+ self.engine.open()
+ rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
+ self._socket = connect(host, port)
+ if self._retrying:
+ log.warn("reconnect succeeded: %s:%s", host, port)
+ self._timeout = None
+ self._attempts = 0
+ self._host = 0
+ self._retrying = False
+ except socket.error, e:
+ self._host = (self._host + 1) % len(self._hosts)
+ self.close_engine(e)
+
+class Engine:
+
+ def __init__(self, connection):
+ self.connection = connection
+ self.log_id = "%x" % id(self.connection)
self._closing = False
self._connected = False
self._attachments = {}
+ self._in = LinkIn()
+ self._out = LinkOut()
+
self._channel_max = 65536
self._channels = 0
self._sessions = {}
@@ -316,7 +455,7 @@ class Driver:
self.address_cache = Cache(options.get("address_ttl", 60))
- self._socket = None
+ self._status = CLOSED
self._buf = ""
self._hdr = ""
self._op_enc = OpEncoder()
@@ -325,7 +464,6 @@ class Driver:
self._frame_dec = FrameDecoder()
self._seg_dec = SegmentDecoder()
self._op_dec = OpDecoder()
- self._timeout = None
self._sasl = sasl.Client()
if self.connection.username:
@@ -343,6 +481,9 @@ class Driver:
self._sasl_encode = False
self._sasl_decode = False
+ def _reset(self):
+ self.connection._transport_connected = False
+
for ssn in self.connection.sessions.values():
for m in ssn.acked + ssn.unacked + ssn.incoming:
m._transfer_id = None
@@ -352,76 +493,40 @@ class Driver:
rcv.impending = rcv.received
rcv.linked = False
- @synchronized
- def wakeup(self):
- self.dispatch()
- self._selector.wakeup()
-
- def start(self):
- self._selector.register(self)
-
- def fileno(self):
- return self._socket.fileno()
-
- @synchronized
- def reading(self):
- return self._socket is not None
-
- @synchronized
- def writing(self):
- return self._socket is not None and self._buf
-
- @synchronized
- def timing(self):
- return self._timeout
+ def status(self):
+ return self._status
- @synchronized
- def readable(self):
- error = None
- recoverable = False
+ def write(self, data):
try:
- data = self._socket.recv(64*1024)
- if data:
- rawlog.debug("READ[%s]: %r", self.log_id, data)
- if self._sasl_decode:
- data = self._sasl.decode(data)
- else:
- rawlog.debug("ABORTED[%s]: %s", self.log_id, self._socket.getpeername())
- error = "connection aborted"
- recoverable = True
- except socket.error, e:
- error = e
- recoverable = True
-
- if not error:
- try:
- if len(self._hdr) < 8:
- r = 8 - len(self._hdr)
- self._hdr += data[:r]
- data = data[r:]
-
- if len(self._hdr) == 8:
- self.do_header(self._hdr)
-
- self._frame_dec.write(data)
- self._seg_dec.write(*self._frame_dec.read())
- self._op_dec.write(*self._seg_dec.read())
- for op in self._op_dec.read():
- self.assign_id(op)
- opslog.debug("RCVD[%s]: %r", self.log_id, op)
- op.dispatch(self)
- except VersionError, e:
- error = e
- except:
- msg = compat.format_exc()
- error = msg
-
- if error:
- self._error(error, recoverable)
- else:
+ if self._sasl_decode:
+ data = self._sasl.decode(data)
+
+ if len(self._hdr) < 8:
+ r = 8 - len(self._hdr)
+ self._hdr += data[:r]
+ data = data[r:]
+
+ if len(self._hdr) == 8:
+ self.do_header(self._hdr)
+
+ self._frame_dec.write(data)
+ self._seg_dec.write(*self._frame_dec.read())
+ self._op_dec.write(*self._seg_dec.read())
+ for op in self._op_dec.read():
+ self.assign_id(op)
+ opslog.debug("RCVD[%s]: %r", self.log_id, op)
+ op.dispatch(self)
self.dispatch()
+ except VersionError, e:
+ self.close(e)
+ except:
+ self.close(compat.format_exc())
- self.connection._waiter.notifyAll()
+ def close(self, e=None):
+ self._reset()
+ if e:
+ self.connection.error = (e,)
+ self._status = CLOSED
def assign_id(self, op):
if isinstance(op, Command):
@@ -429,40 +534,16 @@ class Driver:
op.id = sst.received
sst.received += 1
- @synchronized
- def writeable(self):
- try:
- n = self._socket.send(self._buf)
- rawlog.debug("SENT[%s]: %r", self.log_id, self._buf[:n])
- self._buf = self._buf[n:]
- except socket.error, e:
- self._error(e, True)
- self.connection._waiter.notifyAll()
+ def pending(self):
+ return len(self._buf)
- @synchronized
- def timeout(self):
- self.dispatch()
- self.connection._waiter.notifyAll()
+ def read(self, n):
+ result = self._buf[:n]
+ self._buf = self._buf[n:]
+ return result
- def _error(self, err, recoverable):
- if self._socket is not None:
- self._socket.close()
- self.reset()
- if (recoverable and self.connection.reconnect and
- (self.connection.reconnect_limit is None or
- self.connection.reconnect_limit <= 0 or
- self._attempts <= self.connection.reconnect_limit)):
- if self._host > 0:
- delay = 0
- else:
- delay = self.connection.reconnect_delay
- self._timeout = time.time() + delay
- log.warn("recoverable error[attempt %s]: %s" % (self._attempts, err))
- if delay > 0:
- log.warn("sleeping %s seconds" % delay)
- self._retrying = True
- else:
- self.connection.error = (err,)
+ def peek(self):
+ return self._buf
def write_op(self, op):
opslog.debug("SENT[%s]: %r", self.log_id, op)
@@ -507,6 +588,7 @@ class Driver:
def do_connection_open_ok(self, open_ok):
self._connected = True
self._sasl_decode = True
+ self.connection._transport_connected = True
def connection_heartbeat(self, hrt):
self.write_op(ConnectionHeartbeat())
@@ -522,8 +604,7 @@ class Driver:
# probably the right thing to do
def do_connection_close_ok(self, close_ok):
- self._socket.close()
- self.reset()
+ self.close()
def do_session_attached(self, atc):
pass
@@ -576,40 +657,18 @@ class Driver:
sst.session.error = (ex,)
def dispatch(self):
- try:
- if self._socket is None and self.connection._connected and not self._opening:
- self.connect()
- elif self._socket is not None and not self.connection._connected and not self._closing:
- self.disconnect()
-
- if self._connected and not self._closing:
- for ssn in self.connection.sessions.values():
- self.attach(ssn)
- self.process(ssn)
- except:
- msg = compat.format_exc()
- self.connection.error = (msg,)
+ if not self.connection._connected and not self._closing and self._status != CLOSED:
+ self.disconnect()
- def connect(self):
- try:
- # XXX: should make this non blocking
- if self._host == 0:
- self._attempts += 1
- host, port = self._hosts[self._host]
- if self._retrying:
- log.warn("trying: %s:%s", host, port)
- self._socket = connect(host, port)
- if self._retrying:
- log.warn("reconnect succeeded: %s:%s", host, port)
- self._timeout = None
- self._attempts = 0
- self._host = 0
- self._retrying = False
- self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
- self._opening = True
- except socket.error, e:
- self._host = (self._host + 1) % len(self._hosts)
- self._error(e, True)
+ if self._connected and not self._closing:
+ for ssn in self.connection.sessions.values():
+ self.attach(ssn)
+ self.process(ssn)
+
+ def open(self):
+ self._reset()
+ self._status = OPEN
+ self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
def disconnect(self):
self.write_op(ConnectionClose(close_code.normal))
@@ -1023,7 +1082,6 @@ class Driver:
rcv.received += 1
log.debug("RCVD[%s]: %s", ssn.log_id, msg)
ssn.incoming.append(msg)
- self.connection._waiter.notifyAll()
def _decode(self, xfr):
dp = EMPTY_DP
diff --git a/qpid/python/qpid/messaging/endpoints.py b/qpid/python/qpid/messaging/endpoints.py
index 596866de66..004cee5f88 100644
--- a/qpid/python/qpid/messaging/endpoints.py
+++ b/qpid/python/qpid/messaging/endpoints.py
@@ -94,6 +94,7 @@ class Connection:
self.session_counter = 0
self.sessions = {}
self._connected = False
+ self._transport_connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
self._waiter = Waiter(self._condition)
@@ -157,7 +158,7 @@ class Connection:
"""
self._connected = True
self._wakeup()
- self._ewait(lambda: self._driver._connected and not self._unlinked(),
+ self._ewait(lambda: self._transport_connected and not self._unlinked(),
exc=ConnectError)
def _unlinked(self):
@@ -173,7 +174,7 @@ class Connection:
"""
self._connected = False
self._wakeup()
- self._ewait(lambda: not self._driver._connected)
+ self._ewait(lambda: not self._transport_connected)
@synchronized
def connected(self):