summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/bindings/qpid/python/qpid_messaging.i83
-rw-r--r--qpid/cpp/include/qpid/messaging/Logger.h2
-rw-r--r--qpid/cpp/include/qpid/qpid.i2
-rw-r--r--qpid/cpp/include/qpid/swig_python_typemaps.i7
-rw-r--r--qpid/cpp/src/amqp.cmake2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp27
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/ConnectionOptions.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp184
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp24
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/PnData.cpp218
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/PnData.h60
-rw-r--r--qpid/cpp/src/tests/brokertest.py134
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py41
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py461
-rwxr-xr-xqpid/cpp/src/tests/interlink_tests.py10
-rw-r--r--qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py45
-rw-r--r--qpid/cpp/src/tests/legacystore/python_tests/resize.py45
-rw-r--r--qpid/cpp/src/tests/legacystore/python_tests/store_test.py79
-rwxr-xr-xqpid/cpp/src/tests/qpidd_qmfv2_tests.py2
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in4
-rw-r--r--qpid/tools/src/py/qpidtoollibs/broker.py23
28 files changed, 871 insertions, 608 deletions
diff --git a/qpid/cpp/bindings/qpid/python/qpid_messaging.i b/qpid/cpp/bindings/qpid/python/qpid_messaging.i
index 8063db5967..e728c90334 100644
--- a/qpid/cpp/bindings/qpid/python/qpid_messaging.i
+++ b/qpid/cpp/bindings/qpid/python/qpid_messaging.i
@@ -141,7 +141,7 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
* don't even know why there is a non-const version of the method. */
%rename(opened) qpid::messaging::Connection::isOpen();
%rename(_close) qpid::messaging::Connection::close();
-%rename(receiver) qpid::messaging::Session::createReceiver;
+%rename(_receiver) qpid::messaging::Session::createReceiver;
%rename(_sender) qpid::messaging::Session::createSender;
%rename(_acknowledge_all) qpid::messaging::Session::acknowledge(bool);
%rename(_acknowledge_msg) qpid::messaging::Session::acknowledge(
@@ -161,6 +161,16 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
%rename(_getTtl) qpid::messaging::Message::getTtl;
%rename(_setTtl) qpid::messaging::Message::setTtl;
+%rename(_sync) qpid::messaging::Session::sync;
+
+// Capitalize constant names correctly for python
+%rename(TRACE) qpid::messaging::trace;
+%rename(DEBUG) qpid::messaging::debug;
+%rename(INFO) qpid::messaging::info;
+%rename(NOTICE) qpid::messaging::notice;
+%rename(WARNING) qpid::messaging::warning;
+%rename(ERROR) qpid::messaging::error;
+%rename(CRITICAL) qpid::messaging::critical;
%include "qpid/qpid.i"
@@ -221,31 +231,77 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
%pythoncode %{
@staticmethod
- def establish(url=None, **options) :
+ def establish(url=None, timeout=None, **options) :
+ if timeout and "reconnect-timeout" not in options:
+ options["reconnect-timeout"] = timeout
conn = Connection(url, **options)
conn.open()
return conn
%}
}
+%pythoncode %{
+ # Disposition class from messaging/message.py
+ class Disposition:
+ def __init__(self, type, **options):
+ self.type = type
+ self.options = options
+
+ def __repr__(self):
+ args = [str(self.type)] + ["%s=%r" % (k, v) for k, v in self.options.items()]
+ return "Disposition(%s)" % ", ".join(args)
+
+ # Consntants from messaging/constants.py
+ __SELF__ = object()
+
+ class Constant:
+
+ def __init__(self, name, value=__SELF__):
+ self.name = name
+ if value is __SELF__:
+ self.value = self
+ else:
+ self.value = value
+
+ def __repr__(self):
+ return self.name
+
+ AMQP_PORT = 5672
+ AMQPS_PORT = 5671
+
+ UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
+ REJECTED = Constant("REJECTED")
+ RELEASED = Constant("RELEASED")
+%}
+
%extend qpid::messaging::Session {
%pythoncode %{
def acknowledge(self, message=None, disposition=None, sync=True) :
- if disposition :
- raise Exception("SWIG does not support dispositions yet. Use "
- "Session.reject and Session.release instead")
if message :
- self._acknowledge_msg(message, sync)
+ if disposition is None: self._acknowledge_msg(message, sync)
+ # FIXME aconway 2014-02-11: the following does not repsect the sync flag.
+ elif disposition.type == REJECTED: self.reject(message)
+ elif disposition.type == RELEASED: self.release(message)
else :
+ if disposition : # FIXME aconway 2014-02-11: support this
+ raise Exception("SWIG does not support dispositions yet. Use "
+ "Session.reject and Session.release instead")
self._acknowledge_all(sync)
__swig_getmethods__["connection"] = getConnection
if _newclass: connection = property(getConnection)
- def sender(self, target, **options) :
- s = self._sender(target)
- s._setDurable(options.get("durable"))
- return s
+ def receiver(self, source, capacity=None):
+ r = self._receiver(source)
+ if capacity is not None: r.capacity = capacity
+ return r
+
+ def sender(self, target, durable=None, capacity=None) :
+ s = self._sender(target)
+ if capacity is not None: s.capacity = capacity
+ s._setDurable(durable)
+ return s
def next_receiver(self, timeout=None) :
if timeout is None :
@@ -254,6 +310,11 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
# Python API uses timeouts in seconds,
# but C++ API uses milliseconds
return self._next_receiver(Duration(int(1000*timeout)))
+
+ def sync(self, timeout=None):
+ if timeout == 0: self._sync(False) # Non-blocking sync
+ else: self._sync(True) # Blocking sync, C++ has not timeout.
+
%}
}
@@ -302,6 +363,8 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError)
message.durable = self.durable
return self._send(message, sync)
+ def sync(self, timeout=None): self.session.sync(timeout)
+
__swig_getmethods__["capacity"] = getCapacity
__swig_setmethods__["capacity"] = setCapacity
if _newclass: capacity = property(getCapacity, setCapacity)
diff --git a/qpid/cpp/include/qpid/messaging/Logger.h b/qpid/cpp/include/qpid/messaging/Logger.h
index 3b774dc8a9..39518e01d4 100644
--- a/qpid/cpp/include/qpid/messaging/Logger.h
+++ b/qpid/cpp/include/qpid/messaging/Logger.h
@@ -103,7 +103,7 @@ public:
* --log-time ("on"|"off|"0"|"1")
* --log-level ("on"|"off|"0"|"1")
* --log-source ("on"|"off|"0"|"1")
- * --log-thread ("on"|"off|"0"|"1")
+ * --log-thread ("on"|"off|"0"|"1")
* --log-function ("on"|"off|"0"|"1")
* --log-hires-timestamp ("on"|"off|"0"|"1")
*
diff --git a/qpid/cpp/include/qpid/qpid.i b/qpid/cpp/include/qpid/qpid.i
index 28a9064ebb..70f4ce8105 100644
--- a/qpid/cpp/include/qpid/qpid.i
+++ b/qpid/cpp/include/qpid/qpid.i
@@ -53,6 +53,7 @@ struct mystr
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Duration.h>
#include <qpid/messaging/FailoverUpdates.h>
+#include <qpid/messaging/Logger.h>
//
// Wrapper functions for map-decode and list-decode. This allows us to avoid
@@ -84,6 +85,7 @@ qpid::types::Variant::List& decodeList(const qpid::messaging::Message& msg) {
%include <qpid/messaging/Session.h>
%include <qpid/messaging/Connection.h>
%include <qpid/messaging/FailoverUpdates.h>
+%include <qpid/messaging/Logger.h>
qpid::types::Variant::Map& decodeMap(const qpid::messaging::Message&);
qpid::types::Variant::List& decodeList(const qpid::messaging::Message&);
diff --git a/qpid/cpp/include/qpid/swig_python_typemaps.i b/qpid/cpp/include/qpid/swig_python_typemaps.i
index 9d44a1e1ef..e9a84ffa8e 100644
--- a/qpid/cpp/include/qpid/swig_python_typemaps.i
+++ b/qpid/cpp/include/qpid/swig_python_typemaps.i
@@ -452,3 +452,10 @@ typedef int Py_ssize_t;
$1 = PyInt_Check($input) ? 1 : 0;
}
+
+/**
+ * argc,argv as python list
+ */
+
+%include <argcargv.i>
+%apply (int ARGC, char **ARGV) { (int argc, const char *argv[]) }
diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake
index d1ff184a64..f98b3da419 100644
--- a/qpid/cpp/src/amqp.cmake
+++ b/qpid/cpp/src/amqp.cmake
@@ -120,6 +120,8 @@ if (BUILD_AMQP)
qpid/messaging/amqp/ConnectionHandle.cpp
qpid/messaging/amqp/DriverImpl.h
qpid/messaging/amqp/DriverImpl.cpp
+ qpid/messaging/amqp/PnData.h
+ qpid/messaging/amqp/PnData.cpp
qpid/messaging/amqp/ReceiverContext.h
qpid/messaging/amqp/ReceiverContext.cpp
qpid/messaging/amqp/ReceiverHandle.h
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 0010f602c1..cd62523fc1 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -1292,12 +1292,13 @@ const std::string Broker::TCP_TRANSPORT("tcp");
std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
const std::string& name,
- const QueueSettings& settings,
+ const QueueSettings& constSettings,
const OwnershipToken* owner,
const std::string& alternateExchange,
const std::string& userId,
const std::string& connectionId)
{
+ QueueSettings settings(constSettings); // So we can modify them
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
@@ -1335,6 +1336,10 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
}
+ // Identify queues that won't survive a failover: exclusive, auto-delete with no delay.
+ if (owner && settings.autodelete && !settings.autoDeleteDelay)
+ settings.isTemporary = true;
+
std::pair<Queue::shared_ptr, bool> result =
queues.declare(name, settings, alternate, false/*recovering*/,
owner, connectionId, userId);
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 941c0c88a2..fd99629492 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -521,7 +521,7 @@ void Queue::markInUse(bool controlling)
else users.addOther();
}
-void Queue::releaseFromUse(bool controlling)
+void Queue::releaseFromUse(bool controlling, bool doDelete)
{
bool trydelete;
if (controlling) {
@@ -533,7 +533,7 @@ void Queue::releaseFromUse(bool controlling)
users.removeOther();
trydelete = isUnused(locker);
}
- if (trydelete) scheduleAutoDelete();
+ if (trydelete && doDelete) scheduleAutoDelete();
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive,
@@ -577,11 +577,13 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive,
if (mgmtObject != 0 && c->isCounted()) {
mgmtObject->inc_consumerCount();
}
- ManagementAgent* agent = broker->getManagementAgent();
- if (agent) {
- agent->raiseEvent(
- _qmf::EventSubscribe(connectionId, userId, name,
- c->getTag(), requestExclusive, ManagementAgent::toMap(arguments)));
+ if (broker) {
+ ManagementAgent* agent = broker->getManagementAgent();
+ if (agent) {
+ agent->raiseEvent(
+ _qmf::EventSubscribe(connectionId, userId, name,
+ c->getTag(), requestExclusive, ManagementAgent::toMap(arguments)));
+ }
}
}
@@ -589,6 +591,7 @@ void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, cons
{
removeListener(c);
if(c->isCounted())
+
{
bool unused;
{
@@ -605,12 +608,12 @@ void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, cons
if (mgmtObject != 0) {
mgmtObject->dec_consumerCount();
}
- if (unused && settings.autodelete) {
- scheduleAutoDelete();
- }
+ if (unused && settings.autodelete) scheduleAutoDelete();
+ }
+ if (broker) {
+ ManagementAgent* agent = broker->getManagementAgent();
+ if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag()));
}
- ManagementAgent* agent = broker->getManagementAgent();
- if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag()));
}
/**
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 683468e9a4..c5996dd8be 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -363,7 +363,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* be created.
*/
QPID_BROKER_EXTERN void markInUse(bool controlling=false);
- QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false);
+ QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false, bool doDelete=true);
QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0, //defaults to all messages
boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(),
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 888dc68833..b9ae261f83 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -283,8 +283,6 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
} catch (const qpid::types::Exception& e) {
throw InvalidArgumentException(e.what());
}
- // Identify queues that won't survive a failover.
- settings.isTemporary = exclusive && autoDelete && !settings.autoDeleteDelay;
std::pair<Queue::shared_ptr, bool> queue_created =
getBroker().createQueue(name, settings,
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index f6b242c100..5f0dd2e101 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -264,7 +264,7 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists");
}
std::pair<boost::shared_ptr<Queue>, bool> result
- = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
+ = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), node.properties.isExclusive() ? this:0, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
node.queue = result.first;
node.created = result.second;
}
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 6881896f5e..4e908fbe79 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -156,7 +156,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
// Don't allow backup queues to auto-delete, primary decides when to delete.
- if (q->isAutoDelete()) q->markInUse();
+ if (q->isAutoDelete()) q->markInUse(false);
dispatch[DequeueEvent::KEY] =
boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
@@ -352,13 +352,13 @@ void QueueReplicator::promoted() {
queue->getMessageInterceptors().add(
boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
// Process auto-deletes
- if (queue->isAutoDelete() && subscribed) {
+ if (queue->isAutoDelete()) {
// Make a temporary shared_ptr to prevent premature deletion of queue.
// Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
// which could delete the queue while it's still running it's destroyed logic.
boost::shared_ptr<Queue> q(queue);
- q->releaseFromUse();
- q->scheduleAutoDelete();
+ // See markInUse in ctor: release but don't delete if never used.
+ q->releaseFromUse(false/*controller*/, subscribed/*doDelete*/);
}
}
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h
index 8fe74ee959..8b19dc2ee0 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.h
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h
@@ -64,8 +64,6 @@ class ReplicationTest
// Calculate level for objects that may not have replication set,
// including auto-delete/exclusive settings.
- ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive) const;
- ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive) const;
ReplicateLevel useLevel(const broker::Queue&) const;
ReplicateLevel useLevel(const broker::Exchange&) const;
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
index 26bb699565..e5712288dd 100644
--- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
+++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
@@ -119,6 +119,8 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant&
nestAnnotations = value;
} else if (name == "set-to-on-send" || name == "set_to_on_send") {
setToOnSend = value;
+ } else if (name == "properties" || name == "client-properties" || name == "client_properties") {
+ properties = value.asMap();
} else {
throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
}
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
index 877e541636..c8c8798b7b 100644
--- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
+++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
@@ -47,6 +47,7 @@ struct ConnectionOptions : qpid::client::ConnectionSettings
std::string identifier;
bool nestAnnotations;
bool setToOnSend;
+ std::map<std::string, qpid::types::Variant> properties;
QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index 3dbacac11f..763deb33c6 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -18,6 +18,8 @@
* under the License.
*
*/
+
+#include "PnData.h"
#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/AddressImpl.h"
@@ -33,6 +35,7 @@ extern "C" {
#include <proton/engine.h>
}
+
namespace qpid {
namespace messaging {
namespace amqp {
@@ -239,179 +242,6 @@ bool replace(Variant::Map& map, const std::string& original, const std::string&
}
}
-void write(pn_data_t* data, const Variant& value);
-
-void write(pn_data_t* data, const Variant::Map& map)
-{
- pn_data_put_map(data);
- pn_data_enter(data);
- for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
- pn_data_put_string(data, convert(i->first));
- write(data, i->second);
- }
- pn_data_exit(data);
-}
-void write(pn_data_t* data, const Variant::List& list)
-{
- pn_data_put_list(data);
- pn_data_enter(data);
- for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
- write(data, *i);
- }
- pn_data_exit(data);
-}
-void write(pn_data_t* data, const Variant& value)
-{
- switch (value.getType()) {
- case qpid::types::VAR_VOID:
- pn_data_put_null(data);
- break;
- case qpid::types::VAR_BOOL:
- pn_data_put_bool(data, value.asBool());
- break;
- case qpid::types::VAR_UINT64:
- pn_data_put_ulong(data, value.asUint64());
- break;
- case qpid::types::VAR_INT64:
- pn_data_put_long(data, value.asInt64());
- break;
- case qpid::types::VAR_DOUBLE:
- pn_data_put_double(data, value.asDouble());
- break;
- case qpid::types::VAR_STRING:
- pn_data_put_string(data, convert(value.asString()));
- break;
- case qpid::types::VAR_MAP:
- write(data, value.asMap());
- break;
- case qpid::types::VAR_LIST:
- write(data, value.asList());
- break;
- default:
- break;
- }
-}
-bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value);
-bool read(pn_data_t* data, qpid::types::Variant& value)
-{
- return read(data, pn_data_type(data), value);
-}
-void readList(pn_data_t* data, qpid::types::Variant::List& value)
-{
- size_t count = pn_data_get_list(data);
- pn_data_enter(data);
- for (size_t i = 0; i < count && pn_data_next(data); ++i) {
- qpid::types::Variant e;
- if (read(data, e)) value.push_back(e);
- }
- pn_data_exit(data);
-}
-void readMap(pn_data_t* data, qpid::types::Variant::Map& value)
-{
- size_t count = pn_data_get_list(data);
- pn_data_enter(data);
- for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
- std::string key = convert(pn_data_get_symbol(data));
- pn_data_next(data);
- qpid::types::Variant e;
- if (read(data, e)) value[key]= e;
- }
- pn_data_exit(data);
-}
-void readArray(pn_data_t* data, qpid::types::Variant::List& value)
-{
- size_t count = pn_data_get_array(data);
- pn_type_t type = pn_data_get_array_type(data);
- pn_data_enter(data);
- for (size_t i = 0; i < count && pn_data_next(data); ++i) {
- qpid::types::Variant e;
- if (read(data, type, e)) value.push_back(e);
- }
- pn_data_exit(data);
-}
-bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value)
-{
- switch (type) {
- case PN_NULL:
- if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant();
- return true;
- case PN_BOOL:
- value = pn_data_get_bool(data);
- return true;
- case PN_UBYTE:
- value = pn_data_get_ubyte(data);
- return true;
- case PN_BYTE:
- value = pn_data_get_byte(data);
- return true;
- case PN_USHORT:
- value = pn_data_get_ushort(data);
- return true;
- case PN_SHORT:
- value = pn_data_get_short(data);
- return true;
- case PN_UINT:
- value = pn_data_get_uint(data);
- return true;
- case PN_INT:
- value = pn_data_get_int(data);
- return true;
- case PN_CHAR:
- value = pn_data_get_char(data);
- return true;
- case PN_ULONG:
- value = pn_data_get_ulong(data);
- return true;
- case PN_LONG:
- value = pn_data_get_long(data);
- return true;
- case PN_TIMESTAMP:
- value = pn_data_get_timestamp(data);
- return true;
- case PN_FLOAT:
- value = pn_data_get_float(data);
- return true;
- case PN_DOUBLE:
- value = pn_data_get_double(data);
- return true;
- case PN_UUID:
- value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
- return true;
- case PN_BINARY:
- value = convert(pn_data_get_binary(data));
- value.setEncoding(qpid::types::encodings::BINARY);
- return true;
- case PN_STRING:
- value = convert(pn_data_get_string(data));
- value.setEncoding(qpid::types::encodings::UTF8);
- return true;
- case PN_SYMBOL:
- value = convert(pn_data_get_string(data));
- value.setEncoding(qpid::types::encodings::ASCII);
- return true;
- case PN_LIST:
- value = qpid::types::Variant::List();
- readList(data, value.asList());
- return true;
- break;
- case PN_MAP:
- value = qpid::types::Variant::Map();
- readMap(data, value.asMap());
- return true;
- case PN_ARRAY:
- value = qpid::types::Variant::List();
- readArray(data, value.asList());
- return true;
- case PN_DESCRIBED:
- case PN_DECIMAL32:
- case PN_DECIMAL64:
- case PN_DECIMAL128:
- default:
- return false;
- }
-
-}
-
const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
const uint32_t DEFAULT_TIMEOUT(0);
}
@@ -680,9 +510,9 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
requested.erase(j->first);
}
} else if (key == AUTO_DELETE) {
- read(data, v);
+ PnData(data).read(v);
isAutoDeleted = v.asBool();
- } else if (j != requested.end() && (read(data, v) && v.asString() == j->second.asString())) {
+ } else if (j != requested.end() && (PnData(data).read(v) && v.asString() == j->second.asString())) {
requested.erase(j->first);
}
}
@@ -815,7 +645,7 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod
} else {
pn_data_put_ulong(filter, i->descriptorCode);
}
- write(filter, i->value);
+ PnData(filter).write(i->value);
pn_data_exit(filter);
}
pn_data_exit(filter);
@@ -902,7 +732,7 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
putLifetimePolicy(data, toLifetimePolicy(i->second.asString()));
} else {
pn_data_put_symbol(data, convert(i->first));
- write(data, i->second);
+ PnData(data).write(i->second);
}
}
pn_data_exit(data);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index b6a72fd204..68a8f85f82 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -20,6 +20,7 @@
*/
#include "ConnectionContext.h"
#include "DriverImpl.h"
+#include "PnData.h"
#include "ReceiverContext.h"
#include "Sasl.h"
#include "SenderContext.h"
@@ -49,6 +50,8 @@ extern "C" {
namespace qpid {
namespace messaging {
namespace amqp {
+using types::Variant;
+
namespace {
//remove conditional when 0.5 is no longer supported
@@ -872,13 +875,6 @@ namespace {
const std::string CLIENT_PROCESS_NAME("qpid.client_process");
const std::string CLIENT_PID("qpid.client_pid");
const std::string CLIENT_PPID("qpid.client_ppid");
-pn_bytes_t convert(const std::string& s)
-{
- pn_bytes_t result;
- result.start = const_cast<char*>(s.data());
- result.size = s.size();
- return result;
-}
}
void ConnectionContext::setProperties()
{
@@ -886,15 +882,21 @@ void ConnectionContext::setProperties()
pn_data_put_map(data);
pn_data_enter(data);
- pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME));
+ pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME));
std::string processName = sys::SystemInfo::getProcessName();
- pn_data_put_string(data, convert(processName));
+ pn_data_put_string(data, PnData::str(processName));
- pn_data_put_symbol(data, convert(CLIENT_PID));
+ pn_data_put_symbol(data, PnData::str(CLIENT_PID));
pn_data_put_int(data, sys::SystemInfo::getProcessId());
- pn_data_put_symbol(data, convert(CLIENT_PPID));
+ pn_data_put_symbol(data, PnData::str(CLIENT_PPID));
pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
+
+ for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i)
+ {
+ pn_data_put_symbol(data, PnData::str(i->first));
+ PnData(data).write(i->second);
+ }
pn_data_exit(data);
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
new file mode 100644
index 0000000000..5c57c5b0a3
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
@@ -0,0 +1,218 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PnData.h"
+#include "qpid/types/encodings.h"
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+using types::Variant;
+
+void PnData::write(const Variant::Map& map)
+{
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+ pn_data_put_string(data, str(i->first));
+ write(i->second);
+ }
+ pn_data_exit(data);
+}
+void PnData::write(const Variant::List& list)
+{
+ pn_data_put_list(data);
+ pn_data_enter(data);
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ write(*i);
+ }
+ pn_data_exit(data);
+}
+void PnData::write(const Variant& value)
+{
+ switch (value.getType()) {
+ case qpid::types::VAR_VOID:
+ pn_data_put_null(data);
+ break;
+ case qpid::types::VAR_BOOL:
+ pn_data_put_bool(data, value.asBool());
+ break;
+ case qpid::types::VAR_UINT64:
+ pn_data_put_ulong(data, value.asUint64());
+ break;
+ case qpid::types::VAR_INT64:
+ pn_data_put_long(data, value.asInt64());
+ break;
+ case qpid::types::VAR_DOUBLE:
+ pn_data_put_double(data, value.asDouble());
+ break;
+ case qpid::types::VAR_STRING:
+ pn_data_put_string(data, str(value.asString()));
+ break;
+ case qpid::types::VAR_MAP:
+ write(value.asMap());
+ break;
+ case qpid::types::VAR_LIST:
+ write(value.asList());
+ break;
+ default:
+ break;
+ }
+}
+
+bool PnData::read(qpid::types::Variant& value)
+{
+ return read(pn_data_type(data), value);
+}
+
+void PnData::readList(qpid::types::Variant::List& value)
+{
+ size_t count = pn_data_get_list(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ qpid::types::Variant e;
+ if (read(e)) value.push_back(e);
+ }
+ pn_data_exit(data);
+}
+
+void PnData::readMap(qpid::types::Variant::Map& value)
+{
+ size_t count = pn_data_get_list(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
+ std::string key = str(pn_data_get_symbol(data));
+ pn_data_next(data);
+ qpid::types::Variant e;
+ if (read(e)) value[key]= e;
+ }
+ pn_data_exit(data);
+}
+
+void PnData::readArray(qpid::types::Variant::List& value)
+{
+ size_t count = pn_data_get_array(data);
+ pn_type_t type = pn_data_get_array_type(data);
+ pn_data_enter(data);
+ for (size_t i = 0; i < count && pn_data_next(data); ++i) {
+ qpid::types::Variant e;
+ if (read(type, e)) value.push_back(e);
+ }
+ pn_data_exit(data);
+}
+
+bool PnData::read(pn_type_t type, qpid::types::Variant& value)
+{
+ switch (type) {
+ case PN_NULL:
+ if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant();
+ return true;
+ case PN_BOOL:
+ value = pn_data_get_bool(data);
+ return true;
+ case PN_UBYTE:
+ value = pn_data_get_ubyte(data);
+ return true;
+ case PN_BYTE:
+ value = pn_data_get_byte(data);
+ return true;
+ case PN_USHORT:
+ value = pn_data_get_ushort(data);
+ return true;
+ case PN_SHORT:
+ value = pn_data_get_short(data);
+ return true;
+ case PN_UINT:
+ value = pn_data_get_uint(data);
+ return true;
+ case PN_INT:
+ value = pn_data_get_int(data);
+ return true;
+ case PN_CHAR:
+ value = pn_data_get_char(data);
+ return true;
+ case PN_ULONG:
+ value = pn_data_get_ulong(data);
+ return true;
+ case PN_LONG:
+ value = pn_data_get_long(data);
+ return true;
+ case PN_TIMESTAMP:
+ value = pn_data_get_timestamp(data);
+ return true;
+ case PN_FLOAT:
+ value = pn_data_get_float(data);
+ return true;
+ case PN_DOUBLE:
+ value = pn_data_get_double(data);
+ return true;
+ case PN_UUID:
+ value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
+ return true;
+ case PN_BINARY:
+ value = str(pn_data_get_binary(data));
+ value.setEncoding(qpid::types::encodings::BINARY);
+ return true;
+ case PN_STRING:
+ value = str(pn_data_get_string(data));
+ value.setEncoding(qpid::types::encodings::UTF8);
+ return true;
+ case PN_SYMBOL:
+ value = str(pn_data_get_string(data));
+ value.setEncoding(qpid::types::encodings::ASCII);
+ return true;
+ case PN_LIST:
+ value = qpid::types::Variant::List();
+ readList(value.asList());
+ return true;
+ break;
+ case PN_MAP:
+ value = qpid::types::Variant::Map();
+ readMap(value.asMap());
+ return true;
+ case PN_ARRAY:
+ value = qpid::types::Variant::List();
+ readArray(value.asList());
+ return true;
+ case PN_DESCRIBED:
+ case PN_DECIMAL32:
+ case PN_DECIMAL64:
+ case PN_DECIMAL128:
+ default:
+ return false;
+ }
+
+}
+
+pn_bytes_t PnData::str(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+
+std::string PnData::str(const pn_bytes_t& in)
+{
+ return std::string(in.start, in.size);
+}
+
+}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.h b/qpid/cpp/src/qpid/messaging/amqp/PnData.h
new file mode 100644
index 0000000000..6d03235432
--- /dev/null
+++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.h
@@ -0,0 +1,60 @@
+#ifndef QPID_MESSAGING_AMQP_PNDATA_H
+#define QPID_MESSAGING_AMQP_PNDATA_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/types/Variant.h"
+extern "C" {
+#include <proton/engine.h>
+}
+
+namespace qpid {
+namespace messaging {
+namespace amqp {
+
+/**
+ * Helper class to read/write messaging types to/from pn_data_t.
+ */
+class PnData
+{
+ public:
+ PnData(pn_data_t* d) : data(d) {}
+
+ void write(const types::Variant& value);
+ void write(const types::Variant::Map& map);
+ void write(const types::Variant::List& list);
+
+ bool read(pn_type_t type, types::Variant& value);
+ bool read(types::Variant& value);
+ void readList(types::Variant::List& value);
+ void readMap(types::Variant::Map& value);
+ void readArray(types::Variant::List& value);
+
+ static pn_bytes_t str(const std::string&);
+ static std::string str(const pn_bytes_t&);
+
+ private:
+ pn_data_t* data;
+};
+}}} // namespace messaging::amqp
+
+#endif /*!QPID_MESSAGING_AMQP_PNDATA_H*/
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 4b6820e4fd..4e5c255a1a 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -21,19 +21,47 @@
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
-from qpid import connection, messaging, util
+from qpid import connection, util
from qpid.compat import format_exc
from qpid.harness import Skipped
from unittest import TestCase
from copy import copy
from threading import Thread, Lock, Condition
from logging import getLogger
+from qpidtoollibs import BrokerAgent
-try: import qmf.console
-except: print "Cannot import module qmf.console, skipping tests"; exit(0);
+# NOTE: Always import native client qpid.messaging, import swigged client
+# qpid_messaging if possible. qpid_messaing is set to None if not available.
+#
+# qm is set to qpid_messaging if it is available, qpid.messaging if not.
+# Use qm.X to specify names from the default messaging module.
+#
+# Set environment variable QPID_PY_NO_SWIG=1 to prevent qpid_messaging from loading.
+#
+# BrokerTest can be configured to determine which protocol is used by default:
+#
+# -DPROTOCOL="amqpX": Use protocol "amqpX". Defaults to amqp1.0 if swig client
+# is being used, amqp0-10 if native client is being used.
+#
+# The configured defaults can be over-ridden on BrokerTest.connect and some
+# other methods by specifying native=True|False and protocol="amqpX"
+#
+import qpid.messaging
+qm = qpid.messaging
+qpid_messaging = None
+if not os.environ.get("QPID_PY_NO_SWIG"):
+ try:
+ import qpid_messaging
+ from qpid.datatypes import uuid4
+ qm = qpid_messaging
+ # Silence warnings from swigged messaging library unless enabled in environment.
+ if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ:
+ qm.Logger.configure(["--log-enable=error"])
+ except ImportError:
+ print "Cannot load python SWIG bindings, falling back to native qpid.messaging."
-log = getLogger("qpid.brokertest")
+log = getLogger("brokertest")
# Values for expected outcome of process at end of test
EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
@@ -149,7 +177,7 @@ class Popen(subprocess.Popen):
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
- def stop(self): # Clean up at end of test.
+ def teardown(self): # Clean up at end of test.
try:
if self.expect == EXPECT_UNKNOWN:
try: self.kill() # Just make sure its dead
@@ -253,14 +281,16 @@ class Broker(Popen):
self.test = test
self._port=port
+ args = copy(args)
+ if BrokerTest.amqp_lib: args += ["--load-module", BrokerTest.amqp_lib]
if BrokerTest.store_lib and not test_store:
- args = args + ['--load-module', BrokerTest.store_lib]
+ args += ['--load-module', BrokerTest.store_lib]
if BrokerTest.sql_store_lib:
- args = args + ['--load-module', BrokerTest.sql_store_lib]
- args = args + ['--catalog', BrokerTest.sql_catalog]
+ args += ['--load-module', BrokerTest.sql_store_lib]
+ args += ['--catalog', BrokerTest.sql_catalog]
if BrokerTest.sql_clfs_store_lib:
- args = args + ['--load-module', BrokerTest.sql_clfs_store_lib]
- args = args + ['--catalog', BrokerTest.sql_catalog]
+ args += ['--load-module', BrokerTest.sql_clfs_store_lib]
+ args += ['--catalog', BrokerTest.sql_catalog]
cmd = [BrokerTest.qpidd_exec, "--port", port, "--interface", "127.0.0.1", "--no-module-dir"] + args
if not "--auth" in args: cmd.append("--auth=no")
if wait != None:
@@ -288,13 +318,11 @@ class Broker(Popen):
cmd += ["--data-dir", self.datadir]
if show_cmd: print cmd
Popen.__init__(self, cmd, expect, stdout=PIPE)
- test.cleanup_stop(self)
+ test.teardown_add(self)
self._host = "127.0.0.1"
- log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+ self._agent = None
- def startQmf(self, handler=None):
- self.qmf_session = qmf.console.Session(handler)
- self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+ log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
def host(self): return self._host
@@ -310,22 +338,25 @@ class Broker(Popen):
def unexpected(self,msg):
raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
- def connect(self, timeout=5, **kwargs):
- """New API connection to the broker."""
- return messaging.Connection.establish(self.host_port(), timeout=timeout, **kwargs)
+ def connect(self, timeout=5, native=False, **kwargs):
+ """New API connection to the broker.
+ @param native if True force use of the native qpid.messaging client
+ even if swig client is available.
+ """
+ if self.test.protocol: kwargs.setdefault("protocol", self.test.protocol)
+ if native: connection_class = qpid.messaging.Connection
+ else: connection_class = qm.Connection
+ return connection_class.establish(self.host_port(), timeout=timeout, **kwargs)
+
+ @property
+ def agent(self, **kwargs):
+ """Return a BrokerAgent for this broker"""
+ if not self._agent: self._agent = BrokerAgent(self.connect(**kwargs))
+ return self._agent
- def connect_old(self):
- """Old API connection to the broker."""
- socket = qpid.util.connect(self.host(),self.port())
- connection = qpid.connection.Connection (sock=socket)
- connection.start()
- return connection;
def declare_queue(self, queue):
- c = self.connect_old()
- s = c.session(str(qpid.datatypes.uuid4()))
- s.queue_declare(queue=queue)
- c.close()
+ self.agent.addQueue(queue)
def _prep_sender(self, queue, durable, xprops):
s = queue + "; {create:always, node:{durable:" + str(durable)
@@ -402,7 +433,7 @@ def browse(session, queue, timeout=0, transform=lambda m: m.content):
contents = []
try:
while True: contents.append(transform(r.fetch(timeout=timeout)))
- except messaging.Empty: pass
+ except qm.Empty: pass
finally: r.close()
return contents
@@ -451,30 +482,42 @@ class BrokerTest(TestCase):
def configure(self, config): self.config=config
def setUp(self):
- outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
+ defs = self.config.defines
+ outdir = defs.get("OUTDIR") or "brokertest.tmp"
self.dir = os.path.join(self.rootdir, outdir, self.id())
os.makedirs(self.dir)
os.chdir(self.dir)
- self.stopem = [] # things to stop at end of test
+ self.teardown_list = [] # things to tear down at end of test
+
+ self.protocol = defs.get("PROTOCOL") or ("amqp1.0" if qpid_messaging else "amqp0-10")
+ self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0
+
def tearDown(self):
err = []
- for p in self.stopem:
- try: p.stop()
- except Exception, e: err.append(str(e))
- self.stopem = [] # reset in case more processes start
+ self.teardown_list.reverse() # Tear down in reverse order
+ for p in self.teardown_list:
+ log.debug("Tearing down %s", p)
+ try:
+ # Call the first of the methods that is available on p.
+ for m in ["teardown", "close"]:
+ a = getattr(p, m, None)
+ if a: a(); break
+ else: raise Exception("Don't know how to tear down %s", p)
+ except Exception, e: err.append("%s: %s"%(e.__class__.__name__, str(e)))
+ self.teardown_list = [] # reset in case more processes start
os.chdir(self.rootdir)
if err: raise Exception("Unexpected process status:\n "+"\n ".join(err))
- def cleanup_stop(self, stopable):
- """Call thing.stop at end of test"""
- self.stopem.append(stopable)
+ def teardown_add(self, thing):
+ """Call thing.teardown() or thing.close() at end of test"""
+ self.teardown_list.append(thing)
def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
"""Start a process that will be killed at end of test, in the test dir."""
os.chdir(self.dir)
p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
- self.cleanup_stop(p)
+ self.teardown_add(p)
return p
def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False):
@@ -490,6 +533,11 @@ class BrokerTest(TestCase):
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
+ def protocol_option(self, connection_options=""):
+ if "protocol" in connection_options: return connection_options
+ else: return ",".join(filter(None, [connection_options,"protocol:'%s'"%self.protocol]))
+
+
def join(thread, timeout=30):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
@@ -524,7 +572,7 @@ class NumberedSender(Thread):
def __init__(self, broker, max_depth=None, queue="test-queue",
connection_options=RECONNECT_OPTIONS,
- failover_updates=True, url=None, args=[]):
+ failover_updates=False, url=None, args=[]):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
@@ -533,7 +581,7 @@ class NumberedSender(Thread):
cmd = ["qpid-send",
"--broker", url or broker.host_port(),
"--address", "%s;{create:always}"%queue,
- "--connection-options", "{%s}"%(connection_options),
+ "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)),
"--content-stdin"
] + args
if failover_updates: cmd += ["--failover-updates"]
@@ -592,7 +640,7 @@ class NumberedReceiver(Thread):
"""
def __init__(self, broker, sender=None, queue="test-queue",
connection_options=RECONNECT_OPTIONS,
- failover_updates=True, url=None, args=[]):
+ failover_updates=False, url=None, args=[]):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
@@ -601,7 +649,7 @@ class NumberedReceiver(Thread):
cmd = ["qpid-receive",
"--broker", url or broker.host_port(),
"--address", "%s;{create:always}"%queue,
- "--connection-options", "{%s}"%(connection_options),
+ "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)),
"--forever"
]
if failover_updates: cmd += [ "--failover-updates" ]
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 0f92f7dbcc..2bf8677cd1 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -20,8 +20,6 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
-from qpid.datatypes import uuid4, UUID
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
@@ -44,7 +42,7 @@ class LogLevel:
class QmfAgent(object):
"""Access to a QMF broker agent."""
def __init__(self, address, **kwargs):
- self._connection = Connection.establish(
+ self._connection = qm.Connection.establish(
address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
@@ -105,9 +103,9 @@ class HaPort:
self.port = self.socket.getsockname()[1]
self.fileno = self.socket.fileno()
self.stopped = False
- test.cleanup_stop(self) # Stop during test.tearDown
+ test.teardown_add(self) # Stop during test.tearDown
- def stop(self): # Called in tearDown
+ def teardown(self): # Called in tearDown
if not self.stopped:
self.stopped = True
self.socket.shutdown(socket.SHUT_RDWR)
@@ -180,6 +178,7 @@ acl allow all all
def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]);
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+ @property
def agent(self):
if not self._agent:
cred = self.client_credentials
@@ -190,7 +189,7 @@ acl allow all all
return self._agent
def qmf(self):
- hb = self.agent().getHaBroker()
+ hb = self.agent.getHaBroker()
hb.update()
return hb
@@ -203,19 +202,19 @@ acl allow all all
try:
self._status = self.ha_status()
return self._status == status;
- except ConnectionError: return False
+ except qm.ConnectionError: return False
assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
self, status, self._status)
- def wait_queue(self, queue, timeout=1):
+ def wait_queue(self, queue, timeout=1, msg="wait_queue"):
""" Wait for queue to be visible via QMF"""
- agent = self.agent()
- assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout)
+ agent = self.agent
+ assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue
- def wait_no_queue(self, queue, timeout=1):
+ def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"):
""" Wait for queue to be invisible via QMF"""
- agent = self.agent()
- assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout)
+ agent = self.agent
+ assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue)
# TODO aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -273,12 +272,12 @@ acl allow all all
def assert_connect_fail(self):
try:
self.connect()
- self.test.fail("Expected ConnectionError")
- except ConnectionError: pass
+ self.test.fail("Expected qm.ConnectionError")
+ except qm.ConnectionError: pass
def try_connect(self):
try: return self.connect()
- except ConnectionError: return None
+ except qm.ConnectionError: return None
def ready(self, *args, **kwargs):
if not 'client_properties' in kwargs: kwargs['client_properties'] = {}
@@ -286,7 +285,7 @@ acl allow all all
return Broker.ready(self, *args, **kwargs)
def kill(self, final=True):
- if final: self.ha_port.stop()
+ if final: self.ha_port.teardown()
self._agent = None
return Broker.kill(self)
@@ -355,9 +354,9 @@ class HaCluster(object):
b.set_brokers_url(self.url)
b.set_public_url(self.url)
- def connect(self, i):
+ def connect(self, i, **kwargs):
"""Connect with reconnect_urls"""
- return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
+ return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs)
def kill(self, i, promote_next=True, final=True):
"""Kill broker i, promote broker i+1"""
@@ -393,7 +392,7 @@ def wait_address(session, address):
"""Wait for an address to become valid."""
def check():
try: session.sender(address); return True
- except NotFound: return False
+ except qm.NotFound: return False
assert retry(check), "Timed out waiting for address %s"%(address)
def valid_address(session, address):
@@ -401,6 +400,6 @@ def valid_address(session, address):
try:
session.receiver(address)
return True
- except NotFound: return False
+ except qm.NotFound: return False
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index f22e12a355..6ac7888f93 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -20,7 +19,6 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
-from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError
from qpid.datatypes import uuid4, UUID
from qpid.harness import Skipped
from brokertest import *
@@ -31,18 +29,9 @@ from qpidtoollibs import BrokerAgent, EventHelper
log = getLogger(__name__)
-
class HaBrokerTest(BrokerTest):
"""Base class for HA broker tests"""
-def alt_setup(session, suffix):
- # Create exchange to use as alternate and a queue bound to it.
- # altex exchange: acts as alternate exchange
- session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
- # altq queue bound to altex, collect re-routed messages.
- session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
-
-
class ReplicationTests(HaBrokerTest):
"""Correctness tests for HA replication."""
@@ -50,38 +39,46 @@ class ReplicationTests(HaBrokerTest):
"""Test basic replication of configuration and messages before and
after backup has connected"""
- def queue(name, replicate):
- return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
+ def setup(prefix, primary):
+ """Create config, send messages on the primary p"""
+ a = primary.agent
+
+ def queue(name, replicate):
+ a.addQueue(name, options={'qpid.replicate':replicate})
+ return name
- def exchange(name, replicate, bindq, key):
- return "%s/%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'topic'},x-bindings:[{exchange:'%s',queue:'%s',key:'%s'}]}}"%(name, key, replicate, name, bindq, key)
+ def exchange(name, replicate, bindq, key):
+ a.addExchange("fanout", name, options={'qpid.replicate':replicate})
+ a.bind(name, bindq, key)
+ return name
- def setup(p, prefix, primary):
- """Create config, send messages on the primary p"""
+ # Test replication of messages
+ p = primary.connect().session()
s = p.sender(queue(prefix+"q1", "all"))
- for m in ["a", "b", "1"]: s.send(Message(m))
+ for m in ["a", "b", "1"]: s.send(qm.Message(m))
# Test replication of dequeue
self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
p.acknowledge()
- p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
- p.sender(queue(prefix+"q3", "none")).send(Message("3"))
- p.sender(exchange(prefix+"e1", "all", prefix+"q1", "key1")).send(Message("4"))
- p.sender(exchange(prefix+"e2", "configuration", prefix+"q2", "key2")).send(Message("5"))
+
+ p.sender(queue(prefix+"q2", "configuration")).send(qm.Message("2"))
+ p.sender(queue(prefix+"q3", "none")).send(qm.Message("3"))
+ p.sender(exchange(prefix+"e1", "all", prefix+"q1", "key1")).send(qm.Message("4"))
+ p.sender(exchange(prefix+"e2", "configuration", prefix+"q2", "key2")).send(qm.Message("5"))
# Test unbind
- p.sender(queue(prefix+"q4", "all")).send(Message("6"))
+ p.sender(queue(prefix+"q4", "all")).send(qm.Message("6"))
s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4", "key4"))
- s3.send(Message("7"))
- # Use old connection to unbind
- us = primary.connect_old().session(str(uuid4()))
- us.exchange_unbind(exchange=prefix+"e4", binding_key="key4", queue=prefix+"q4")
- p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
+ s3.send(qm.Message("7"))
+ a.unbind(prefix+"e4", prefix+"q4", "key4")
+ p.sender(prefix+"e4").send(qm.Message("drop1")) # Should be dropped
+
# Test replication of deletes
- p.sender(queue(prefix+"dq", "all"))
- p.sender(exchange(prefix+"de", "all", prefix+"dq", ""))
- p.sender(prefix+"dq;{delete:always}").close()
- p.sender(prefix+"de;{delete:always}").close()
+ queue(prefix+"dq", "all")
+ exchange(prefix+"de", "all", prefix+"dq", "")
+ a.delQueue(prefix+"dq")
+ a.delExchange(prefix+"de")
+
# Need a marker so we can wait till sync is done.
- p.sender(queue(prefix+"x", "configuration"))
+ queue(prefix+"x", "configuration")
def verify(b, prefix, p):
"""Verify setup was replicated to backup b"""
@@ -97,14 +94,14 @@ class ReplicationTests(HaBrokerTest):
assert not valid_address(b, prefix+"q3")
# Verify exchange with replicate=all
- b.sender(prefix+"e1/key1").send(Message(prefix+"e1"))
+ b.sender(prefix+"e1/key1").send(qm.Message(prefix+"e1"))
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
# Verify exchange with replicate=configuration
- b.sender(prefix+"e2/key2").send(Message(prefix+"e2"))
+ b.sender(prefix+"e2/key2").send(qm.Message(prefix+"e2"))
self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
- b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
+ b.sender(prefix+"e4/key4").send(qm.Message("drop2")) # Verify unbind.
self.assert_browse_retry(b, prefix+"q4", ["6","7"])
# Verify deletes
@@ -117,25 +114,26 @@ class ReplicationTests(HaBrokerTest):
primary = cluster[0]
backup = cluster[1]
- p = primary.connect().session()
-
# Send messages before re-starting the backup, test catch-up replication.
cluster.kill(1, promote_next=False, final=False)
- setup(p, "1", primary)
+ setup("1", primary)
cluster.restart(1)
# Send messages after re-starting the backup, to test steady-state replication.
- setup(p, "2", primary)
+ setup("2", primary)
+
+ p = primary.connect().session()
# Verify the data on the backup
b = backup.connect_admin().session()
verify(b, "1", p)
verify(b, "2", p)
# Test a series of messages, enqueue all then dequeue all.
- s = p.sender(queue("foo","all"))
+ primary.agent.addQueue("foo")
+ s = p.sender("foo")
wait_address(b, "foo")
msgs = [str(i) for i in range(10)]
- for m in msgs: s.send(Message(m))
+ for m in msgs: s.send(qm.Message(m))
self.assert_browse_retry(p, "foo", msgs)
self.assert_browse_retry(b, "foo", msgs)
r = p.receiver("foo")
@@ -145,7 +143,7 @@ class ReplicationTests(HaBrokerTest):
self.assert_browse_retry(b, "foo", [])
# Another series, this time verify each dequeue individually.
- for m in msgs: s.send(Message(m))
+ for m in msgs: s.send(qm.Message(m))
self.assert_browse_retry(p, "foo", msgs)
self.assert_browse_retry(b, "foo", msgs)
for i in range(len(msgs)):
@@ -187,14 +185,16 @@ class ReplicationTests(HaBrokerTest):
"--broker", brokers[0].host_port(),
"--address", "q;{create:always}",
"--messages=1000",
- "--content-string=x"
+ "--content-string=x",
+ "--connection-options={%s}"%self.protocol_option()
])
receiver = self.popen(
["qpid-receive",
"--broker", brokers[0].host_port(),
"--address", "q;{create:always}",
"--messages=990",
- "--timeout=10"
+ "--timeout=10",
+ "--connection-options={%s}"%self.protocol_option()
])
self.assertEqual(sender.wait(), 0)
self.assertEqual(receiver.wait(), 0)
@@ -215,7 +215,7 @@ class ReplicationTests(HaBrokerTest):
try:
backup.connect().session()
self.fail("Expected connection to backup to fail")
- except ConnectionError: pass
+ except qm.ConnectionError: pass
# Check that admin connections are allowed to backup.
backup.connect_admin().close()
@@ -224,8 +224,8 @@ class ReplicationTests(HaBrokerTest):
reconnect=True)
s = c.session()
sender = s.sender("q;{create:always}")
- backup.wait_backup("q")
- sender.send("foo")
+ sender.send("foo", sync=True)
+ s.sync()
primary.kill()
assert retry(lambda: not is_running(primary.pid))
backup.promote()
@@ -243,31 +243,36 @@ class ReplicationTests(HaBrokerTest):
broker_addr = broker.host_port()
# Case 1: Connect before stalling the broker, use the connection after stalling.
- c = Connection(broker_addr, heartbeat=1)
+ c = qm.Connection(broker_addr, heartbeat=1)
c.open()
os.kill(broker.pid, signal.SIGSTOP) # Stall the broker
- self.assertRaises(ConnectionError, c.session().sender, "foo")
+
+ def make_sender(): c.session().sender("foo")
+ self.assertRaises(qm.ConnectionError, make_sender)
# Case 2: Connect to a stalled broker
- c = Connection(broker_addr, heartbeat=1)
- self.assertRaises(ConnectionError, c.open)
+ c = qm.Connection(broker_addr, heartbeat=1)
+ self.assertRaises(qm.ConnectionError, c.open)
# Case 3: Re-connect to a stalled broker.
broker2 = Broker(self)
- c = Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1,
- reconnect=True, reconnect_urls=[broker_addr],
- reconnect_log=False) # Hide expected warnings
+ c = qm.Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1,
+ reconnect=True, reconnect_urls=[broker_addr],
+ reconnect_log=False) # Hide expected warnings
c.open()
broker2.kill() # Cause re-connection to broker
- self.assertRaises(ConnectionError, c.session().sender, "foo")
+ self.assertRaises(qm.ConnectionError, make_sender)
def test_failover_cpp(self):
"""Verify that failover works in the C++ client."""
cluster = HaCluster(self, 2)
cluster[0].connect().session().sender("q;{create:always}")
cluster[1].wait_backup("q")
- sender = NumberedSender(cluster[0], url=cluster.url, queue="q", failover_updates = False)
- receiver = NumberedReceiver(cluster[0], url=cluster.url, queue="q", failover_updates = False)
+ # FIXME aconway 2014-02-21: using 0-10, there is a failover problem with 1.0
+ sender = NumberedSender(cluster[0], url=cluster.url, queue="q",
+ connection_options="reconnect:true,protocol:'amqp0-10'")
+ receiver = NumberedReceiver(cluster[0], url=cluster.url, queue="q",
+ connection_options="reconnect:true,protocol:'amqp0-10'")
receiver.start()
sender.start()
assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
@@ -330,7 +335,7 @@ class ReplicationTests(HaBrokerTest):
# Set up replication with qpid-config
ps2 = pc.session().sender("q2;{create:always}")
backup.config_replicate(primary.host_port(), "q2");
- ps2.send("x", timeout=1)
+ ps2.send("x")
backup.assert_browse_backup("q2", ["x"])
@@ -349,6 +354,7 @@ class ReplicationTests(HaBrokerTest):
br = backup.connect().session().receiver("q;{create:always}")
backup.replicate(cluster.url, "q")
ps.send("a")
+ ps.sync()
backup.assert_browse_backup("q", ["a"])
cluster.bounce(0)
backup.assert_browse_backup("q", ["a"])
@@ -356,6 +362,8 @@ class ReplicationTests(HaBrokerTest):
backup.assert_browse_backup("q", ["a", "b"])
cluster[0].wait_status("ready")
cluster.bounce(1)
+ # FIXME aconway 2014-02-20: pr does not fail over with 1.0/swig
+ if qm == qpid_messaging: raise Skipped("FIXME SWIG client failover bug")
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
backup.assert_browse_backup("q", ["b"])
@@ -369,7 +377,7 @@ class ReplicationTests(HaBrokerTest):
s = cluster[0].connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
def send(key,value,expect):
- s.send(Message(content=value,properties={"lvq-key":key}), timeout=1)
+ s.send(qm.Message(content=value,properties={"lvq-key":key}))
cluster[1].assert_browse_backup("lvq", expect)
send("a", "a-1", ["a-1"])
@@ -387,7 +395,7 @@ class ReplicationTests(HaBrokerTest):
"""Verify that we replicate to an LVQ correctly"""
cluster = HaCluster(self, 2)
s = cluster[0].connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
- for i in range(10): s.send(Message(str(i)))
+ for i in range(10): s.send(qm.Message(str(i)))
cluster[1].assert_browse_backup("q", [str(i) for i in range(5,10)])
def test_reject(self):
@@ -396,11 +404,11 @@ class ReplicationTests(HaBrokerTest):
primary, backup = cluster
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
try:
- for i in range(10): s.send(Message(str(i)), sync=False)
- except qpid.messaging.exceptions.TargetCapacityExceeded: pass
+ for i in range(10): s.send(qm.Message(str(i)), sync=False)
+ except qm.TargetCapacityExceeded: pass
backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
- # Detach, don't close as there is a broken session
- s.session.connection.detach()
+ try: s.session.connection.close()
+ except: pass # Expect exception from broken session
def test_priority(self):
"""Verify priority queues replicate correctly"""
@@ -408,7 +416,7 @@ class ReplicationTests(HaBrokerTest):
session = cluster[0].connect().session()
s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
- for p in priorities: s.send(Message(priority=p))
+ for p in priorities: s.send(qm.Message(priority=p))
# Can't use browse_backup as browser sees messages in delivery order not priority.
cluster[1].wait_backup("priority-queue")
r = cluster[1].connect_admin().session().receiver("priority-queue")
@@ -425,7 +433,7 @@ class ReplicationTests(HaBrokerTest):
limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}
limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()])
s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
- messages = [Message(content=str(uuid4()), priority = p) for p in priorities]
+ messages = [qm.Message(content=str(uuid4()), priority = p) for p in priorities]
for m in messages: s.send(m)
backup.wait_backup(s.target)
r = backup.connect_admin().session().receiver("priority-queue")
@@ -439,7 +447,7 @@ class ReplicationTests(HaBrokerTest):
primary, backup = cluster
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
- for p in priorities: s.send(Message(priority=p))
+ for p in priorities: s.send(qm.Message(priority=p))
expect = sorted(priorities,reverse=True)[0:5]
primary.assert_browse("q", expect, transform=lambda m: m.priority)
backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
@@ -456,8 +464,8 @@ class ReplicationTests(HaBrokerTest):
def send(self, connection):
"""Send messages, then acquire one but don't acknowledge"""
s = connection.session()
- for m in range(10): s.sender(self.address).send(str(m), timeout=1)
- s.receiver(self.address, timeout=1).fetch()
+ for m in range(10): s.sender(self.address).send(str(m))
+ s.receiver(self.address).fetch()
def verify(self, brokertest, backup):
backup.assert_browse_backup(self.queue, self.expect, msg=self.queue)
@@ -492,38 +500,33 @@ class ReplicationTests(HaBrokerTest):
cluster2[1].wait_status("ready")
cluster2[0].connect().session().sender("q;{create:always}")
time.sleep(.1) # Give replication a chance.
- try:
- cluster1[1].connect_admin().session().receiver("q")
- self.fail("Excpected no-such-queue exception")
- except NotFound: pass
- try:
- cluster2[1].connect_admin().session().receiver("q")
- self.fail("Excpected no-such-queue exception")
- except NotFound: pass
+ # Expect queues not to be found
+ self.assertRaises(qm.NotFound, cluster1[1].connect_admin().session().receiver, "q")
+ self.assertRaises(qm.NotFound, cluster2[1].connect_admin().session().receiver, "q")
def test_replicate_binding(self):
"""Verify that binding replication can be disabled"""
cluster = HaCluster(self, 2)
primary, backup = cluster[0], cluster[1]
ps = primary.connect().session()
- ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
- ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
+ a = primary.agent
+ a.addExchange("fanout", "ex")
+ a.addQueue("q")
+ a.bind("ex", "q", options={'qpid.replicate':'none'})
backup.wait_backup("q")
primary.kill()
assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
backup.promote()
bs = backup.connect_admin().session()
- bs.sender("ex").send(Message("msg"))
+ bs.sender("ex").send(qm.Message("msg"))
self.assert_browse_retry(bs, "q", [])
def test_invalid_replication(self):
"""Verify that we reject an attempt to declare a queue with invalid replication value."""
cluster = HaCluster(self, 1, ha_replicate="all")
- try:
- c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
- self.fail("Expected ConnectionError")
- except ConnectionError: pass
+ self.assertRaises(Exception, cluster[0].connect().session().sender,
+ "q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
def test_exclusive_queue(self):
"""Ensure that we can back-up exclusive queues, i.e. the replicating
@@ -533,12 +536,12 @@ class ReplicationTests(HaBrokerTest):
c = cluster[0].connect()
q = addr.split(";")[0]
r = c.session().receiver(addr)
- try: c.session().receiver(addr); self.fail("Expected exclusive exception")
- except ReceiverError: pass
+ self.assertRaises(qm.LinkError, c.session().receiver, addr)
s = c.session().sender(q).send(q)
cluster[1].assert_browse_backup(q, [q])
- test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
+ if qm == qpid.messaging: # FIXME aconway 2014-02-20: swig client no exclusive subscribe
+ test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
def test_auto_delete_exclusive(self):
"""Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues"""
@@ -568,12 +571,12 @@ class ReplicationTests(HaBrokerTest):
cluster = HaCluster(self, 3)
def ha_broker(broker):
- ha_broker = broker.agent().getHaBroker();
+ ha_broker = broker.agent.getHaBroker();
ha_broker.update()
return ha_broker
for broker in cluster: # Make sure HA system-id matches broker's
- self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent().getBroker().systemRef))
+ self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent.getBroker().systemRef))
# Check that all brokers have the same membership as the cluster
def check_ids(broker):
@@ -612,7 +615,6 @@ acl allow zag@QPID access method
acl allow zag@QPID create link
# Normal user
acl allow zig@QPID all all
-
acl deny all all
""")
aclf.close()
@@ -625,9 +627,14 @@ acl deny all all
client_credentials=Credentials("zag", "zag", "PLAIN"))
c = cluster[0].connect(username="zig", password="zig")
s0 = c.session();
- s0.sender("q;{create:always}")
- s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+ a = cluster[0].agent
+ a.addQueue("q")
+ a.addExchange("fanout", "ex")
+ a.bind("ex", "q", "")
s0.sender("ex").send("foo");
+
+ # Transactions should be done over the tx_protocol
+ c = cluster[0].connect(protocol=self.tx_protocol, username="zig", password="zig")
s1 = c.session(transactional=True)
s1.sender("ex").send("foo-tx");
cluster[1].assert_browse_backup("q", ["foo"])
@@ -640,19 +647,22 @@ acl deny all all
cluster = HaCluster(self, 2)
s = cluster[0].connect().session()
# altex exchange: acts as alternate exchange
- s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+ a = cluster[0].agent
+ a.addExchange("fanout", "altex")
# altq queue bound to altex, collect re-routed messages.
- s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+ a.addQueue("altq")
+ a.bind("altex", "altq", "")
# ex exchange with alternate-exchange altex and no queues bound
- s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ a.addExchange("direct", "ex", {"alternate-exchange":"altex"})
# create queue q with alternate-exchange altex
- s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
+ a.addQueue("q", {"alternate-exchange":"altex"})
# create a bunch of exchanges to ensure we don't clean up prematurely if the
# response comes in multiple fragments.
for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i)
def verify(broker):
- s = broker.connect().session()
+ c = broker.connect()
+ s = c.session()
# Verify unmatched message goes to ex's alternate.
s.sender("ex").send("foo")
altq = s.receiver("altq")
@@ -662,17 +672,16 @@ acl deny all all
s.sender("q").send("bar")
msg = s.receiver("q").fetch(timeout=0)
self.assertEqual("bar", msg.content)
- s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+ s.acknowledge(msg, qm.Disposition(qm.REJECTED)) # Reject the message
self.assertEqual("bar", altq.fetch(timeout=0).content)
s.acknowledge()
-
- def ss(n): return cluster[n].connect().session()
+ c.close()
# Sanity check: alternate exchanges on original broker
verify(cluster[0])
- # Altex is in use as an alternate exchange.
- self.assertRaises(SessionError,
- lambda:ss(0).sender("altex;{delete:always}").close())
+ a = cluster[0].agent
+ # Altex is in use as an alternate exchange, we should get an exception
+ self.assertRaises(Exception, a.delExchange, "altex")
# Check backup that was connected during setup.
cluster[1].wait_status("ready")
cluster[1].wait_backup("ex")
@@ -689,15 +698,12 @@ acl deny all all
verify(cluster[2])
# Check that alt-exchange in-use count is replicated
- s = cluster[2].connect().session();
-
- self.assertRaises(SessionError,
- lambda:ss(2).sender("altex;{delete:always}").close())
- s.sender("q;{delete:always}").close()
- self.assertRaises(SessionError,
- lambda:ss(2).sender("altex;{delete:always}").close())
- s.sender("ex;{delete:always}").close()
- s.sender("altex;{delete:always}").close()
+ a = cluster[2].agent
+ self.assertRaises(Exception, a.delExchange, "altex")
+ a.delQueue("q")
+ self.assertRaises(Exception, a.delExchange, "altex")
+ a.delExchange("ex")
+ a.delExchange("altex")
def test_priority_reroute(self):
"""Regression test for QPID-4262, rerouting messages from a priority queue
@@ -705,8 +711,11 @@ acl deny all all
cluster = HaCluster(self, 2)
primary = cluster[0]
session = primary.connect().session()
- s = session.sender("pq; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}},x-bindings:[{exchange:'amq.fanout',queue:pq}]}}")
- for m in xrange(100): s.send(Message(str(m), priority=m%10))
+ a = primary.agent
+ a.addQueue("pq", {'qpid.priorities':10})
+ a.bind("amq.fanout", "pq")
+ s = session.sender("pq")
+ for m in xrange(100): s.send(qm.Message(str(m), priority=m%10))
pq = QmfAgent(primary.host_port()).getQueue("pq")
pq.reroute(request=0, useAltExchange=False, exchange="amq.fanout")
# Verify that consuming is in priority order
@@ -740,8 +749,8 @@ acl deny all all
# Verify the backup deletes the surplus queue and exchange
cluster[1].wait_status("ready")
s = cluster[1].connect_admin().session()
- self.assertRaises(NotFound, s.receiver, ("q2"));
- self.assertRaises(NotFound, s.receiver, ("e2"));
+ self.assertRaises(qm.NotFound, s.receiver, ("q2"));
+ self.assertRaises(qm.NotFound, s.receiver, ("e2"));
def test_delete_qpid_4285(self):
@@ -753,61 +762,72 @@ acl deny all all
cluster[1].wait_backup("q")
cluster.kill(0) # Make the backup take over.
s = cluster[1].connect().session()
- s.receiver("q;{delete:always}").close() # Delete q on new primary
- try:
- s.receiver("q")
- self.fail("Expected NotFound exception") # Should not be avaliable
- except NotFound: pass
- assert not cluster[1].agent().getQueue("q") # Should not be in QMF
+ cluster[1].agent.delQueue("q") # Delete q on new primary
+ self.assertRaises(qm.NotFound, s.receiver, "q")
+ assert not cluster[1].agent.getQueue("q") # Should not be in QMF
def test_auto_delete_failover(self):
"""Test auto-delete queues. Verify that:
- queues auto-deleted on the primary are deleted on the backup.
- - auto-delete queues with/without timeout are deleted after a failover.
+ - auto-delete queues with/without timeout are deleted after a failover correctly
+ - auto-delete queues never used (subscribe to) to are not deleted
- messages are correctly routed to the alternate exchange.
"""
cluster = HaCluster(self, 3)
s = cluster[0].connect().session()
- def setup(q, timeout=""):
- if timeout: timeout = ",arguments:{'qpid.auto_delete_timeout':%s}"%timeout
+ a = cluster[0].agent
+
+ def setup(q, timeout=None):
# Create alternate exchange, auto-delete queue and queue bound to alt. ex.
- s.sender("%s-altex;{create:always,node:{type:topic,x-declare:{type:fanout}}}"%q)
- qs = s.sender("%s;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:%s-altex%s}}}"%(q,q,timeout))
- s.sender("%s-altq;{create:always,node:{x-bindings:[{exchange:%s-altex,queue:%s-altq}]}}"%(q,q,q))
- qs.send(q) # Send a message to the auto-delete queue
- return s
-
- for args in [("q1",""),("q2","0"),("q3","1"),("q4",""),("q5","")]: setup(*args)
- receivers = [s.receiver("q%s"%i) for i in [1,2,3,4]] # Subscribe to queues
- # Note q5 is never subscribed to, so should not be auto-deleted.
+ a.addExchange("fanout", q+"-altex")
+ args = {"auto-delete":True, "alternate-exchange":q+"-altex"}
+ if timeout is not None: args['qpid.auto_delete_timeout'] = timeout
+ a.addQueue(q, args)
+ a.addQueue(q+"-altq")
+ a.bind("%s-altex"%q, "%s-altq"%q)
+
+ for args in [["q1"],["q2",0],["q3",1],["q4"],["q5"]]: setup(*args)
+ receivers = []
+ for i in xrange(1,5): # Don't use q5
+ q = "q%s"%i
+ receivers.append(s.receiver(q)) # Subscribe
+ qs = s.sender(q); qs.send(q); qs.close() # Send q name as message
+
receivers[3].close() # Trigger auto-delete for q4
- cluster[0].kill(final=False)
+ for b in cluster[1:3]: b.wait_no_queue("q4") # Verify deleted on backups
+
+ cluster[0].kill(final=False) # Kill primary
cluster[2].promote()
cluster.restart(0)
- cluster[2].assert_browse("q3",["q3"]) # Not yet auto-deleted, 1 sec timeout.
- for i in [2,1,0]:
- for q in ["q1", "q2", "q3","q4"]:
- cluster[i].wait_no_queue(q,timeout=2) # auto-deleted
- cluster[i].assert_browse_backup("%s-altq"%q, [q]) # Routed to alternate
- cluster[i].assert_browse_backup("q5", ["q5"]) # Never subscribed, not deleted.
- cluster[i].assert_browse_backup("q5-altq", [])
+ cluster[2].wait_queue("q3") # Not yet auto-deleted, 1 sec timeout.
+ for b in cluster:
+ for q in ["q%s"%i for i in xrange(1,5)]:
+ b.wait_no_queue(q,timeout=2, msg=str(b)) # auto-deleted
+ b.assert_browse_backup("%s-altq"%q, [q]) # Routed to alternate
+ cluster[2].wait_queue("q5") # Not auto-deleted, never subscribed
+ cluster[2].connect().session().receiver("q5").close()
+ cluster[2].wait_no_queue("q5")
def test_auto_delete_close(self):
"""Verify auto-delete queues are deleted on backup if auto-deleted
on primary"""
cluster=HaCluster(self, 2)
+
+ # Create altex to use as alternate exchange, with altq bound to it
+ a = cluster[0].agent
+ a.addExchange("fanout", "altex")
+ a.addQueue("altq", {"auto-delete":True})
+ a.bind("altex", "altq")
+
p = cluster[0].connect().session()
- alt_setup(p, "1")
- r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+ r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex'}}}")
s = p.sender("adq1")
for m in ["aa","bb","cc"]: s.send(m)
- p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+ s.close()
cluster[1].wait_queue("adq1")
- cluster[1].wait_queue("adq2")
r.close() # trigger auto-delete of adq1
cluster[1].wait_no_queue("adq1")
- cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
- cluster[1].wait_queue("adq2")
+ cluster[1].assert_browse_backup("altq", ["aa","bb","cc"])
def test_expired(self):
"""Regression test for QPID-4379: HA does not properly handle expired messages"""
@@ -815,7 +835,7 @@ acl deny all all
cluster = HaCluster(self, 2)
s = cluster[0].connect().session().sender("q;{create:always}", capacity=2)
def send_ttl_messages():
- for i in xrange(100): s.send(Message(str(i), ttl=0.001), timeout=1)
+ for i in xrange(100): s.send(qm.Message(str(i), ttl=0.001))
send_ttl_messages()
cluster.start()
send_ttl_messages()
@@ -827,7 +847,9 @@ acl deny all all
s = cluster[0].connect().session()
s.sender("keep;{create:always}") # Leave this queue in place.
for i in xrange(100):
- s.sender("deleteme%s;{create:always,delete:always}"%(i)).close()
+ q = "deleteme%s"%(i)
+ cluster[0].agent.addQueue(q)
+ cluster[0].agent.delQueue(q)
# It is possible for the backup to attempt to subscribe after the queue
# is deleted. This is not an error, but is logged as an error on the primary.
# The backup does not log this as an error so we only check the backup log for errors.
@@ -846,58 +868,46 @@ acl deny all all
cluster[1].assert_browse_backup("qq", msgs)
cluster[2].assert_browse_backup("qq", msgs)
# Set up an exchange with a binding.
- sn.sender("xx;{create:always,node:{type:topic}}")
- sn.sender("xxq;{create:always,node:{x-bindings:[{exchange:'xx',queue:'xxq',key:xxq}]}}")
+ a = cluster[0].agent
+ a.addExchange("fanout", "xx")
+ a.addQueue("xxq")
+ a.bind("xx", "xxq", "xxq")
cluster[1].wait_address("xx")
- self.assertEqual(cluster[1].agent().getExchange("xx").values["bindingCount"], 1)
+ self.assertEqual(cluster[1].agent.getExchange("xx").values["bindingCount"], 1)
cluster[2].wait_address("xx")
- self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
+ self.assertEqual(cluster[2].agent.getExchange("xx").values["bindingCount"], 1)
# Simulate the race by re-creating the objects before promoting the new primary
cluster.kill(0, promote_next=False)
xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
node = "node:{%s}"%(xdecl)
sn = cluster[1].connect_admin().session()
- sn.sender("qq;{delete:always}").close()
+ a = cluster[1].agent
+ a.delQueue("qq", if_empty=False)
s = sn.sender("qq;{create:always, %s}"%(node))
s.send("foo")
- sn.sender("xx;{delete:always}").close()
+ a.delExchange("xx")
sn.sender("xx;{create:always,node:{type:topic,%s}}"%(xdecl))
cluster[1].promote()
cluster[1].wait_status("active")
# Verify we are not still using the old objects on cluster[2]
cluster[2].assert_browse_backup("qq", ["foo"])
cluster[2].wait_address("xx")
- self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 0)
-
- def test_redeclare_exchange(self):
- """Ensure that re-declaring an exchange is an HA no-op"""
- cluster = HaCluster(self, 2)
- ps = cluster[0].connect().session()
- ps.sender("ex1;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
- ps.sender("ex2;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout', alternate-exchange:'ex1'}}}")
- cluster[1].wait_backup("ex1")
- cluster[1].wait_backup("ex2")
-
- # Use old API to re-declare the exchange
- old_conn = cluster[0].connect_old()
- old_sess = old_conn.session(str(qpid.datatypes.uuid4()))
- old_sess.exchange_declare(exchange='ex1', type='fanout')
- cluster[1].wait_backup("ex1")
+ self.assertEqual(cluster[2].agent.getExchange("xx").values["bindingCount"], 0)
def test_resource_limit_bug(self):
"""QPID-5666 Regression test: Incorrect resource limit exception for queue creation."""
cluster = HaCluster(self, 3)
qs = ["q%s"%i for i in xrange(10)]
- s = cluster[0].connect().session()
- s.sender("q;{create:always}").close()
+ a = cluster[0].agent
+ a.addQueue("q")
cluster.kill(0)
cluster[1].promote()
cluster[1].wait_status("active")
- s = cluster[1].connect().session()
- s.receiver("q;{delete:always}").close()
- s.sender("qq;{create:always}").close()
-
+ a = cluster[1].agent
+ a.delQueue("q")
+ a.addQueue("q")
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -948,12 +958,12 @@ class LongTests(HaBrokerTest):
n = 10
senders = [
NumberedSender(
- brokers[0], url=brokers.url,max_depth=50, failover_updates=False,
+ brokers[0], url=brokers.url,max_depth=50,
queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
receivers = [
NumberedReceiver(
- brokers[0], url=brokers.url, sender=senders[i],failover_updates=False,
+ brokers[0], url=brokers.url, sender=senders[i],
queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
for r in receivers: r.start()
@@ -1017,6 +1027,7 @@ class LongTests(HaBrokerTest):
"--address", "q;{create:always}",
"--messages=1000",
"--tx=10"
+ # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet
])
receiver = self.popen(
["qpid-receive",
@@ -1025,6 +1036,7 @@ class LongTests(HaBrokerTest):
"--messages=990",
"--timeout=10",
"--tx=10"
+ # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet
])
self.assertEqual(sender.wait(), 0)
self.assertEqual(receiver.wait(), 0)
@@ -1053,7 +1065,7 @@ class LongTests(HaBrokerTest):
try:
self.connection.session().receiver(
self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}")
- except NotFound: pass # Can occur occasionally, not an error.
+ except qm.NotFound: pass # Can occur occasionally, not an error.
try: self.connection.close()
except: pass
@@ -1119,7 +1131,8 @@ class RecoveryTests(HaBrokerTest):
cluster[0].wait_status("active")
for b in cluster[1:4]: b.wait_status("ready")
# Create a queue before the failure.
- s1 = cluster.connect(0).session().sender("q1;{create:always}")
+ # FIXME aconway 2014-02-20: SWIG client doesn't respect sync=False
+ s1 = cluster.connect(0, native=True).session().sender("q1;{create:always}")
for b in cluster: b.wait_backup("q1")
for i in xrange(10): s1.send(str(i), timeout=0.1)
@@ -1130,13 +1143,11 @@ class RecoveryTests(HaBrokerTest):
cluster[3].wait_status("recovering")
def assertSyncTimeout(s):
- try:
- s.sync(timeout=.01)
- self.fail("Expected Timeout exception")
- except Timeout: pass
+ self.assertRaises(qpid.messaging.Timeout, s.sync, timeout=.01)
# Create a queue after the failure
- s2 = cluster.connect(3).session().sender("q2;{create:always}")
+ # FIXME aconway 2014-02-20: SWIG client doesn't respect sync=False
+ s2 = cluster.connect(3, native=True).session().sender("q2;{create:always}")
# Verify that messages sent are not completed
for i in xrange(10,20):
@@ -1182,12 +1193,11 @@ class RecoveryTests(HaBrokerTest):
# Should not go active till the expected backup connects or times out.
cluster[2].wait_status("recovering")
# Messages should be held till expected backup times out
- s = cluster[2].connect().session().sender("q;{create:always}")
+ ss = cluster[2].connect().session()
+ s = ss.sender("q;{create:always}")
s.send("foo", sync=False)
- # Verify message held initially.
- try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
- except Timeout: pass
- s.sync(timeout=1) # And released after the timeout.
+ self.assertEqual(s.unsettled(), 1) # Verify message not settled immediately.
+ s.sync(timeout=1) # And settled after timeout.
cluster[2].wait_status("active")
def test_join_ready_cluster(self):
@@ -1251,7 +1261,7 @@ class ConfigurationTests(HaBrokerTest):
cluster[0].set_public_url("bar:1234")
assert_url(r.fetch(1), "bar:1234")
cluster[0].set_brokers_url(cluster.url+",xxx:1234")
- self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL
+ self.assertRaises(qm.Empty, r.fetch, 0) # Not updated for brokers URL
class StoreTests(HaBrokerTest):
"""Test for HA with persistence."""
@@ -1268,18 +1278,19 @@ class StoreTests(HaBrokerTest):
sn = cluster[0].connect().session()
# Create queue qq, exchange exx and binding between them
s = sn.sender("qq;{create:always,node:{durable:true}}")
- sk = sn.sender("exx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:exx,key:k,queue:qq}]}}")
- for m in ["foo", "bar", "baz"]: s.send(Message(m, durable=True))
+ sk = sn.sender("exx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}}}")
+ cluster[0].agent.bind("exx", "qq", "k")
+ for m in ["foo", "bar", "baz"]: s.send(qm.Message(m, durable=True))
r = cluster[0].connect().session().receiver("qq")
self.assertEqual(r.fetch().content, "foo")
r.session.acknowledge()
# Sending this message is a hack to flush the dequeue operation on qq.
- s.send(Message("flush", durable=True))
+ s.send(qm.Message("flush", durable=True))
def verify(broker, x_count):
sn = broker.connect().session()
assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"])
- sn.sender("exx/k").send(Message("x", durable=True))
+ sn.sender("exx/k").send(qm.Message("x", durable=True))
assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"])
verify(cluster[0], 0) # Sanity check
@@ -1303,10 +1314,11 @@ class StoreTests(HaBrokerTest):
cluster = HaCluster(self, 2)
sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
- for m in ["foo","bar"]: s1.send(Message(m, durable=True))
+ for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True))
s2 = sn.sender("q2;{create:always,node:{durable:true}}")
- sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:ex,key:k2,queue:q2}]}}")
- sk2.send(Message("hello", durable=True))
+ sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}}}")
+ cluster[0].agent.bind("ex", "q2", "k2")
+ sk2.send(qm.Message("hello", durable=True))
# Wait for backup to catch up.
cluster[1].assert_browse_backup("q1", ["foo","bar"])
cluster[1].assert_browse_backup("q2", ["hello"])
@@ -1315,11 +1327,9 @@ class StoreTests(HaBrokerTest):
r1 = cluster[0].connect(heartbeat=HaBroker.heartbeat).session().receiver("q1")
for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
r1.session.acknowledge()
- for m in ["x","y","z"]: s1.send(Message(m, durable=True))
- # Use old connection to unbind
- us = cluster[0].connect_old().session(str(uuid4()))
- us.exchange_unbind(exchange="ex", binding_key="k2", queue="q2")
- us.exchange_bind(exchange="ex", binding_key="k1", queue="q1")
+ for m in ["x","y","z"]: s1.send(qm.Message(m, durable=True))
+ cluster[0].agent.unbind("ex", "q2", "k2")
+ cluster[0].agent.bind("ex", "q1", "k1")
# Restart both brokers from store to get inconsistent sequence numbering.
cluster.bounce(0, promote_next=False)
cluster[0].promote()
@@ -1350,11 +1360,11 @@ class TransactionTests(HaBrokerTest):
def tx_simple_setup(self, broker):
"""Start a transaction, remove messages from queue a, add messages to queue b"""
- c = broker.connect()
+ c = broker.connect(protocol=self.tx_protocol)
# Send messages to a, no transaction.
sa = c.session().sender("a;{create:always,node:{durable:true}}")
tx_msgs = ["x","y","z"]
- for m in tx_msgs: sa.send(Message(content=m, durable=True))
+ for m in tx_msgs: sa.send(qm.Message(content=m, durable=True))
# Receive messages from a, in transaction.
tx = c.session(transactional=True)
@@ -1373,14 +1383,14 @@ class TransactionTests(HaBrokerTest):
def tx_subscriptions(self, broker):
"""Return list of queue names for tx subscriptions"""
- return [q for q in broker.agent().repsub_queues()
+ return [q for q in broker.agent.repsub_queues()
if q.startswith("qpid.ha-tx")]
def test_tx_simple_commit(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
- tx_queues = cluster[0].agent().tx_queues()
+ tx_queues = cluster[0].agent.tx_queues()
# NOTE: backup does not process transactional dequeues until prepare
cluster[1].assert_browse_backup("a", ["x","y","z"])
@@ -1400,7 +1410,7 @@ class TransactionTests(HaBrokerTest):
def __init__(self, f): self.f, self.value = f, None
def __call__(self): self.value = self.f(); return self.value
- txq= FunctionCache(b.agent().tx_queues)
+ txq= FunctionCache(b.agent.tx_queues)
assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value)
txsub = FunctionCache(lambda: self.tx_subscriptions(b))
assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value)
@@ -1426,7 +1436,7 @@ class TransactionTests(HaBrokerTest):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
- tx_queues = cluster[0].agent().tx_queues()
+ tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
tx.rollback()
tx.close() # For clean test.
@@ -1447,7 +1457,7 @@ class TransactionTests(HaBrokerTest):
cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
- tx_queues = cluster[0].agent().tx_queues()
+ tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
cluster.bounce(0) # Should cause roll-back
cluster[0].wait_status("ready") # Restarted.
@@ -1464,7 +1474,7 @@ class TransactionTests(HaBrokerTest):
tx.acknowledge()
tx.commit()
tx.sync()
- tx_queues = cluster[0].agent().tx_queues()
+ tx_queues = cluster[0].agent.tx_queues()
tx.close()
self.assert_simple_commit_outcome(cluster[0], tx_queues)
@@ -1472,7 +1482,7 @@ class TransactionTests(HaBrokerTest):
cluster = HaCluster(self, 1, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
- tx_queues = cluster[0].agent().tx_queues()
+ tx_queues = cluster[0].agent.tx_queues()
tx.acknowledge()
tx.rollback()
tx.sync()
@@ -1480,16 +1490,16 @@ class TransactionTests(HaBrokerTest):
self.assert_simple_rollback_outcome(cluster[0], tx_queues)
def assert_commit_raises(self, tx):
- def commit_sync(): tx.commit(timeout=1); tx.sync(timeout=1)
- self.assertRaises(ServerError, commit_sync)
+ def commit_sync(): tx.commit(); tx.sync()
+ self.assertRaises(Exception, commit_sync)
def test_tx_backup_fail(self):
cluster = HaCluster(
self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]])
- c = cluster[0].connect()
+ c = cluster[0].connect(protocol=self.tx_protocol)
tx = c.session(transactional=True)
s = tx.sender("q;{create:always,node:{durable:true}}")
- for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
+ for m in ["foo","bang","bar"]: s.send(qm.Message(m, durable=True))
self.assert_commit_raises(tx)
for b in cluster: b.assert_browse_backup("q", [])
self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
@@ -1502,10 +1512,10 @@ class TransactionTests(HaBrokerTest):
cluster = HaCluster(self, 3)
# Leaving
- tx = cluster[0].connect().session(transactional=True)
+ tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("a", sync=True)
- self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
+ self.assertEqual([1,1,1], [len(b.agent.tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
tx.commit()
@@ -1514,7 +1524,7 @@ class TransactionTests(HaBrokerTest):
self.assert_tx_clean(b)
b.assert_browse_backup("q", ["a","b"], msg=b)
# Joining
- tx = cluster[0].connect().session(transactional=True)
+ tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
cluster.restart(1) # Not a part of the current transaction.
@@ -1523,18 +1533,17 @@ class TransactionTests(HaBrokerTest):
for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
- # FIXME aconway 2013-11-07: assert_log_clean
def test_tx_block_threads(self):
"""Verify that TXs blocked in commit don't deadlock."""
cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True)
n = 10 # Number of concurrent transactions
- sessions = [cluster[0].connect().session(transactional=True) for i in xrange(n)]
+ sessions = [cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
# Have the store delay the response for 10s
for s in sessions:
sn = s.sender("qq;{create:always,node:{durable:true}}")
- sn.send(Message("foo", durable=True))
- self.assertEqual(n, len(cluster[1].agent().tx_queues()))
+ sn.send(qm.Message("foo", durable=True))
+ self.assertEqual(n, len(cluster[1].agent.tx_queues()))
threads = [ Thread(target=s.commit) for s in sessions]
for t in threads: t.start()
cluster[0].ready(timeout=1) # Check for deadlock
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py
index c7269ac5e9..bb82b1c956 100755
--- a/qpid/cpp/src/tests/interlink_tests.py
+++ b/qpid/cpp/src/tests/interlink_tests.py
@@ -25,7 +25,7 @@ from brokertest import *
from ha_test import HaPort
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
-from qpidtoollibs import BrokerAgent, BrokerObject
+from qpidtoollibs import BrokerObject
class Domain(BrokerObject):
def __init__(self, broker, values):
@@ -49,7 +49,7 @@ class AmqpBrokerTest(BrokerTest):
self.port_holder = HaPort(self)
self.broker = self.amqp_broker(port_holder=self.port_holder)
self.default_config = Config(self.broker)
- self.agent = BrokerAgent(self.broker.connect())
+ self.agent = self.broker.agent
def sender(self, config, reply_to=None):
cmd = ["qpid-send",
@@ -224,7 +224,7 @@ class AmqpBrokerTest(BrokerTest):
def incoming_link(self, mechanism):
brokerB = self.amqp_broker()
- agentB = BrokerAgent(brokerB.connect())
+ agentB = brokerB.agent
self.agent.create("queue", "q")
agentB.create("queue", "q")
self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":mechanism})
@@ -240,7 +240,7 @@ class AmqpBrokerTest(BrokerTest):
def test_outgoing_link(self):
brokerB = self.amqp_broker()
- agentB = BrokerAgent(brokerB.connect())
+ agentB = brokerB.agent
self.agent.create("queue", "q")
agentB.create("queue", "q")
self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
@@ -250,7 +250,7 @@ class AmqpBrokerTest(BrokerTest):
def test_relay(self):
brokerB = self.amqp_broker()
- agentB = BrokerAgent(brokerB.connect())
+ agentB = brokerB.agent
agentB.create("queue", "q")
self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
#send to q on broker B through brokerA
diff --git a/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py
index 3c62740a62..37c12601be 100644
--- a/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py
+++ b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py
@@ -17,15 +17,20 @@
# under the License.
#
+import os
+
from brokertest import EXPECT_EXIT_OK
from store_test import StoreTest, Qmf, store_args
from qpid.messaging import *
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging # FIXME aconway 2014-04-04: Tests fail with SWIG client.
+
class ExchangeQueueTests(StoreTest):
"""
Simple tests of the broker exchange and queue types
"""
-
+
def test_direct_exchange(self):
"""Test Direct exchange."""
broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK)
@@ -34,11 +39,11 @@ class ExchangeQueueTests(StoreTest):
broker.send_message("a", msg1)
broker.send_message("b", msg2)
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_direct_exchange")
self.check_message(broker, "a", msg1, True)
self.check_message(broker, "b", msg2, True)
-
+
def test_topic_exchange(self):
"""Test Topic exchange."""
broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK)
@@ -56,17 +61,17 @@ class ExchangeQueueTests(StoreTest):
msg2 = Message("Message2", durable=True, correlation_id="Msg0004")
snd2.send(msg2)
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_topic_exchange")
self.check_message(broker, "a", msg1, True)
self.check_message(broker, "b", msg1, True)
self.check_messages(broker, "c", [msg1, msg2], True)
self.check_message(broker, "d", msg2, True)
self.check_message(broker, "e", msg2, True)
-
-
+
+
def test_legacy_lvq(self):
- """Test legacy LVQ."""
+ """Test legacy LVQ."""
broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"})
ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"})
@@ -77,7 +82,7 @@ class ExchangeQueueTests(StoreTest):
broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
xprops="arguments:{\"qpid.last_value_queue\":True}")
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False)
# Add more messages while subscriber is active (no replacement):
@@ -89,11 +94,11 @@ class ExchangeQueueTests(StoreTest):
broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn)
ssn.acknowledge()
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_lvq")
self.check_messages(broker, "lvq-test", [ma4, mc4], True)
-
-
+
+
def test_fanout_exchange(self):
"""Test Fanout Exchange"""
broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK)
@@ -107,7 +112,7 @@ class ExchangeQueueTests(StoreTest):
msg2 = Message("Msg2", durable=True, correlation_id="Msg0002")
snd.send(msg2)
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_fanout_exchange")
self.check_messages(broker, "q1", [msg1, msg2], True)
self.check_messages(broker, "q2", [msg1, msg2], True)
@@ -124,18 +129,18 @@ class ExchangeQueueTests(StoreTest):
m2 = rcv.fetch()
ssn.acknowledge(message=m2, disposition=Disposition(REJECTED))
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_message_reject")
qmf = Qmf(broker)
assert qmf.queue_message_count("tmr") == 0
-
+
def test_route(self):
""" Test the recovery of a route (link and bridge objects."""
broker = self.broker(store_args(), name="test_route", expect=EXPECT_EXIT_OK)
qmf = Qmf(broker)
qmf_broker_obj = qmf.get_objects("broker")[0]
-
+
# create a "link"
link_args = {"host":"a.fake.host.com", "port":9999, "durable":True,
"authMechanism":"PLAIN", "username":"guest", "password":"guest",
@@ -143,16 +148,16 @@ class ExchangeQueueTests(StoreTest):
result = qmf_broker_obj.create("link", "test-link", link_args, False)
self.assertEqual(result.status, 0, result)
link = qmf.get_objects("link")[0]
-
+
# create bridge
bridge_args = {"link":"test-link", "src":"amq.direct", "dest":"amq.fanout",
"key":"my-key", "durable":True}
result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, False);
self.assertEqual(result.status, 0, result)
bridge = qmf.get_objects("bridge")[0]
-
+
broker.terminate()
-
+
# recover the link and bridge
broker = self.broker(store_args(), name="test_route")
qmf = Qmf(broker)
@@ -189,7 +194,7 @@ class AlternateExchangePropertyTests(StoreTest):
self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"),
"Alternate exchange property not found or is incorrect on exchange \"testExch\".")
qmf.close()
-
+
def test_queue(self):
"""Queue alternate exchange property persistexchangeNamece test"""
broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK)
@@ -226,7 +231,7 @@ class RedeliveredTests(StoreTest):
msg = Message(msg_content, durable=True)
broker.send_message("testQueue", msg)
broker.terminate()
-
+
broker = self.broker(store_args(), name="test_broker_recovery")
rcv_msg = broker.get_message("testQueue")
self.assertEqual(msg_content, rcv_msg.content)
diff --git a/qpid/cpp/src/tests/legacystore/python_tests/resize.py b/qpid/cpp/src/tests/legacystore/python_tests/resize.py
index 469e0f6730..e719b755da 100644
--- a/qpid/cpp/src/tests/legacystore/python_tests/resize.py
+++ b/qpid/cpp/src/tests/legacystore/python_tests/resize.py
@@ -26,8 +26,11 @@ from qpid.datatypes import uuid4
from store_test import StoreTest, store_args
from qpid.messaging import Message
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client.
+
class ResizeTest(StoreTest):
-
+
resize_tool = os.getenv("QPID_STORE_RESIZE_TOOL", "qpid-store-resize")
print resize_tool
def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size, exp_fail):
@@ -47,21 +50,21 @@ class ResizeTest(StoreTest):
finally:
p.stdout.close()
return res
-
+
def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files, resize_file_size, init_num_files = 8,
init_file_size = 24, exp_fail = False, wait_time = None):
# Using a sender will force the creation of an empty persistent queue which is needed for some tests
broker = self.broker(store_args(), name="broker", expect=EXPECT_EXIT_OK, wait=wait_time)
ssn = broker.connect().session()
snd = ssn.sender("%s; {create:always, node:{durable:True}}" % queue_name)
-
+
msgs = []
for index in range(0, num_msgs):
msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(), correlation_id="msg-%04d"%index)
msgs.append(msg)
snd.send(msg)
broker.terminate()
-
+
res = self._resize_store(os.path.join(self.dir, "broker", "rhm", "jrnl"), queue_name, resize_num_files,
resize_file_size, exp_fail)
if res != 0:
@@ -70,95 +73,95 @@ class ResizeTest(StoreTest):
self.fail("ERROR: Resize operation failed with return code %d" % res)
elif exp_fail:
self.fail("ERROR: Resize operation succeeded, but a failure was expected")
-
+
broker = self.broker(store_args(), name="broker")
self.check_messages(broker, queue_name, msgs, True)
-
- # TODO: Check the physical files to check number and size are as expected.
+
+ # TODO: Check the physical files to check number and size are as expected.
class SimpleTest(ResizeTest):
"""
Simple tests of the resize utility for resizing a journal to larger and smaller sizes.
"""
-
+
def test_empty_store_same(self):
self._resize_test(queue_name = "empty_store_same",
num_msgs = 0, msg_size = 0,
init_num_files = 8, init_file_size = 24,
resize_num_files = 8, resize_file_size = 24)
-
+
def test_empty_store_up(self):
self._resize_test(queue_name = "empty_store_up",
num_msgs = 0, msg_size = 0,
init_num_files = 8, init_file_size = 24,
resize_num_files = 16, resize_file_size = 48)
-
+
def test_empty_store_down(self):
self._resize_test(queue_name = "empty_store_down",
num_msgs = 0, msg_size = 0,
init_num_files = 8, init_file_size = 24,
resize_num_files = 6, resize_file_size = 12)
-
-# TODO: Put into long tests, make sure there is > 128GB free disk space
+
+# TODO: Put into long tests, make sure there is > 128GB free disk space
# def test_empty_store_max(self):
# self._resize_test(queue_name = "empty_store_max",
# num_msgs = 0, msg_size = 0,
# init_num_files = 8, init_file_size = 24,
# resize_num_files = 64, resize_file_size = 32768,
# wait_time = 120)
-
+
def test_empty_store_min(self):
self._resize_test(queue_name = "empty_store_min",
num_msgs = 0, msg_size = 0,
init_num_files = 8, init_file_size = 24,
resize_num_files = 4, resize_file_size = 1)
-
+
def test_basic_up(self):
self._resize_test(queue_name = "basic_up",
num_msgs = 100, msg_size = 10000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 16, resize_file_size = 48)
-
+
def test_basic_down(self):
self._resize_test(queue_name = "basic_down",
num_msgs = 100, msg_size = 10000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 4, resize_file_size = 15)
-
+
def test_basic_low(self):
self._resize_test(queue_name = "basic_low",
num_msgs = 100, msg_size = 10000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 4, resize_file_size = 4,
exp_fail = True)
-
+
def test_basic_under(self):
self._resize_test(queue_name = "basic_under",
num_msgs = 100, msg_size = 10000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 4, resize_file_size = 3,
exp_fail = True)
-
+
def test_very_large_msg_up(self):
self._resize_test(queue_name = "very_large_msg_up",
num_msgs = 4, msg_size = 2000000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 16, resize_file_size = 48)
-
+
def test_very_large_msg_down(self):
self._resize_test(queue_name = "very_large_msg_down",
num_msgs = 4, msg_size = 2000000,
init_num_files = 16, init_file_size = 64,
resize_num_files = 16, resize_file_size = 48)
-
+
def test_very_large_msg_low(self):
self._resize_test(queue_name = "very_large_msg_low",
num_msgs = 4, msg_size = 2000000,
init_num_files = 8, init_file_size = 24,
resize_num_files = 7, resize_file_size = 20,
exp_fail = True)
-
+
def test_very_large_msg_under(self):
self._resize_test(queue_name = "very_large_msg_under",
num_msgs = 4, msg_size = 2000000,
diff --git a/qpid/cpp/src/tests/legacystore/python_tests/store_test.py b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py
index 2fcab4e38e..cc846aefd4 100644
--- a/qpid/cpp/src/tests/legacystore/python_tests/store_test.py
+++ b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py
@@ -22,10 +22,13 @@ from brokertest import BrokerTest
from qpid.messaging import Empty
from qmf.console import Session
-
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client.
+
+
def store_args(store_dir = None):
"""Return the broker args necessary to load the async store"""
- assert BrokerTest.store_lib
+ assert BrokerTest.store_lib
if store_dir == None:
return []
return ["--store-dir", store_dir]
@@ -51,7 +54,7 @@ class Qmf:
else:
amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable,
arguments=arguments)
-
+
def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None):
"""Add a new queue"""
amqp_session = self.__broker.getAmqpSession()
@@ -62,7 +65,7 @@ class Qmf:
durable=durable, arguments=arguments)
else:
amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments)
-
+
def delete_queue(self, queue_name):
"""Delete an existing queue"""
amqp_session = self.__broker.getAmqpSession()
@@ -84,24 +87,24 @@ class Qmf:
return found
except Exception:
return False
-
+
def query_exchange(self, exchange_name, alt_exchange_name=None):
"""Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
value."""
return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name)
-
+
def query_queue(self, queue_name, alt_exchange_name=None):
"""Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
value."""
return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name)
-
+
def queue_message_count(self, queue_name):
"""Query the number of messages on a queue"""
queue_list = self.__session.getObjects(_class="queue", _name=queue_name)
if len(queue_list):
return queue_list[0].msgDepth
-
+
def queue_empty(self, queue_name):
"""Check if a queue is empty (has no messages waiting)"""
return self.queue_message_count(queue_name) == 0
@@ -109,7 +112,7 @@ class Qmf:
def get_objects(self, target_class, target_package="org.apache.qpid.broker"):
return self.__session.getObjects(_class=target_class, _package=target_package)
-
+
def close(self):
self.__session.delBroker(self.__broker)
self.__session = None
@@ -119,7 +122,7 @@ class StoreTest(BrokerTest):
"""
This subclass of BrokerTest adds some convenience test/check functions
"""
-
+
def _chk_empty(self, queue, receiver):
"""Check if a queue is empty (has no more messages)"""
try:
@@ -141,9 +144,9 @@ class StoreTest(BrokerTest):
else:
buff += chr(ord('a') + (index % 26))
return buff + msg
-
+
# Functions for formatting address strings
-
+
@staticmethod
def _fmt_csv(string_list, list_braces = None):
"""Format a list using comma-separation. Braces are optionally added."""
@@ -163,16 +166,16 @@ class StoreTest(BrokerTest):
if list_braces != None:
str_ += list_braces[1]
return str_
-
+
def _fmt_map(self, string_list):
"""Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map
element('key:val')."""
- return self._fmt_csv(string_list, list_braces="{}")
-
+ return self._fmt_csv(string_list, list_braces="{}")
+
def _fmt_list(self, string_list):
"""Format a list [l1, l2, l3, ...] from a string list."""
- return self._fmt_csv(string_list, list_braces="[]")
-
+ return self._fmt_csv(string_list, list_braces="[]")
+
def addr_fmt(self, node_name, **kwargs):
"""Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address
string."""
@@ -190,12 +193,12 @@ class StoreTest(BrokerTest):
x_declare_list = kwargs.get("x_declare_list", [])
x_bindings_list = kwargs.get("x_bindings_list", [])
x_subscribe_list = kwargs.get("x_subscribe_list", [])
-
+
node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0)
link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or
len(x_bindings_list) > 0 or len(x_subscribe_list) > 0)
assert not (node_flag and link_flag)
-
+
opt_str_list = []
if create_policy != None:
opt_str_list.append("create: %s" % create_policy)
@@ -231,7 +234,7 @@ class StoreTest(BrokerTest):
if len(opt_str_list) > 0:
addr_str += "; %s" % self._fmt_map(opt_str_list)
return addr_str
-
+
def snd_addr(self, node_name, **kwargs):
""" Create a send (node) address"""
# Get keyword args
@@ -245,7 +248,7 @@ class StoreTest(BrokerTest):
ftd_size = kwargs.get("ftd_size")
policy = kwargs.get("policy", "flow-to-disk")
exchage_type = kwargs.get("exchage_type")
-
+
create_policy = None
if auto_create:
create_policy = "always"
@@ -265,10 +268,10 @@ class StoreTest(BrokerTest):
x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
if exchage_type != None:
x_declare_list.append("type: %s" % exchage_type)
-
+
return self.addr_fmt(node_name, topic=topic, create_policy=create_policy, delete_policy=delete_policy,
node_type=node_type, durable=durable, x_declare_list=x_declare_list)
-
+
def rcv_addr(self, node_name, **kwargs):
""" Create a receive (link) address"""
# Get keyword args
@@ -282,7 +285,7 @@ class StoreTest(BrokerTest):
ftd_count = kwargs.get("ftd_count")
ftd_size = kwargs.get("ftd_size")
policy = kwargs.get("policy", "flow-to-disk")
-
+
create_policy = None
if auto_create:
create_policy = "always"
@@ -291,7 +294,7 @@ class StoreTest(BrokerTest):
delete_policy = "always"
mode = None
if browse:
- mode = "browse"
+ mode = "browse"
x_declare_list = ["\"exclusive\": %s" % exclusive]
if ftd_count != None or ftd_size != None:
queue_policy = ["\'qpid.policy_type\': %s" % policy]
@@ -308,11 +311,11 @@ class StoreTest(BrokerTest):
return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True,
link_name=link_name, durable=durable, x_declare_list=x_declare_list,
x_bindings_list=x_bindings_list, link_reliability=reliability)
-
+
def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False):
"""Check that a message is on a queue by dequeuing it and comparing it to the expected message"""
return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse)
-
+
def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False,
emtpy_flag=False):
"""Check that messages is on a queue by dequeuing them and comparing them to the expected messages"""
@@ -341,8 +344,8 @@ class StoreTest(BrokerTest):
if transactional:
ssn.commit()
return ssn
-
-
+
+
# Functions for finding strings in the broker log file (or other files)
@staticmethod
@@ -353,7 +356,7 @@ class StoreTest(BrokerTest):
return file_handle.read()
finally:
file_handle.close()
-
+
def _get_hits(self, broker, search):
"""Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple
queues)"""
@@ -361,9 +364,9 @@ class StoreTest(BrokerTest):
hits = []
for hit in search.findall(self._read_file(broker.log)):
if hit not in hits:
- hits.append(hit)
+ hits.append(hit)
return hits
-
+
def _reconsile_hits(self, broker, ftd_msgs, release_hits):
"""Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining
release_hits."""
@@ -382,35 +385,33 @@ class StoreTest(BrokerTest):
for hit in release_hits:
err += " %s\n" % hit
self.assert_(False, err)
-
+
def check_msg_release(self, broker, ftd_msgs):
""" Check for 'Content released' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content released$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
+
def check_msg_release_on_commit(self, broker, ftd_msgs):
""" Check for 'Content released on commit' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content released on commit$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
+
def check_msg_release_on_recover(self, broker, ftd_msgs):
""" Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content released after recovery$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
+
def check_msg_block(self, broker, ftd_msgs):
"""Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content release blocked$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
+
def check_msg_block_on_commit(self, broker, ftd_msgs):
"""Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content release blocked on commit$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
-
-
diff --git a/qpid/cpp/src/tests/qpidd_qmfv2_tests.py b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py
index 55497ccc03..2b45cb6eea 100755
--- a/qpid/cpp/src/tests/qpidd_qmfv2_tests.py
+++ b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py
@@ -30,6 +30,8 @@ from qpid.messaging import Message
try: import qmf.console
except: print "Cannot import module qmf.console, skipping tests"; exit(0);
+import qpid.messaging, brokertest
+brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client.
class ConsoleTest(BrokerTest):
"""
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index df410ec188..678f7aa223 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -50,11 +50,11 @@ export QPID_TESTS_PY=$QPID_TESTS/src/py
export QPID_TOOLS=$top_srcdir/../tools
export QMF_LIB=$top_srcdir/../extras/qmf/src/py
export PYTHON_COMMANDS=$QPID_TOOLS/src/py
-export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$QPID_TESTS_PY:$QMF_LIB:$PYTHONPATH
+export PYTHONPATH_SWIG=$pythonswigdir:$pythonswiglibdir
+export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$QPID_TESTS_PY:$QMF_LIB:$PYTHONPATH_SWIG:$PYTHONPATH
export QPID_CONFIG_EXEC=$PYTHON_COMMANDS/qpid-config
export QPID_ROUTE_EXEC=$PYTHON_COMMANDS/qpid-route
export QPID_HA_EXEC=$PYTHON_COMMANDS/qpid-ha
-export PYTHONPATH_SWIG=$pythonswigdir:$pythonswiglibdir
export PYTHONSWIGMODULE=$pythonswigdir/qpid_messaging.py
# Executables
export QPIDD_EXEC=$top_builddir/src/qpidd
diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py
index e7be98c486..f6243bbba5 100644
--- a/qpid/tools/src/py/qpidtoollibs/broker.py
+++ b/qpid/tools/src/py/qpidtoollibs/broker.py
@@ -17,7 +17,7 @@
# under the License.
#
-from qpid.messaging import Message
+import sys
from qpidtoollibs.disp import TimeLong
try:
from uuid import uuid4
@@ -26,13 +26,16 @@ except ImportError:
class BrokerAgent(object):
"""
- Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection.
+ Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection
+ or qpid_messaging.Connection
"""
def __init__(self, conn):
+ # Use the Message class from the same module as conn which could be qpid.messaging
+ # or qpid_messaging
+ self.message_class = sys.modules[conn.__class__.__module__].Message
self.conn = conn
self.sess = self.conn.session()
- self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \
- str(uuid4())
+ self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}}" % str(uuid4())
self.reply_rx = self.sess.receiver(self.reply_to)
self.reply_rx.capacity = 10
self.tx = self.sess.sender("qmf.default.direct/broker")
@@ -55,8 +58,9 @@ class BrokerAgent(object):
'_method_name' : method,
'_arguments' : arguments}
- message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
- properties=props, subject="broker")
+ message = self.message_class(
+ content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
self.tx.send(message)
response = self.reply_rx.fetch(timeout)
self.sess.acknowledge()
@@ -72,8 +76,9 @@ class BrokerAgent(object):
'x-amqp-0-10.app-id' : 'qmf2'}
correlator = str(self.next_correlator)
self.next_correlator += 1
- message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
- properties=props, subject="broker")
+ message = self.message_class(
+ content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
self.tx.send(message)
return correlator
@@ -259,7 +264,7 @@ class BrokerAgent(object):
'options': options}
self._method('delete', args)
- def bind(self, exchange, queue, key, options={}, **kwargs):
+ def bind(self, exchange, queue, key="", options={}, **kwargs):
properties = options
for k,v in kwargs.items():
properties[k] = v