diff options
28 files changed, 871 insertions, 608 deletions
diff --git a/qpid/cpp/bindings/qpid/python/qpid_messaging.i b/qpid/cpp/bindings/qpid/python/qpid_messaging.i index 8063db5967..e728c90334 100644 --- a/qpid/cpp/bindings/qpid/python/qpid_messaging.i +++ b/qpid/cpp/bindings/qpid/python/qpid_messaging.i @@ -141,7 +141,7 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) * don't even know why there is a non-const version of the method. */ %rename(opened) qpid::messaging::Connection::isOpen(); %rename(_close) qpid::messaging::Connection::close(); -%rename(receiver) qpid::messaging::Session::createReceiver; +%rename(_receiver) qpid::messaging::Session::createReceiver; %rename(_sender) qpid::messaging::Session::createSender; %rename(_acknowledge_all) qpid::messaging::Session::acknowledge(bool); %rename(_acknowledge_msg) qpid::messaging::Session::acknowledge( @@ -161,6 +161,16 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) %rename(_getTtl) qpid::messaging::Message::getTtl; %rename(_setTtl) qpid::messaging::Message::setTtl; +%rename(_sync) qpid::messaging::Session::sync; + +// Capitalize constant names correctly for python +%rename(TRACE) qpid::messaging::trace; +%rename(DEBUG) qpid::messaging::debug; +%rename(INFO) qpid::messaging::info; +%rename(NOTICE) qpid::messaging::notice; +%rename(WARNING) qpid::messaging::warning; +%rename(ERROR) qpid::messaging::error; +%rename(CRITICAL) qpid::messaging::critical; %include "qpid/qpid.i" @@ -221,31 +231,77 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) %pythoncode %{ @staticmethod - def establish(url=None, **options) : + def establish(url=None, timeout=None, **options) : + if timeout and "reconnect-timeout" not in options: + options["reconnect-timeout"] = timeout conn = Connection(url, **options) conn.open() return conn %} } +%pythoncode %{ + # Disposition class from messaging/message.py + class Disposition: + def __init__(self, type, **options): + self.type = type + self.options = options + + def __repr__(self): + args = [str(self.type)] + ["%s=%r" % (k, v) for k, v in self.options.items()] + return "Disposition(%s)" % ", ".join(args) + + # Consntants from messaging/constants.py + __SELF__ = object() + + class Constant: + + def __init__(self, name, value=__SELF__): + self.name = name + if value is __SELF__: + self.value = self + else: + self.value = value + + def __repr__(self): + return self.name + + AMQP_PORT = 5672 + AMQPS_PORT = 5671 + + UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) + + REJECTED = Constant("REJECTED") + RELEASED = Constant("RELEASED") +%} + %extend qpid::messaging::Session { %pythoncode %{ def acknowledge(self, message=None, disposition=None, sync=True) : - if disposition : - raise Exception("SWIG does not support dispositions yet. Use " - "Session.reject and Session.release instead") if message : - self._acknowledge_msg(message, sync) + if disposition is None: self._acknowledge_msg(message, sync) + # FIXME aconway 2014-02-11: the following does not repsect the sync flag. + elif disposition.type == REJECTED: self.reject(message) + elif disposition.type == RELEASED: self.release(message) else : + if disposition : # FIXME aconway 2014-02-11: support this + raise Exception("SWIG does not support dispositions yet. Use " + "Session.reject and Session.release instead") self._acknowledge_all(sync) __swig_getmethods__["connection"] = getConnection if _newclass: connection = property(getConnection) - def sender(self, target, **options) : - s = self._sender(target) - s._setDurable(options.get("durable")) - return s + def receiver(self, source, capacity=None): + r = self._receiver(source) + if capacity is not None: r.capacity = capacity + return r + + def sender(self, target, durable=None, capacity=None) : + s = self._sender(target) + if capacity is not None: s.capacity = capacity + s._setDurable(durable) + return s def next_receiver(self, timeout=None) : if timeout is None : @@ -254,6 +310,11 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) # Python API uses timeouts in seconds, # but C++ API uses milliseconds return self._next_receiver(Duration(int(1000*timeout))) + + def sync(self, timeout=None): + if timeout == 0: self._sync(False) # Non-blocking sync + else: self._sync(True) # Blocking sync, C++ has not timeout. + %} } @@ -302,6 +363,8 @@ QPID_EXCEPTION(UnauthorizedAccess, SessionError) message.durable = self.durable return self._send(message, sync) + def sync(self, timeout=None): self.session.sync(timeout) + __swig_getmethods__["capacity"] = getCapacity __swig_setmethods__["capacity"] = setCapacity if _newclass: capacity = property(getCapacity, setCapacity) diff --git a/qpid/cpp/include/qpid/messaging/Logger.h b/qpid/cpp/include/qpid/messaging/Logger.h index 3b774dc8a9..39518e01d4 100644 --- a/qpid/cpp/include/qpid/messaging/Logger.h +++ b/qpid/cpp/include/qpid/messaging/Logger.h @@ -103,7 +103,7 @@ public: * --log-time ("on"|"off|"0"|"1") * --log-level ("on"|"off|"0"|"1") * --log-source ("on"|"off|"0"|"1") - * --log-thread ("on"|"off|"0"|"1") + * --log-thread ("on"|"off|"0"|"1") * --log-function ("on"|"off|"0"|"1") * --log-hires-timestamp ("on"|"off|"0"|"1") * diff --git a/qpid/cpp/include/qpid/qpid.i b/qpid/cpp/include/qpid/qpid.i index 28a9064ebb..70f4ce8105 100644 --- a/qpid/cpp/include/qpid/qpid.i +++ b/qpid/cpp/include/qpid/qpid.i @@ -53,6 +53,7 @@ struct mystr #include <qpid/messaging/Message.h> #include <qpid/messaging/Duration.h> #include <qpid/messaging/FailoverUpdates.h> +#include <qpid/messaging/Logger.h> // // Wrapper functions for map-decode and list-decode. This allows us to avoid @@ -84,6 +85,7 @@ qpid::types::Variant::List& decodeList(const qpid::messaging::Message& msg) { %include <qpid/messaging/Session.h> %include <qpid/messaging/Connection.h> %include <qpid/messaging/FailoverUpdates.h> +%include <qpid/messaging/Logger.h> qpid::types::Variant::Map& decodeMap(const qpid::messaging::Message&); qpid::types::Variant::List& decodeList(const qpid::messaging::Message&); diff --git a/qpid/cpp/include/qpid/swig_python_typemaps.i b/qpid/cpp/include/qpid/swig_python_typemaps.i index 9d44a1e1ef..e9a84ffa8e 100644 --- a/qpid/cpp/include/qpid/swig_python_typemaps.i +++ b/qpid/cpp/include/qpid/swig_python_typemaps.i @@ -452,3 +452,10 @@ typedef int Py_ssize_t; $1 = PyInt_Check($input) ? 1 : 0; } + +/** + * argc,argv as python list + */ + +%include <argcargv.i> +%apply (int ARGC, char **ARGV) { (int argc, const char *argv[]) } diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index d1ff184a64..f98b3da419 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -120,6 +120,8 @@ if (BUILD_AMQP) qpid/messaging/amqp/ConnectionHandle.cpp qpid/messaging/amqp/DriverImpl.h qpid/messaging/amqp/DriverImpl.cpp + qpid/messaging/amqp/PnData.h + qpid/messaging/amqp/PnData.cpp qpid/messaging/amqp/ReceiverContext.h qpid/messaging/amqp/ReceiverContext.cpp qpid/messaging/amqp/ReceiverHandle.h diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 0010f602c1..cd62523fc1 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1292,12 +1292,13 @@ const std::string Broker::TCP_TRANSPORT("tcp"); std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( const std::string& name, - const QueueSettings& settings, + const QueueSettings& constSettings, const OwnershipToken* owner, const std::string& alternateExchange, const std::string& userId, const std::string& connectionId) { + QueueSettings settings(constSettings); // So we can modify them if (acl) { std::map<acl::Property, std::string> params; params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); @@ -1335,6 +1336,10 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); } + // Identify queues that won't survive a failover: exclusive, auto-delete with no delay. + if (owner && settings.autodelete && !settings.autoDeleteDelay) + settings.isTemporary = true; + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate, false/*recovering*/, owner, connectionId, userId); diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 941c0c88a2..fd99629492 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -521,7 +521,7 @@ void Queue::markInUse(bool controlling) else users.addOther(); } -void Queue::releaseFromUse(bool controlling) +void Queue::releaseFromUse(bool controlling, bool doDelete) { bool trydelete; if (controlling) { @@ -533,7 +533,7 @@ void Queue::releaseFromUse(bool controlling) users.removeOther(); trydelete = isUnused(locker); } - if (trydelete) scheduleAutoDelete(); + if (trydelete && doDelete) scheduleAutoDelete(); } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive, @@ -577,11 +577,13 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive, if (mgmtObject != 0 && c->isCounted()) { mgmtObject->inc_consumerCount(); } - ManagementAgent* agent = broker->getManagementAgent(); - if (agent) { - agent->raiseEvent( - _qmf::EventSubscribe(connectionId, userId, name, - c->getTag(), requestExclusive, ManagementAgent::toMap(arguments))); + if (broker) { + ManagementAgent* agent = broker->getManagementAgent(); + if (agent) { + agent->raiseEvent( + _qmf::EventSubscribe(connectionId, userId, name, + c->getTag(), requestExclusive, ManagementAgent::toMap(arguments))); + } } } @@ -589,6 +591,7 @@ void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, cons { removeListener(c); if(c->isCounted()) + { bool unused; { @@ -605,12 +608,12 @@ void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, cons if (mgmtObject != 0) { mgmtObject->dec_consumerCount(); } - if (unused && settings.autodelete) { - scheduleAutoDelete(); - } + if (unused && settings.autodelete) scheduleAutoDelete(); + } + if (broker) { + ManagementAgent* agent = broker->getManagementAgent(); + if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag())); } - ManagementAgent* agent = broker->getManagementAgent(); - if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag())); } /** diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 683468e9a4..c5996dd8be 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -363,7 +363,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, * be created. */ QPID_BROKER_EXTERN void markInUse(bool controlling=false); - QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false); + QPID_BROKER_EXTERN void releaseFromUse(bool controlling=false, bool doDelete=true); QPID_BROKER_EXTERN uint32_t purge(const uint32_t purge_request=0, //defaults to all messages boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(), diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 888dc68833..b9ae261f83 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -283,8 +283,6 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } catch (const qpid::types::Exception& e) { throw InvalidArgumentException(e.what()); } - // Identify queues that won't survive a failover. - settings.isTemporary = exclusive && autoDelete && !settings.autoDeleteDelay; std::pair<Queue::shared_ptr, bool> queue_created = getBroker().createQueue(name, settings, diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index f6b242c100..5f0dd2e101 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -264,7 +264,7 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists"); } std::pair<boost::shared_ptr<Queue>, bool> result - = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()); + = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), node.properties.isExclusive() ? this:0, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()); node.queue = result.first; node.created = result.second; } diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 6881896f5e..4e908fbe79 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -156,7 +156,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb, args.setString(QPID_REPLICATE, printable(NONE).str()); setArgs(args); // Don't allow backup queues to auto-delete, primary decides when to delete. - if (q->isAutoDelete()) q->markInUse(); + if (q->isAutoDelete()) q->markInUse(false); dispatch[DequeueEvent::KEY] = boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2); @@ -352,13 +352,13 @@ void QueueReplicator::promoted() { queue->getMessageInterceptors().add( boost::shared_ptr<IdSetter>(new IdSetter(maxId+1))); // Process auto-deletes - if (queue->isAutoDelete() && subscribed) { + if (queue->isAutoDelete()) { // Make a temporary shared_ptr to prevent premature deletion of queue. // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue // which could delete the queue while it's still running it's destroyed logic. boost::shared_ptr<Queue> q(queue); - q->releaseFromUse(); - q->scheduleAutoDelete(); + // See markInUse in ctor: release but don't delete if never used. + q->releaseFromUse(false/*controller*/, subscribed/*doDelete*/); } } } diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h index 8fe74ee959..8b19dc2ee0 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.h +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h @@ -64,8 +64,6 @@ class ReplicationTest // Calculate level for objects that may not have replication set, // including auto-delete/exclusive settings. - ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive) const; - ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive) const; ReplicateLevel useLevel(const broker::Queue&) const; ReplicateLevel useLevel(const broker::Exchange&) const; diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp index 26bb699565..e5712288dd 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp @@ -119,6 +119,8 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant& nestAnnotations = value; } else if (name == "set-to-on-send" || name == "set_to_on_send") { setToOnSend = value; + } else if (name == "properties" || name == "client-properties" || name == "client_properties") { + properties = value.asMap(); } else { throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); } diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h index 877e541636..c8c8798b7b 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h @@ -47,6 +47,7 @@ struct ConnectionOptions : qpid::client::ConnectionSettings std::string identifier; bool nestAnnotations; bool setToOnSend; + std::map<std::string, qpid::types::Variant> properties; QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&); QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value); diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 3dbacac11f..763deb33c6 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -18,6 +18,8 @@ * under the License. * */ + +#include "PnData.h" #include "qpid/messaging/amqp/AddressHelper.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/AddressImpl.h" @@ -33,6 +35,7 @@ extern "C" { #include <proton/engine.h> } + namespace qpid { namespace messaging { namespace amqp { @@ -239,179 +242,6 @@ bool replace(Variant::Map& map, const std::string& original, const std::string& } } -void write(pn_data_t* data, const Variant& value); - -void write(pn_data_t* data, const Variant::Map& map) -{ - pn_data_put_map(data); - pn_data_enter(data); - for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { - pn_data_put_string(data, convert(i->first)); - write(data, i->second); - } - pn_data_exit(data); -} -void write(pn_data_t* data, const Variant::List& list) -{ - pn_data_put_list(data); - pn_data_enter(data); - for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { - write(data, *i); - } - pn_data_exit(data); -} -void write(pn_data_t* data, const Variant& value) -{ - switch (value.getType()) { - case qpid::types::VAR_VOID: - pn_data_put_null(data); - break; - case qpid::types::VAR_BOOL: - pn_data_put_bool(data, value.asBool()); - break; - case qpid::types::VAR_UINT64: - pn_data_put_ulong(data, value.asUint64()); - break; - case qpid::types::VAR_INT64: - pn_data_put_long(data, value.asInt64()); - break; - case qpid::types::VAR_DOUBLE: - pn_data_put_double(data, value.asDouble()); - break; - case qpid::types::VAR_STRING: - pn_data_put_string(data, convert(value.asString())); - break; - case qpid::types::VAR_MAP: - write(data, value.asMap()); - break; - case qpid::types::VAR_LIST: - write(data, value.asList()); - break; - default: - break; - } -} -bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value); -bool read(pn_data_t* data, qpid::types::Variant& value) -{ - return read(data, pn_data_type(data), value); -} -void readList(pn_data_t* data, qpid::types::Variant::List& value) -{ - size_t count = pn_data_get_list(data); - pn_data_enter(data); - for (size_t i = 0; i < count && pn_data_next(data); ++i) { - qpid::types::Variant e; - if (read(data, e)) value.push_back(e); - } - pn_data_exit(data); -} -void readMap(pn_data_t* data, qpid::types::Variant::Map& value) -{ - size_t count = pn_data_get_list(data); - pn_data_enter(data); - for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) { - std::string key = convert(pn_data_get_symbol(data)); - pn_data_next(data); - qpid::types::Variant e; - if (read(data, e)) value[key]= e; - } - pn_data_exit(data); -} -void readArray(pn_data_t* data, qpid::types::Variant::List& value) -{ - size_t count = pn_data_get_array(data); - pn_type_t type = pn_data_get_array_type(data); - pn_data_enter(data); - for (size_t i = 0; i < count && pn_data_next(data); ++i) { - qpid::types::Variant e; - if (read(data, type, e)) value.push_back(e); - } - pn_data_exit(data); -} -bool read(pn_data_t* data, pn_type_t type, qpid::types::Variant& value) -{ - switch (type) { - case PN_NULL: - if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant(); - return true; - case PN_BOOL: - value = pn_data_get_bool(data); - return true; - case PN_UBYTE: - value = pn_data_get_ubyte(data); - return true; - case PN_BYTE: - value = pn_data_get_byte(data); - return true; - case PN_USHORT: - value = pn_data_get_ushort(data); - return true; - case PN_SHORT: - value = pn_data_get_short(data); - return true; - case PN_UINT: - value = pn_data_get_uint(data); - return true; - case PN_INT: - value = pn_data_get_int(data); - return true; - case PN_CHAR: - value = pn_data_get_char(data); - return true; - case PN_ULONG: - value = pn_data_get_ulong(data); - return true; - case PN_LONG: - value = pn_data_get_long(data); - return true; - case PN_TIMESTAMP: - value = pn_data_get_timestamp(data); - return true; - case PN_FLOAT: - value = pn_data_get_float(data); - return true; - case PN_DOUBLE: - value = pn_data_get_double(data); - return true; - case PN_UUID: - value = qpid::types::Uuid(pn_data_get_uuid(data).bytes); - return true; - case PN_BINARY: - value = convert(pn_data_get_binary(data)); - value.setEncoding(qpid::types::encodings::BINARY); - return true; - case PN_STRING: - value = convert(pn_data_get_string(data)); - value.setEncoding(qpid::types::encodings::UTF8); - return true; - case PN_SYMBOL: - value = convert(pn_data_get_string(data)); - value.setEncoding(qpid::types::encodings::ASCII); - return true; - case PN_LIST: - value = qpid::types::Variant::List(); - readList(data, value.asList()); - return true; - break; - case PN_MAP: - value = qpid::types::Variant::Map(); - readMap(data, value.asMap()); - return true; - case PN_ARRAY: - value = qpid::types::Variant::List(); - readArray(data, value.asList()); - return true; - case PN_DESCRIBED: - case PN_DECIMAL32: - case PN_DECIMAL64: - case PN_DECIMAL128: - default: - return false; - } - -} - const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes const uint32_t DEFAULT_TIMEOUT(0); } @@ -680,9 +510,9 @@ void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) requested.erase(j->first); } } else if (key == AUTO_DELETE) { - read(data, v); + PnData(data).read(v); isAutoDeleted = v.asBool(); - } else if (j != requested.end() && (read(data, v) && v.asString() == j->second.asString())) { + } else if (j != requested.end() && (PnData(data).read(v) && v.asString() == j->second.asString())) { requested.erase(j->first); } } @@ -815,7 +645,7 @@ void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMod } else { pn_data_put_ulong(filter, i->descriptorCode); } - write(filter, i->value); + PnData(filter).write(i->value); pn_data_exit(filter); } pn_data_exit(filter); @@ -902,7 +732,7 @@ void AddressHelper::setNodeProperties(pn_terminus_t* terminus) putLifetimePolicy(data, toLifetimePolicy(i->second.asString())); } else { pn_data_put_symbol(data, convert(i->first)); - write(data, i->second); + PnData(data).write(i->second); } } pn_data_exit(data); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index b6a72fd204..68a8f85f82 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -20,6 +20,7 @@ */ #include "ConnectionContext.h" #include "DriverImpl.h" +#include "PnData.h" #include "ReceiverContext.h" #include "Sasl.h" #include "SenderContext.h" @@ -49,6 +50,8 @@ extern "C" { namespace qpid { namespace messaging { namespace amqp { +using types::Variant; + namespace { //remove conditional when 0.5 is no longer supported @@ -872,13 +875,6 @@ namespace { const std::string CLIENT_PROCESS_NAME("qpid.client_process"); const std::string CLIENT_PID("qpid.client_pid"); const std::string CLIENT_PPID("qpid.client_ppid"); -pn_bytes_t convert(const std::string& s) -{ - pn_bytes_t result; - result.start = const_cast<char*>(s.data()); - result.size = s.size(); - return result; -} } void ConnectionContext::setProperties() { @@ -886,15 +882,21 @@ void ConnectionContext::setProperties() pn_data_put_map(data); pn_data_enter(data); - pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME)); + pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME)); std::string processName = sys::SystemInfo::getProcessName(); - pn_data_put_string(data, convert(processName)); + pn_data_put_string(data, PnData::str(processName)); - pn_data_put_symbol(data, convert(CLIENT_PID)); + pn_data_put_symbol(data, PnData::str(CLIENT_PID)); pn_data_put_int(data, sys::SystemInfo::getProcessId()); - pn_data_put_symbol(data, convert(CLIENT_PPID)); + pn_data_put_symbol(data, PnData::str(CLIENT_PPID)); pn_data_put_int(data, sys::SystemInfo::getParentProcessId()); + + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) + { + pn_data_put_symbol(data, PnData::str(i->first)); + PnData(data).write(i->second); + } pn_data_exit(data); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp new file mode 100644 index 0000000000..5c57c5b0a3 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp @@ -0,0 +1,218 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PnData.h" +#include "qpid/types/encodings.h" + +namespace qpid { +namespace messaging { +namespace amqp { + +using types::Variant; + +void PnData::write(const Variant::Map& map) +{ + pn_data_put_map(data); + pn_data_enter(data); + for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { + pn_data_put_string(data, str(i->first)); + write(i->second); + } + pn_data_exit(data); +} +void PnData::write(const Variant::List& list) +{ + pn_data_put_list(data); + pn_data_enter(data); + for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + write(*i); + } + pn_data_exit(data); +} +void PnData::write(const Variant& value) +{ + switch (value.getType()) { + case qpid::types::VAR_VOID: + pn_data_put_null(data); + break; + case qpid::types::VAR_BOOL: + pn_data_put_bool(data, value.asBool()); + break; + case qpid::types::VAR_UINT64: + pn_data_put_ulong(data, value.asUint64()); + break; + case qpid::types::VAR_INT64: + pn_data_put_long(data, value.asInt64()); + break; + case qpid::types::VAR_DOUBLE: + pn_data_put_double(data, value.asDouble()); + break; + case qpid::types::VAR_STRING: + pn_data_put_string(data, str(value.asString())); + break; + case qpid::types::VAR_MAP: + write(value.asMap()); + break; + case qpid::types::VAR_LIST: + write(value.asList()); + break; + default: + break; + } +} + +bool PnData::read(qpid::types::Variant& value) +{ + return read(pn_data_type(data), value); +} + +void PnData::readList(qpid::types::Variant::List& value) +{ + size_t count = pn_data_get_list(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + qpid::types::Variant e; + if (read(e)) value.push_back(e); + } + pn_data_exit(data); +} + +void PnData::readMap(qpid::types::Variant::Map& value) +{ + size_t count = pn_data_get_list(data); + pn_data_enter(data); + for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) { + std::string key = str(pn_data_get_symbol(data)); + pn_data_next(data); + qpid::types::Variant e; + if (read(e)) value[key]= e; + } + pn_data_exit(data); +} + +void PnData::readArray(qpid::types::Variant::List& value) +{ + size_t count = pn_data_get_array(data); + pn_type_t type = pn_data_get_array_type(data); + pn_data_enter(data); + for (size_t i = 0; i < count && pn_data_next(data); ++i) { + qpid::types::Variant e; + if (read(type, e)) value.push_back(e); + } + pn_data_exit(data); +} + +bool PnData::read(pn_type_t type, qpid::types::Variant& value) +{ + switch (type) { + case PN_NULL: + if (value.getType() != qpid::types::VAR_VOID) value = qpid::types::Variant(); + return true; + case PN_BOOL: + value = pn_data_get_bool(data); + return true; + case PN_UBYTE: + value = pn_data_get_ubyte(data); + return true; + case PN_BYTE: + value = pn_data_get_byte(data); + return true; + case PN_USHORT: + value = pn_data_get_ushort(data); + return true; + case PN_SHORT: + value = pn_data_get_short(data); + return true; + case PN_UINT: + value = pn_data_get_uint(data); + return true; + case PN_INT: + value = pn_data_get_int(data); + return true; + case PN_CHAR: + value = pn_data_get_char(data); + return true; + case PN_ULONG: + value = pn_data_get_ulong(data); + return true; + case PN_LONG: + value = pn_data_get_long(data); + return true; + case PN_TIMESTAMP: + value = pn_data_get_timestamp(data); + return true; + case PN_FLOAT: + value = pn_data_get_float(data); + return true; + case PN_DOUBLE: + value = pn_data_get_double(data); + return true; + case PN_UUID: + value = qpid::types::Uuid(pn_data_get_uuid(data).bytes); + return true; + case PN_BINARY: + value = str(pn_data_get_binary(data)); + value.setEncoding(qpid::types::encodings::BINARY); + return true; + case PN_STRING: + value = str(pn_data_get_string(data)); + value.setEncoding(qpid::types::encodings::UTF8); + return true; + case PN_SYMBOL: + value = str(pn_data_get_string(data)); + value.setEncoding(qpid::types::encodings::ASCII); + return true; + case PN_LIST: + value = qpid::types::Variant::List(); + readList(value.asList()); + return true; + break; + case PN_MAP: + value = qpid::types::Variant::Map(); + readMap(value.asMap()); + return true; + case PN_ARRAY: + value = qpid::types::Variant::List(); + readArray(value.asList()); + return true; + case PN_DESCRIBED: + case PN_DECIMAL32: + case PN_DECIMAL64: + case PN_DECIMAL128: + default: + return false; + } + +} + +pn_bytes_t PnData::str(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} + +std::string PnData::str(const pn_bytes_t& in) +{ + return std::string(in.start, in.size); +} + +}}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/PnData.h b/qpid/cpp/src/qpid/messaging/amqp/PnData.h new file mode 100644 index 0000000000..6d03235432 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/PnData.h @@ -0,0 +1,60 @@ +#ifndef QPID_MESSAGING_AMQP_PNDATA_H +#define QPID_MESSAGING_AMQP_PNDATA_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/types/Variant.h" +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { + +/** + * Helper class to read/write messaging types to/from pn_data_t. + */ +class PnData +{ + public: + PnData(pn_data_t* d) : data(d) {} + + void write(const types::Variant& value); + void write(const types::Variant::Map& map); + void write(const types::Variant::List& list); + + bool read(pn_type_t type, types::Variant& value); + bool read(types::Variant& value); + void readList(types::Variant::List& value); + void readMap(types::Variant::Map& value); + void readArray(types::Variant::List& value); + + static pn_bytes_t str(const std::string&); + static std::string str(const pn_bytes_t&); + + private: + pn_data_t* data; +}; +}}} // namespace messaging::amqp + +#endif /*!QPID_MESSAGING_AMQP_PNDATA_H*/ diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 4b6820e4fd..4e5c255a1a 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -21,19 +21,47 @@ import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re import qpid, traceback, signal -from qpid import connection, messaging, util +from qpid import connection, util from qpid.compat import format_exc from qpid.harness import Skipped from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition from logging import getLogger +from qpidtoollibs import BrokerAgent -try: import qmf.console -except: print "Cannot import module qmf.console, skipping tests"; exit(0); +# NOTE: Always import native client qpid.messaging, import swigged client +# qpid_messaging if possible. qpid_messaing is set to None if not available. +# +# qm is set to qpid_messaging if it is available, qpid.messaging if not. +# Use qm.X to specify names from the default messaging module. +# +# Set environment variable QPID_PY_NO_SWIG=1 to prevent qpid_messaging from loading. +# +# BrokerTest can be configured to determine which protocol is used by default: +# +# -DPROTOCOL="amqpX": Use protocol "amqpX". Defaults to amqp1.0 if swig client +# is being used, amqp0-10 if native client is being used. +# +# The configured defaults can be over-ridden on BrokerTest.connect and some +# other methods by specifying native=True|False and protocol="amqpX" +# +import qpid.messaging +qm = qpid.messaging +qpid_messaging = None +if not os.environ.get("QPID_PY_NO_SWIG"): + try: + import qpid_messaging + from qpid.datatypes import uuid4 + qm = qpid_messaging + # Silence warnings from swigged messaging library unless enabled in environment. + if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ: + qm.Logger.configure(["--log-enable=error"]) + except ImportError: + print "Cannot load python SWIG bindings, falling back to native qpid.messaging." -log = getLogger("qpid.brokertest") +log = getLogger("brokertest") # Values for expected outcome of process at end of test EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test. @@ -149,7 +177,7 @@ class Popen(subprocess.Popen): err = error_line(self.outfile("err")) or error_line(self.outfile("out")) raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) - def stop(self): # Clean up at end of test. + def teardown(self): # Clean up at end of test. try: if self.expect == EXPECT_UNKNOWN: try: self.kill() # Just make sure its dead @@ -253,14 +281,16 @@ class Broker(Popen): self.test = test self._port=port + args = copy(args) + if BrokerTest.amqp_lib: args += ["--load-module", BrokerTest.amqp_lib] if BrokerTest.store_lib and not test_store: - args = args + ['--load-module', BrokerTest.store_lib] + args += ['--load-module', BrokerTest.store_lib] if BrokerTest.sql_store_lib: - args = args + ['--load-module', BrokerTest.sql_store_lib] - args = args + ['--catalog', BrokerTest.sql_catalog] + args += ['--load-module', BrokerTest.sql_store_lib] + args += ['--catalog', BrokerTest.sql_catalog] if BrokerTest.sql_clfs_store_lib: - args = args + ['--load-module', BrokerTest.sql_clfs_store_lib] - args = args + ['--catalog', BrokerTest.sql_catalog] + args += ['--load-module', BrokerTest.sql_clfs_store_lib] + args += ['--catalog', BrokerTest.sql_catalog] cmd = [BrokerTest.qpidd_exec, "--port", port, "--interface", "127.0.0.1", "--no-module-dir"] + args if not "--auth" in args: cmd.append("--auth=no") if wait != None: @@ -288,13 +318,11 @@ class Broker(Popen): cmd += ["--data-dir", self.datadir] if show_cmd: print cmd Popen.__init__(self, cmd, expect, stdout=PIPE) - test.cleanup_stop(self) + test.teardown_add(self) self._host = "127.0.0.1" - log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) + self._agent = None - def startQmf(self, handler=None): - self.qmf_session = qmf.console.Session(handler) - self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port())) + log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) def host(self): return self._host @@ -310,22 +338,25 @@ class Broker(Popen): def unexpected(self,msg): raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname)) - def connect(self, timeout=5, **kwargs): - """New API connection to the broker.""" - return messaging.Connection.establish(self.host_port(), timeout=timeout, **kwargs) + def connect(self, timeout=5, native=False, **kwargs): + """New API connection to the broker. + @param native if True force use of the native qpid.messaging client + even if swig client is available. + """ + if self.test.protocol: kwargs.setdefault("protocol", self.test.protocol) + if native: connection_class = qpid.messaging.Connection + else: connection_class = qm.Connection + return connection_class.establish(self.host_port(), timeout=timeout, **kwargs) + + @property + def agent(self, **kwargs): + """Return a BrokerAgent for this broker""" + if not self._agent: self._agent = BrokerAgent(self.connect(**kwargs)) + return self._agent - def connect_old(self): - """Old API connection to the broker.""" - socket = qpid.util.connect(self.host(),self.port()) - connection = qpid.connection.Connection (sock=socket) - connection.start() - return connection; def declare_queue(self, queue): - c = self.connect_old() - s = c.session(str(qpid.datatypes.uuid4())) - s.queue_declare(queue=queue) - c.close() + self.agent.addQueue(queue) def _prep_sender(self, queue, durable, xprops): s = queue + "; {create:always, node:{durable:" + str(durable) @@ -402,7 +433,7 @@ def browse(session, queue, timeout=0, transform=lambda m: m.content): contents = [] try: while True: contents.append(transform(r.fetch(timeout=timeout))) - except messaging.Empty: pass + except qm.Empty: pass finally: r.close() return contents @@ -451,30 +482,42 @@ class BrokerTest(TestCase): def configure(self, config): self.config=config def setUp(self): - outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp" + defs = self.config.defines + outdir = defs.get("OUTDIR") or "brokertest.tmp" self.dir = os.path.join(self.rootdir, outdir, self.id()) os.makedirs(self.dir) os.chdir(self.dir) - self.stopem = [] # things to stop at end of test + self.teardown_list = [] # things to tear down at end of test + + self.protocol = defs.get("PROTOCOL") or ("amqp1.0" if qpid_messaging else "amqp0-10") + self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0 + def tearDown(self): err = [] - for p in self.stopem: - try: p.stop() - except Exception, e: err.append(str(e)) - self.stopem = [] # reset in case more processes start + self.teardown_list.reverse() # Tear down in reverse order + for p in self.teardown_list: + log.debug("Tearing down %s", p) + try: + # Call the first of the methods that is available on p. + for m in ["teardown", "close"]: + a = getattr(p, m, None) + if a: a(); break + else: raise Exception("Don't know how to tear down %s", p) + except Exception, e: err.append("%s: %s"%(e.__class__.__name__, str(e))) + self.teardown_list = [] # reset in case more processes start os.chdir(self.rootdir) if err: raise Exception("Unexpected process status:\n "+"\n ".join(err)) - def cleanup_stop(self, stopable): - """Call thing.stop at end of test""" - self.stopem.append(stopable) + def teardown_add(self, thing): + """Call thing.teardown() or thing.close() at end of test""" + self.teardown_list.append(thing) def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): """Start a process that will be killed at end of test, in the test dir.""" os.chdir(self.dir) p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr) - self.cleanup_stop(p) + self.teardown_add(p) return p def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False): @@ -490,6 +533,11 @@ class BrokerTest(TestCase): def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs) def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs) + def protocol_option(self, connection_options=""): + if "protocol" in connection_options: return connection_options + else: return ",".join(filter(None, [connection_options,"protocol:'%s'"%self.protocol])) + + def join(thread, timeout=30): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) @@ -524,7 +572,7 @@ class NumberedSender(Thread): def __init__(self, broker, max_depth=None, queue="test-queue", connection_options=RECONNECT_OPTIONS, - failover_updates=True, url=None, args=[]): + failover_updates=False, url=None, args=[]): """ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.notify_received(n) to be called each time messages are received. @@ -533,7 +581,7 @@ class NumberedSender(Thread): cmd = ["qpid-send", "--broker", url or broker.host_port(), "--address", "%s;{create:always}"%queue, - "--connection-options", "{%s}"%(connection_options), + "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)), "--content-stdin" ] + args if failover_updates: cmd += ["--failover-updates"] @@ -592,7 +640,7 @@ class NumberedReceiver(Thread): """ def __init__(self, broker, sender=None, queue="test-queue", connection_options=RECONNECT_OPTIONS, - failover_updates=True, url=None, args=[]): + failover_updates=False, url=None, args=[]): """ sender: enable flow control. Call sender.received(n) for each message received. """ @@ -601,7 +649,7 @@ class NumberedReceiver(Thread): cmd = ["qpid-receive", "--broker", url or broker.host_port(), "--address", "%s;{create:always}"%queue, - "--connection-options", "{%s}"%(connection_options), + "--connection-options", "{%s}"%(broker.test.protocol_option(connection_options)), "--forever" ] if failover_updates: cmd += [ "--failover-updates" ] diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 0f92f7dbcc..2bf8677cd1 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -20,8 +20,6 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random import traceback -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty -from qpid.datatypes import uuid4, UUID from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO @@ -44,7 +42,7 @@ class LogLevel: class QmfAgent(object): """Access to a QMF broker agent.""" def __init__(self, address, **kwargs): - self._connection = Connection.establish( + self._connection = qm.Connection.establish( address, client_properties={"qpid.ha-admin":1}, **kwargs) self._agent = BrokerAgent(self._connection) @@ -105,9 +103,9 @@ class HaPort: self.port = self.socket.getsockname()[1] self.fileno = self.socket.fileno() self.stopped = False - test.cleanup_stop(self) # Stop during test.tearDown + test.teardown_add(self) # Stop during test.tearDown - def stop(self): # Called in tearDown + def teardown(self): # Called in tearDown if not self.stopped: self.stopped = True self.socket.shutdown(socket.SHUT_RDWR) @@ -180,6 +178,7 @@ acl allow all all def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url]) def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]); def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + @property def agent(self): if not self._agent: cred = self.client_credentials @@ -190,7 +189,7 @@ acl allow all all return self._agent def qmf(self): - hb = self.agent().getHaBroker() + hb = self.agent.getHaBroker() hb.update() return hb @@ -203,19 +202,19 @@ acl allow all all try: self._status = self.ha_status() return self._status == status; - except ConnectionError: return False + except qm.ConnectionError: return False assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%( self, status, self._status) - def wait_queue(self, queue, timeout=1): + def wait_queue(self, queue, timeout=1, msg="wait_queue"): """ Wait for queue to be visible via QMF""" - agent = self.agent() - assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout) + agent = self.agent + assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue - def wait_no_queue(self, queue, timeout=1): + def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"): """ Wait for queue to be invisible via QMF""" - agent = self.agent() - assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout) + agent = self.agent + assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue) # TODO aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): @@ -273,12 +272,12 @@ acl allow all all def assert_connect_fail(self): try: self.connect() - self.test.fail("Expected ConnectionError") - except ConnectionError: pass + self.test.fail("Expected qm.ConnectionError") + except qm.ConnectionError: pass def try_connect(self): try: return self.connect() - except ConnectionError: return None + except qm.ConnectionError: return None def ready(self, *args, **kwargs): if not 'client_properties' in kwargs: kwargs['client_properties'] = {} @@ -286,7 +285,7 @@ acl allow all all return Broker.ready(self, *args, **kwargs) def kill(self, final=True): - if final: self.ha_port.stop() + if final: self.ha_port.teardown() self._agent = None return Broker.kill(self) @@ -355,9 +354,9 @@ class HaCluster(object): b.set_brokers_url(self.url) b.set_public_url(self.url) - def connect(self, i): + def connect(self, i, **kwargs): """Connect with reconnect_urls""" - return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) + return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs) def kill(self, i, promote_next=True, final=True): """Kill broker i, promote broker i+1""" @@ -393,7 +392,7 @@ def wait_address(session, address): """Wait for an address to become valid.""" def check(): try: session.sender(address); return True - except NotFound: return False + except qm.NotFound: return False assert retry(check), "Timed out waiting for address %s"%(address) def valid_address(session, address): @@ -401,6 +400,6 @@ def valid_address(session, address): try: session.receiver(address) return True - except NotFound: return False + except qm.NotFound: return False diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index f22e12a355..6ac7888f93 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1,5 +1,4 @@ #!/usr/bin/env python - # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -20,7 +19,6 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback -from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty, ServerError from qpid.datatypes import uuid4, UUID from qpid.harness import Skipped from brokertest import * @@ -31,18 +29,9 @@ from qpidtoollibs import BrokerAgent, EventHelper log = getLogger(__name__) - class HaBrokerTest(BrokerTest): """Base class for HA broker tests""" -def alt_setup(session, suffix): - # Create exchange to use as alternate and a queue bound to it. - # altex exchange: acts as alternate exchange - session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix)) - # altq queue bound to altex, collect re-routed messages. - session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix)) - - class ReplicationTests(HaBrokerTest): """Correctness tests for HA replication.""" @@ -50,38 +39,46 @@ class ReplicationTests(HaBrokerTest): """Test basic replication of configuration and messages before and after backup has connected""" - def queue(name, replicate): - return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) + def setup(prefix, primary): + """Create config, send messages on the primary p""" + a = primary.agent + + def queue(name, replicate): + a.addQueue(name, options={'qpid.replicate':replicate}) + return name - def exchange(name, replicate, bindq, key): - return "%s/%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'topic'},x-bindings:[{exchange:'%s',queue:'%s',key:'%s'}]}}"%(name, key, replicate, name, bindq, key) + def exchange(name, replicate, bindq, key): + a.addExchange("fanout", name, options={'qpid.replicate':replicate}) + a.bind(name, bindq, key) + return name - def setup(p, prefix, primary): - """Create config, send messages on the primary p""" + # Test replication of messages + p = primary.connect().session() s = p.sender(queue(prefix+"q1", "all")) - for m in ["a", "b", "1"]: s.send(Message(m)) + for m in ["a", "b", "1"]: s.send(qm.Message(m)) # Test replication of dequeue self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a") p.acknowledge() - p.sender(queue(prefix+"q2", "configuration")).send(Message("2")) - p.sender(queue(prefix+"q3", "none")).send(Message("3")) - p.sender(exchange(prefix+"e1", "all", prefix+"q1", "key1")).send(Message("4")) - p.sender(exchange(prefix+"e2", "configuration", prefix+"q2", "key2")).send(Message("5")) + + p.sender(queue(prefix+"q2", "configuration")).send(qm.Message("2")) + p.sender(queue(prefix+"q3", "none")).send(qm.Message("3")) + p.sender(exchange(prefix+"e1", "all", prefix+"q1", "key1")).send(qm.Message("4")) + p.sender(exchange(prefix+"e2", "configuration", prefix+"q2", "key2")).send(qm.Message("5")) # Test unbind - p.sender(queue(prefix+"q4", "all")).send(Message("6")) + p.sender(queue(prefix+"q4", "all")).send(qm.Message("6")) s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4", "key4")) - s3.send(Message("7")) - # Use old connection to unbind - us = primary.connect_old().session(str(uuid4())) - us.exchange_unbind(exchange=prefix+"e4", binding_key="key4", queue=prefix+"q4") - p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped + s3.send(qm.Message("7")) + a.unbind(prefix+"e4", prefix+"q4", "key4") + p.sender(prefix+"e4").send(qm.Message("drop1")) # Should be dropped + # Test replication of deletes - p.sender(queue(prefix+"dq", "all")) - p.sender(exchange(prefix+"de", "all", prefix+"dq", "")) - p.sender(prefix+"dq;{delete:always}").close() - p.sender(prefix+"de;{delete:always}").close() + queue(prefix+"dq", "all") + exchange(prefix+"de", "all", prefix+"dq", "") + a.delQueue(prefix+"dq") + a.delExchange(prefix+"de") + # Need a marker so we can wait till sync is done. - p.sender(queue(prefix+"x", "configuration")) + queue(prefix+"x", "configuration") def verify(b, prefix, p): """Verify setup was replicated to backup b""" @@ -97,14 +94,14 @@ class ReplicationTests(HaBrokerTest): assert not valid_address(b, prefix+"q3") # Verify exchange with replicate=all - b.sender(prefix+"e1/key1").send(Message(prefix+"e1")) + b.sender(prefix+"e1/key1").send(qm.Message(prefix+"e1")) self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) # Verify exchange with replicate=configuration - b.sender(prefix+"e2/key2").send(Message(prefix+"e2")) + b.sender(prefix+"e2/key2").send(qm.Message(prefix+"e2")) self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"]) - b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind. + b.sender(prefix+"e4/key4").send(qm.Message("drop2")) # Verify unbind. self.assert_browse_retry(b, prefix+"q4", ["6","7"]) # Verify deletes @@ -117,25 +114,26 @@ class ReplicationTests(HaBrokerTest): primary = cluster[0] backup = cluster[1] - p = primary.connect().session() - # Send messages before re-starting the backup, test catch-up replication. cluster.kill(1, promote_next=False, final=False) - setup(p, "1", primary) + setup("1", primary) cluster.restart(1) # Send messages after re-starting the backup, to test steady-state replication. - setup(p, "2", primary) + setup("2", primary) + + p = primary.connect().session() # Verify the data on the backup b = backup.connect_admin().session() verify(b, "1", p) verify(b, "2", p) # Test a series of messages, enqueue all then dequeue all. - s = p.sender(queue("foo","all")) + primary.agent.addQueue("foo") + s = p.sender("foo") wait_address(b, "foo") msgs = [str(i) for i in range(10)] - for m in msgs: s.send(Message(m)) + for m in msgs: s.send(qm.Message(m)) self.assert_browse_retry(p, "foo", msgs) self.assert_browse_retry(b, "foo", msgs) r = p.receiver("foo") @@ -145,7 +143,7 @@ class ReplicationTests(HaBrokerTest): self.assert_browse_retry(b, "foo", []) # Another series, this time verify each dequeue individually. - for m in msgs: s.send(Message(m)) + for m in msgs: s.send(qm.Message(m)) self.assert_browse_retry(p, "foo", msgs) self.assert_browse_retry(b, "foo", msgs) for i in range(len(msgs)): @@ -187,14 +185,16 @@ class ReplicationTests(HaBrokerTest): "--broker", brokers[0].host_port(), "--address", "q;{create:always}", "--messages=1000", - "--content-string=x" + "--content-string=x", + "--connection-options={%s}"%self.protocol_option() ]) receiver = self.popen( ["qpid-receive", "--broker", brokers[0].host_port(), "--address", "q;{create:always}", "--messages=990", - "--timeout=10" + "--timeout=10", + "--connection-options={%s}"%self.protocol_option() ]) self.assertEqual(sender.wait(), 0) self.assertEqual(receiver.wait(), 0) @@ -215,7 +215,7 @@ class ReplicationTests(HaBrokerTest): try: backup.connect().session() self.fail("Expected connection to backup to fail") - except ConnectionError: pass + except qm.ConnectionError: pass # Check that admin connections are allowed to backup. backup.connect_admin().close() @@ -224,8 +224,8 @@ class ReplicationTests(HaBrokerTest): reconnect=True) s = c.session() sender = s.sender("q;{create:always}") - backup.wait_backup("q") - sender.send("foo") + sender.send("foo", sync=True) + s.sync() primary.kill() assert retry(lambda: not is_running(primary.pid)) backup.promote() @@ -243,31 +243,36 @@ class ReplicationTests(HaBrokerTest): broker_addr = broker.host_port() # Case 1: Connect before stalling the broker, use the connection after stalling. - c = Connection(broker_addr, heartbeat=1) + c = qm.Connection(broker_addr, heartbeat=1) c.open() os.kill(broker.pid, signal.SIGSTOP) # Stall the broker - self.assertRaises(ConnectionError, c.session().sender, "foo") + + def make_sender(): c.session().sender("foo") + self.assertRaises(qm.ConnectionError, make_sender) # Case 2: Connect to a stalled broker - c = Connection(broker_addr, heartbeat=1) - self.assertRaises(ConnectionError, c.open) + c = qm.Connection(broker_addr, heartbeat=1) + self.assertRaises(qm.ConnectionError, c.open) # Case 3: Re-connect to a stalled broker. broker2 = Broker(self) - c = Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1, - reconnect=True, reconnect_urls=[broker_addr], - reconnect_log=False) # Hide expected warnings + c = qm.Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1, + reconnect=True, reconnect_urls=[broker_addr], + reconnect_log=False) # Hide expected warnings c.open() broker2.kill() # Cause re-connection to broker - self.assertRaises(ConnectionError, c.session().sender, "foo") + self.assertRaises(qm.ConnectionError, make_sender) def test_failover_cpp(self): """Verify that failover works in the C++ client.""" cluster = HaCluster(self, 2) cluster[0].connect().session().sender("q;{create:always}") cluster[1].wait_backup("q") - sender = NumberedSender(cluster[0], url=cluster.url, queue="q", failover_updates = False) - receiver = NumberedReceiver(cluster[0], url=cluster.url, queue="q", failover_updates = False) + # FIXME aconway 2014-02-21: using 0-10, there is a failover problem with 1.0 + sender = NumberedSender(cluster[0], url=cluster.url, queue="q", + connection_options="reconnect:true,protocol:'amqp0-10'") + receiver = NumberedReceiver(cluster[0], url=cluster.url, queue="q", + connection_options="reconnect:true,protocol:'amqp0-10'") receiver.start() sender.start() assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru @@ -330,7 +335,7 @@ class ReplicationTests(HaBrokerTest): # Set up replication with qpid-config ps2 = pc.session().sender("q2;{create:always}") backup.config_replicate(primary.host_port(), "q2"); - ps2.send("x", timeout=1) + ps2.send("x") backup.assert_browse_backup("q2", ["x"]) @@ -349,6 +354,7 @@ class ReplicationTests(HaBrokerTest): br = backup.connect().session().receiver("q;{create:always}") backup.replicate(cluster.url, "q") ps.send("a") + ps.sync() backup.assert_browse_backup("q", ["a"]) cluster.bounce(0) backup.assert_browse_backup("q", ["a"]) @@ -356,6 +362,8 @@ class ReplicationTests(HaBrokerTest): backup.assert_browse_backup("q", ["a", "b"]) cluster[0].wait_status("ready") cluster.bounce(1) + # FIXME aconway 2014-02-20: pr does not fail over with 1.0/swig + if qm == qpid_messaging: raise Skipped("FIXME SWIG client failover bug") self.assertEqual("a", pr.fetch().content) pr.session.acknowledge() backup.assert_browse_backup("q", ["b"]) @@ -369,7 +377,7 @@ class ReplicationTests(HaBrokerTest): s = cluster[0].connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}") def send(key,value,expect): - s.send(Message(content=value,properties={"lvq-key":key}), timeout=1) + s.send(qm.Message(content=value,properties={"lvq-key":key})) cluster[1].assert_browse_backup("lvq", expect) send("a", "a-1", ["a-1"]) @@ -387,7 +395,7 @@ class ReplicationTests(HaBrokerTest): """Verify that we replicate to an LVQ correctly""" cluster = HaCluster(self, 2) s = cluster[0].connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}") - for i in range(10): s.send(Message(str(i))) + for i in range(10): s.send(qm.Message(str(i))) cluster[1].assert_browse_backup("q", [str(i) for i in range(5,10)]) def test_reject(self): @@ -396,11 +404,11 @@ class ReplicationTests(HaBrokerTest): primary, backup = cluster s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}") try: - for i in range(10): s.send(Message(str(i)), sync=False) - except qpid.messaging.exceptions.TargetCapacityExceeded: pass + for i in range(10): s.send(qm.Message(str(i)), sync=False) + except qm.TargetCapacityExceeded: pass backup.assert_browse_backup("q", [str(i) for i in range(0,5)]) - # Detach, don't close as there is a broken session - s.session.connection.detach() + try: s.session.connection.close() + except: pass # Expect exception from broken session def test_priority(self): """Verify priority queues replicate correctly""" @@ -408,7 +416,7 @@ class ReplicationTests(HaBrokerTest): session = cluster[0].connect().session() s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] - for p in priorities: s.send(Message(priority=p)) + for p in priorities: s.send(qm.Message(priority=p)) # Can't use browse_backup as browser sees messages in delivery order not priority. cluster[1].wait_backup("priority-queue") r = cluster[1].connect_admin().session().receiver("priority-queue") @@ -425,7 +433,7 @@ class ReplicationTests(HaBrokerTest): limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2} limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()]) s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy)) - messages = [Message(content=str(uuid4()), priority = p) for p in priorities] + messages = [qm.Message(content=str(uuid4()), priority = p) for p in priorities] for m in messages: s.send(m) backup.wait_backup(s.target) r = backup.connect_admin().session().receiver("priority-queue") @@ -439,7 +447,7 @@ class ReplicationTests(HaBrokerTest): primary, backup = cluster s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] - for p in priorities: s.send(Message(priority=p)) + for p in priorities: s.send(qm.Message(priority=p)) expect = sorted(priorities,reverse=True)[0:5] primary.assert_browse("q", expect, transform=lambda m: m.priority) backup.assert_browse_backup("q", expect, transform=lambda m: m.priority) @@ -456,8 +464,8 @@ class ReplicationTests(HaBrokerTest): def send(self, connection): """Send messages, then acquire one but don't acknowledge""" s = connection.session() - for m in range(10): s.sender(self.address).send(str(m), timeout=1) - s.receiver(self.address, timeout=1).fetch() + for m in range(10): s.sender(self.address).send(str(m)) + s.receiver(self.address).fetch() def verify(self, brokertest, backup): backup.assert_browse_backup(self.queue, self.expect, msg=self.queue) @@ -492,38 +500,33 @@ class ReplicationTests(HaBrokerTest): cluster2[1].wait_status("ready") cluster2[0].connect().session().sender("q;{create:always}") time.sleep(.1) # Give replication a chance. - try: - cluster1[1].connect_admin().session().receiver("q") - self.fail("Excpected no-such-queue exception") - except NotFound: pass - try: - cluster2[1].connect_admin().session().receiver("q") - self.fail("Excpected no-such-queue exception") - except NotFound: pass + # Expect queues not to be found + self.assertRaises(qm.NotFound, cluster1[1].connect_admin().session().receiver, "q") + self.assertRaises(qm.NotFound, cluster2[1].connect_admin().session().receiver, "q") def test_replicate_binding(self): """Verify that binding replication can be disabled""" cluster = HaCluster(self, 2) primary, backup = cluster[0], cluster[1] ps = primary.connect().session() - ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}") - ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}") + a = primary.agent + a.addExchange("fanout", "ex") + a.addQueue("q") + a.bind("ex", "q", options={'qpid.replicate':'none'}) backup.wait_backup("q") primary.kill() assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die backup.promote() bs = backup.connect_admin().session() - bs.sender("ex").send(Message("msg")) + bs.sender("ex").send(qm.Message("msg")) self.assert_browse_retry(bs, "q", []) def test_invalid_replication(self): """Verify that we reject an attempt to declare a queue with invalid replication value.""" cluster = HaCluster(self, 1, ha_replicate="all") - try: - c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") - self.fail("Expected ConnectionError") - except ConnectionError: pass + self.assertRaises(Exception, cluster[0].connect().session().sender, + "q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") def test_exclusive_queue(self): """Ensure that we can back-up exclusive queues, i.e. the replicating @@ -533,12 +536,12 @@ class ReplicationTests(HaBrokerTest): c = cluster[0].connect() q = addr.split(";")[0] r = c.session().receiver(addr) - try: c.session().receiver(addr); self.fail("Expected exclusive exception") - except ReceiverError: pass + self.assertRaises(qm.LinkError, c.session().receiver, addr) s = c.session().sender(q).send(q) cluster[1].assert_browse_backup(q, [q]) - test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}") + if qm == qpid.messaging: # FIXME aconway 2014-02-20: swig client no exclusive subscribe + test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); def test_auto_delete_exclusive(self): """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues""" @@ -568,12 +571,12 @@ class ReplicationTests(HaBrokerTest): cluster = HaCluster(self, 3) def ha_broker(broker): - ha_broker = broker.agent().getHaBroker(); + ha_broker = broker.agent.getHaBroker(); ha_broker.update() return ha_broker for broker in cluster: # Make sure HA system-id matches broker's - self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent().getBroker().systemRef)) + self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent.getBroker().systemRef)) # Check that all brokers have the same membership as the cluster def check_ids(broker): @@ -612,7 +615,6 @@ acl allow zag@QPID access method acl allow zag@QPID create link # Normal user acl allow zig@QPID all all - acl deny all all """) aclf.close() @@ -625,9 +627,14 @@ acl deny all all client_credentials=Credentials("zag", "zag", "PLAIN")) c = cluster[0].connect(username="zig", password="zig") s0 = c.session(); - s0.sender("q;{create:always}") - s0.sender("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") + a = cluster[0].agent + a.addQueue("q") + a.addExchange("fanout", "ex") + a.bind("ex", "q", "") s0.sender("ex").send("foo"); + + # Transactions should be done over the tx_protocol + c = cluster[0].connect(protocol=self.tx_protocol, username="zig", password="zig") s1 = c.session(transactional=True) s1.sender("ex").send("foo-tx"); cluster[1].assert_browse_backup("q", ["foo"]) @@ -640,19 +647,22 @@ acl deny all all cluster = HaCluster(self, 2) s = cluster[0].connect().session() # altex exchange: acts as alternate exchange - s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") + a = cluster[0].agent + a.addExchange("fanout", "altex") # altq queue bound to altex, collect re-routed messages. - s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") + a.addQueue("altq") + a.bind("altex", "altq", "") # ex exchange with alternate-exchange altex and no queues bound - s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + a.addExchange("direct", "ex", {"alternate-exchange":"altex"}) # create queue q with alternate-exchange altex - s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") + a.addQueue("q", {"alternate-exchange":"altex"}) # create a bunch of exchanges to ensure we don't clean up prematurely if the # response comes in multiple fragments. for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i) def verify(broker): - s = broker.connect().session() + c = broker.connect() + s = c.session() # Verify unmatched message goes to ex's alternate. s.sender("ex").send("foo") altq = s.receiver("altq") @@ -662,17 +672,16 @@ acl deny all all s.sender("q").send("bar") msg = s.receiver("q").fetch(timeout=0) self.assertEqual("bar", msg.content) - s.acknowledge(msg, Disposition(REJECTED)) # Reject the message + s.acknowledge(msg, qm.Disposition(qm.REJECTED)) # Reject the message self.assertEqual("bar", altq.fetch(timeout=0).content) s.acknowledge() - - def ss(n): return cluster[n].connect().session() + c.close() # Sanity check: alternate exchanges on original broker verify(cluster[0]) - # Altex is in use as an alternate exchange. - self.assertRaises(SessionError, - lambda:ss(0).sender("altex;{delete:always}").close()) + a = cluster[0].agent + # Altex is in use as an alternate exchange, we should get an exception + self.assertRaises(Exception, a.delExchange, "altex") # Check backup that was connected during setup. cluster[1].wait_status("ready") cluster[1].wait_backup("ex") @@ -689,15 +698,12 @@ acl deny all all verify(cluster[2]) # Check that alt-exchange in-use count is replicated - s = cluster[2].connect().session(); - - self.assertRaises(SessionError, - lambda:ss(2).sender("altex;{delete:always}").close()) - s.sender("q;{delete:always}").close() - self.assertRaises(SessionError, - lambda:ss(2).sender("altex;{delete:always}").close()) - s.sender("ex;{delete:always}").close() - s.sender("altex;{delete:always}").close() + a = cluster[2].agent + self.assertRaises(Exception, a.delExchange, "altex") + a.delQueue("q") + self.assertRaises(Exception, a.delExchange, "altex") + a.delExchange("ex") + a.delExchange("altex") def test_priority_reroute(self): """Regression test for QPID-4262, rerouting messages from a priority queue @@ -705,8 +711,11 @@ acl deny all all cluster = HaCluster(self, 2) primary = cluster[0] session = primary.connect().session() - s = session.sender("pq; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}},x-bindings:[{exchange:'amq.fanout',queue:pq}]}}") - for m in xrange(100): s.send(Message(str(m), priority=m%10)) + a = primary.agent + a.addQueue("pq", {'qpid.priorities':10}) + a.bind("amq.fanout", "pq") + s = session.sender("pq") + for m in xrange(100): s.send(qm.Message(str(m), priority=m%10)) pq = QmfAgent(primary.host_port()).getQueue("pq") pq.reroute(request=0, useAltExchange=False, exchange="amq.fanout") # Verify that consuming is in priority order @@ -740,8 +749,8 @@ acl deny all all # Verify the backup deletes the surplus queue and exchange cluster[1].wait_status("ready") s = cluster[1].connect_admin().session() - self.assertRaises(NotFound, s.receiver, ("q2")); - self.assertRaises(NotFound, s.receiver, ("e2")); + self.assertRaises(qm.NotFound, s.receiver, ("q2")); + self.assertRaises(qm.NotFound, s.receiver, ("e2")); def test_delete_qpid_4285(self): @@ -753,61 +762,72 @@ acl deny all all cluster[1].wait_backup("q") cluster.kill(0) # Make the backup take over. s = cluster[1].connect().session() - s.receiver("q;{delete:always}").close() # Delete q on new primary - try: - s.receiver("q") - self.fail("Expected NotFound exception") # Should not be avaliable - except NotFound: pass - assert not cluster[1].agent().getQueue("q") # Should not be in QMF + cluster[1].agent.delQueue("q") # Delete q on new primary + self.assertRaises(qm.NotFound, s.receiver, "q") + assert not cluster[1].agent.getQueue("q") # Should not be in QMF def test_auto_delete_failover(self): """Test auto-delete queues. Verify that: - queues auto-deleted on the primary are deleted on the backup. - - auto-delete queues with/without timeout are deleted after a failover. + - auto-delete queues with/without timeout are deleted after a failover correctly + - auto-delete queues never used (subscribe to) to are not deleted - messages are correctly routed to the alternate exchange. """ cluster = HaCluster(self, 3) s = cluster[0].connect().session() - def setup(q, timeout=""): - if timeout: timeout = ",arguments:{'qpid.auto_delete_timeout':%s}"%timeout + a = cluster[0].agent + + def setup(q, timeout=None): # Create alternate exchange, auto-delete queue and queue bound to alt. ex. - s.sender("%s-altex;{create:always,node:{type:topic,x-declare:{type:fanout}}}"%q) - qs = s.sender("%s;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:%s-altex%s}}}"%(q,q,timeout)) - s.sender("%s-altq;{create:always,node:{x-bindings:[{exchange:%s-altex,queue:%s-altq}]}}"%(q,q,q)) - qs.send(q) # Send a message to the auto-delete queue - return s - - for args in [("q1",""),("q2","0"),("q3","1"),("q4",""),("q5","")]: setup(*args) - receivers = [s.receiver("q%s"%i) for i in [1,2,3,4]] # Subscribe to queues - # Note q5 is never subscribed to, so should not be auto-deleted. + a.addExchange("fanout", q+"-altex") + args = {"auto-delete":True, "alternate-exchange":q+"-altex"} + if timeout is not None: args['qpid.auto_delete_timeout'] = timeout + a.addQueue(q, args) + a.addQueue(q+"-altq") + a.bind("%s-altex"%q, "%s-altq"%q) + + for args in [["q1"],["q2",0],["q3",1],["q4"],["q5"]]: setup(*args) + receivers = [] + for i in xrange(1,5): # Don't use q5 + q = "q%s"%i + receivers.append(s.receiver(q)) # Subscribe + qs = s.sender(q); qs.send(q); qs.close() # Send q name as message + receivers[3].close() # Trigger auto-delete for q4 - cluster[0].kill(final=False) + for b in cluster[1:3]: b.wait_no_queue("q4") # Verify deleted on backups + + cluster[0].kill(final=False) # Kill primary cluster[2].promote() cluster.restart(0) - cluster[2].assert_browse("q3",["q3"]) # Not yet auto-deleted, 1 sec timeout. - for i in [2,1,0]: - for q in ["q1", "q2", "q3","q4"]: - cluster[i].wait_no_queue(q,timeout=2) # auto-deleted - cluster[i].assert_browse_backup("%s-altq"%q, [q]) # Routed to alternate - cluster[i].assert_browse_backup("q5", ["q5"]) # Never subscribed, not deleted. - cluster[i].assert_browse_backup("q5-altq", []) + cluster[2].wait_queue("q3") # Not yet auto-deleted, 1 sec timeout. + for b in cluster: + for q in ["q%s"%i for i in xrange(1,5)]: + b.wait_no_queue(q,timeout=2, msg=str(b)) # auto-deleted + b.assert_browse_backup("%s-altq"%q, [q]) # Routed to alternate + cluster[2].wait_queue("q5") # Not auto-deleted, never subscribed + cluster[2].connect().session().receiver("q5").close() + cluster[2].wait_no_queue("q5") def test_auto_delete_close(self): """Verify auto-delete queues are deleted on backup if auto-deleted on primary""" cluster=HaCluster(self, 2) + + # Create altex to use as alternate exchange, with altq bound to it + a = cluster[0].agent + a.addExchange("fanout", "altex") + a.addQueue("altq", {"auto-delete":True}) + a.bind("altex", "altq") + p = cluster[0].connect().session() - alt_setup(p, "1") - r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1) + r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex'}}}") s = p.sender("adq1") for m in ["aa","bb","cc"]: s.send(m) - p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}") + s.close() cluster[1].wait_queue("adq1") - cluster[1].wait_queue("adq2") r.close() # trigger auto-delete of adq1 cluster[1].wait_no_queue("adq1") - cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"]) - cluster[1].wait_queue("adq2") + cluster[1].assert_browse_backup("altq", ["aa","bb","cc"]) def test_expired(self): """Regression test for QPID-4379: HA does not properly handle expired messages""" @@ -815,7 +835,7 @@ acl deny all all cluster = HaCluster(self, 2) s = cluster[0].connect().session().sender("q;{create:always}", capacity=2) def send_ttl_messages(): - for i in xrange(100): s.send(Message(str(i), ttl=0.001), timeout=1) + for i in xrange(100): s.send(qm.Message(str(i), ttl=0.001)) send_ttl_messages() cluster.start() send_ttl_messages() @@ -827,7 +847,9 @@ acl deny all all s = cluster[0].connect().session() s.sender("keep;{create:always}") # Leave this queue in place. for i in xrange(100): - s.sender("deleteme%s;{create:always,delete:always}"%(i)).close() + q = "deleteme%s"%(i) + cluster[0].agent.addQueue(q) + cluster[0].agent.delQueue(q) # It is possible for the backup to attempt to subscribe after the queue # is deleted. This is not an error, but is logged as an error on the primary. # The backup does not log this as an error so we only check the backup log for errors. @@ -846,58 +868,46 @@ acl deny all all cluster[1].assert_browse_backup("qq", msgs) cluster[2].assert_browse_backup("qq", msgs) # Set up an exchange with a binding. - sn.sender("xx;{create:always,node:{type:topic}}") - sn.sender("xxq;{create:always,node:{x-bindings:[{exchange:'xx',queue:'xxq',key:xxq}]}}") + a = cluster[0].agent + a.addExchange("fanout", "xx") + a.addQueue("xxq") + a.bind("xx", "xxq", "xxq") cluster[1].wait_address("xx") - self.assertEqual(cluster[1].agent().getExchange("xx").values["bindingCount"], 1) + self.assertEqual(cluster[1].agent.getExchange("xx").values["bindingCount"], 1) cluster[2].wait_address("xx") - self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1) + self.assertEqual(cluster[2].agent.getExchange("xx").values["bindingCount"], 1) # Simulate the race by re-creating the objects before promoting the new primary cluster.kill(0, promote_next=False) xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}" node = "node:{%s}"%(xdecl) sn = cluster[1].connect_admin().session() - sn.sender("qq;{delete:always}").close() + a = cluster[1].agent + a.delQueue("qq", if_empty=False) s = sn.sender("qq;{create:always, %s}"%(node)) s.send("foo") - sn.sender("xx;{delete:always}").close() + a.delExchange("xx") sn.sender("xx;{create:always,node:{type:topic,%s}}"%(xdecl)) cluster[1].promote() cluster[1].wait_status("active") # Verify we are not still using the old objects on cluster[2] cluster[2].assert_browse_backup("qq", ["foo"]) cluster[2].wait_address("xx") - self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 0) - - def test_redeclare_exchange(self): - """Ensure that re-declaring an exchange is an HA no-op""" - cluster = HaCluster(self, 2) - ps = cluster[0].connect().session() - ps.sender("ex1;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}") - ps.sender("ex2;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout', alternate-exchange:'ex1'}}}") - cluster[1].wait_backup("ex1") - cluster[1].wait_backup("ex2") - - # Use old API to re-declare the exchange - old_conn = cluster[0].connect_old() - old_sess = old_conn.session(str(qpid.datatypes.uuid4())) - old_sess.exchange_declare(exchange='ex1', type='fanout') - cluster[1].wait_backup("ex1") + self.assertEqual(cluster[2].agent.getExchange("xx").values["bindingCount"], 0) def test_resource_limit_bug(self): """QPID-5666 Regression test: Incorrect resource limit exception for queue creation.""" cluster = HaCluster(self, 3) qs = ["q%s"%i for i in xrange(10)] - s = cluster[0].connect().session() - s.sender("q;{create:always}").close() + a = cluster[0].agent + a.addQueue("q") cluster.kill(0) cluster[1].promote() cluster[1].wait_status("active") - s = cluster[1].connect().session() - s.receiver("q;{delete:always}").close() - s.sender("qq;{create:always}").close() - + a = cluster[1].agent + a.delQueue("q") + a.addQueue("q") + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -948,12 +958,12 @@ class LongTests(HaBrokerTest): n = 10 senders = [ NumberedSender( - brokers[0], url=brokers.url,max_depth=50, failover_updates=False, + brokers[0], url=brokers.url,max_depth=50, queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)] receivers = [ NumberedReceiver( - brokers[0], url=brokers.url, sender=senders[i],failover_updates=False, + brokers[0], url=brokers.url, sender=senders[i], queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)] for r in receivers: r.start() @@ -1017,6 +1027,7 @@ class LongTests(HaBrokerTest): "--address", "q;{create:always}", "--messages=1000", "--tx=10" + # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet ]) receiver = self.popen( ["qpid-receive", @@ -1025,6 +1036,7 @@ class LongTests(HaBrokerTest): "--messages=990", "--timeout=10", "--tx=10" + # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet ]) self.assertEqual(sender.wait(), 0) self.assertEqual(receiver.wait(), 0) @@ -1053,7 +1065,7 @@ class LongTests(HaBrokerTest): try: self.connection.session().receiver( self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}") - except NotFound: pass # Can occur occasionally, not an error. + except qm.NotFound: pass # Can occur occasionally, not an error. try: self.connection.close() except: pass @@ -1119,7 +1131,8 @@ class RecoveryTests(HaBrokerTest): cluster[0].wait_status("active") for b in cluster[1:4]: b.wait_status("ready") # Create a queue before the failure. - s1 = cluster.connect(0).session().sender("q1;{create:always}") + # FIXME aconway 2014-02-20: SWIG client doesn't respect sync=False + s1 = cluster.connect(0, native=True).session().sender("q1;{create:always}") for b in cluster: b.wait_backup("q1") for i in xrange(10): s1.send(str(i), timeout=0.1) @@ -1130,13 +1143,11 @@ class RecoveryTests(HaBrokerTest): cluster[3].wait_status("recovering") def assertSyncTimeout(s): - try: - s.sync(timeout=.01) - self.fail("Expected Timeout exception") - except Timeout: pass + self.assertRaises(qpid.messaging.Timeout, s.sync, timeout=.01) # Create a queue after the failure - s2 = cluster.connect(3).session().sender("q2;{create:always}") + # FIXME aconway 2014-02-20: SWIG client doesn't respect sync=False + s2 = cluster.connect(3, native=True).session().sender("q2;{create:always}") # Verify that messages sent are not completed for i in xrange(10,20): @@ -1182,12 +1193,11 @@ class RecoveryTests(HaBrokerTest): # Should not go active till the expected backup connects or times out. cluster[2].wait_status("recovering") # Messages should be held till expected backup times out - s = cluster[2].connect().session().sender("q;{create:always}") + ss = cluster[2].connect().session() + s = ss.sender("q;{create:always}") s.send("foo", sync=False) - # Verify message held initially. - try: s.sync(timeout=.01); self.fail("Expected Timeout exception") - except Timeout: pass - s.sync(timeout=1) # And released after the timeout. + self.assertEqual(s.unsettled(), 1) # Verify message not settled immediately. + s.sync(timeout=1) # And settled after timeout. cluster[2].wait_status("active") def test_join_ready_cluster(self): @@ -1251,7 +1261,7 @@ class ConfigurationTests(HaBrokerTest): cluster[0].set_public_url("bar:1234") assert_url(r.fetch(1), "bar:1234") cluster[0].set_brokers_url(cluster.url+",xxx:1234") - self.assertRaises(Empty, r.fetch, 0) # Not updated for brokers URL + self.assertRaises(qm.Empty, r.fetch, 0) # Not updated for brokers URL class StoreTests(HaBrokerTest): """Test for HA with persistence.""" @@ -1268,18 +1278,19 @@ class StoreTests(HaBrokerTest): sn = cluster[0].connect().session() # Create queue qq, exchange exx and binding between them s = sn.sender("qq;{create:always,node:{durable:true}}") - sk = sn.sender("exx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:exx,key:k,queue:qq}]}}") - for m in ["foo", "bar", "baz"]: s.send(Message(m, durable=True)) + sk = sn.sender("exx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}}}") + cluster[0].agent.bind("exx", "qq", "k") + for m in ["foo", "bar", "baz"]: s.send(qm.Message(m, durable=True)) r = cluster[0].connect().session().receiver("qq") self.assertEqual(r.fetch().content, "foo") r.session.acknowledge() # Sending this message is a hack to flush the dequeue operation on qq. - s.send(Message("flush", durable=True)) + s.send(qm.Message("flush", durable=True)) def verify(broker, x_count): sn = broker.connect().session() assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"]) - sn.sender("exx/k").send(Message("x", durable=True)) + sn.sender("exx/k").send(qm.Message("x", durable=True)) assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"]) verify(cluster[0], 0) # Sanity check @@ -1303,10 +1314,11 @@ class StoreTests(HaBrokerTest): cluster = HaCluster(self, 2) sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() s1 = sn.sender("q1;{create:always,node:{durable:true}}") - for m in ["foo","bar"]: s1.send(Message(m, durable=True)) + for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True)) s2 = sn.sender("q2;{create:always,node:{durable:true}}") - sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:ex,key:k2,queue:q2}]}}") - sk2.send(Message("hello", durable=True)) + sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}}}") + cluster[0].agent.bind("ex", "q2", "k2") + sk2.send(qm.Message("hello", durable=True)) # Wait for backup to catch up. cluster[1].assert_browse_backup("q1", ["foo","bar"]) cluster[1].assert_browse_backup("q2", ["hello"]) @@ -1315,11 +1327,9 @@ class StoreTests(HaBrokerTest): r1 = cluster[0].connect(heartbeat=HaBroker.heartbeat).session().receiver("q1") for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m) r1.session.acknowledge() - for m in ["x","y","z"]: s1.send(Message(m, durable=True)) - # Use old connection to unbind - us = cluster[0].connect_old().session(str(uuid4())) - us.exchange_unbind(exchange="ex", binding_key="k2", queue="q2") - us.exchange_bind(exchange="ex", binding_key="k1", queue="q1") + for m in ["x","y","z"]: s1.send(qm.Message(m, durable=True)) + cluster[0].agent.unbind("ex", "q2", "k2") + cluster[0].agent.bind("ex", "q1", "k1") # Restart both brokers from store to get inconsistent sequence numbering. cluster.bounce(0, promote_next=False) cluster[0].promote() @@ -1350,11 +1360,11 @@ class TransactionTests(HaBrokerTest): def tx_simple_setup(self, broker): """Start a transaction, remove messages from queue a, add messages to queue b""" - c = broker.connect() + c = broker.connect(protocol=self.tx_protocol) # Send messages to a, no transaction. sa = c.session().sender("a;{create:always,node:{durable:true}}") tx_msgs = ["x","y","z"] - for m in tx_msgs: sa.send(Message(content=m, durable=True)) + for m in tx_msgs: sa.send(qm.Message(content=m, durable=True)) # Receive messages from a, in transaction. tx = c.session(transactional=True) @@ -1373,14 +1383,14 @@ class TransactionTests(HaBrokerTest): def tx_subscriptions(self, broker): """Return list of queue names for tx subscriptions""" - return [q for q in broker.agent().repsub_queues() + return [q for q in broker.agent.repsub_queues() if q.startswith("qpid.ha-tx")] def test_tx_simple_commit(self): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.sync() - tx_queues = cluster[0].agent().tx_queues() + tx_queues = cluster[0].agent.tx_queues() # NOTE: backup does not process transactional dequeues until prepare cluster[1].assert_browse_backup("a", ["x","y","z"]) @@ -1400,7 +1410,7 @@ class TransactionTests(HaBrokerTest): def __init__(self, f): self.f, self.value = f, None def __call__(self): self.value = self.f(); return self.value - txq= FunctionCache(b.agent().tx_queues) + txq= FunctionCache(b.agent.tx_queues) assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value) txsub = FunctionCache(lambda: self.tx_subscriptions(b)) assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value) @@ -1426,7 +1436,7 @@ class TransactionTests(HaBrokerTest): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.sync() - tx_queues = cluster[0].agent().tx_queues() + tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() tx.rollback() tx.close() # For clean test. @@ -1447,7 +1457,7 @@ class TransactionTests(HaBrokerTest): cluster = HaCluster(self, 3, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.sync() - tx_queues = cluster[0].agent().tx_queues() + tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() cluster.bounce(0) # Should cause roll-back cluster[0].wait_status("ready") # Restarted. @@ -1464,7 +1474,7 @@ class TransactionTests(HaBrokerTest): tx.acknowledge() tx.commit() tx.sync() - tx_queues = cluster[0].agent().tx_queues() + tx_queues = cluster[0].agent.tx_queues() tx.close() self.assert_simple_commit_outcome(cluster[0], tx_queues) @@ -1472,7 +1482,7 @@ class TransactionTests(HaBrokerTest): cluster = HaCluster(self, 1, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.sync() - tx_queues = cluster[0].agent().tx_queues() + tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() tx.rollback() tx.sync() @@ -1480,16 +1490,16 @@ class TransactionTests(HaBrokerTest): self.assert_simple_rollback_outcome(cluster[0], tx_queues) def assert_commit_raises(self, tx): - def commit_sync(): tx.commit(timeout=1); tx.sync(timeout=1) - self.assertRaises(ServerError, commit_sync) + def commit_sync(): tx.commit(); tx.sync() + self.assertRaises(Exception, commit_sync) def test_tx_backup_fail(self): cluster = HaCluster( self, 2, test_store=True, s_args=[[],["--test-store-throw=bang"]]) - c = cluster[0].connect() + c = cluster[0].connect(protocol=self.tx_protocol) tx = c.session(transactional=True) s = tx.sender("q;{create:always,node:{durable:true}}") - for m in ["foo","bang","bar"]: s.send(Message(m, durable=True)) + for m in ["foo","bang","bar"]: s.send(qm.Message(m, durable=True)) self.assert_commit_raises(tx) for b in cluster: b.assert_browse_backup("q", []) self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n") @@ -1502,10 +1512,10 @@ class TransactionTests(HaBrokerTest): cluster = HaCluster(self, 3) # Leaving - tx = cluster[0].connect().session(transactional=True) + tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) s = tx.sender("q;{create:always}") s.send("a", sync=True) - self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster]) + self.assertEqual([1,1,1], [len(b.agent.tx_queues()) for b in cluster]) cluster[1].kill(final=False) s.send("b") tx.commit() @@ -1514,7 +1524,7 @@ class TransactionTests(HaBrokerTest): self.assert_tx_clean(b) b.assert_browse_backup("q", ["a","b"], msg=b) # Joining - tx = cluster[0].connect().session(transactional=True) + tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) s = tx.sender("q;{create:always}") s.send("foo") cluster.restart(1) # Not a part of the current transaction. @@ -1523,18 +1533,17 @@ class TransactionTests(HaBrokerTest): for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b) - # FIXME aconway 2013-11-07: assert_log_clean def test_tx_block_threads(self): """Verify that TXs blocked in commit don't deadlock.""" cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True) n = 10 # Number of concurrent transactions - sessions = [cluster[0].connect().session(transactional=True) for i in xrange(n)] + sessions = [cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)] # Have the store delay the response for 10s for s in sessions: sn = s.sender("qq;{create:always,node:{durable:true}}") - sn.send(Message("foo", durable=True)) - self.assertEqual(n, len(cluster[1].agent().tx_queues())) + sn.send(qm.Message("foo", durable=True)) + self.assertEqual(n, len(cluster[1].agent.tx_queues())) threads = [ Thread(target=s.commit) for s in sessions] for t in threads: t.start() cluster[0].ready(timeout=1) # Check for deadlock diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py index c7269ac5e9..bb82b1c956 100755 --- a/qpid/cpp/src/tests/interlink_tests.py +++ b/qpid/cpp/src/tests/interlink_tests.py @@ -25,7 +25,7 @@ from brokertest import * from ha_test import HaPort from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO -from qpidtoollibs import BrokerAgent, BrokerObject +from qpidtoollibs import BrokerObject class Domain(BrokerObject): def __init__(self, broker, values): @@ -49,7 +49,7 @@ class AmqpBrokerTest(BrokerTest): self.port_holder = HaPort(self) self.broker = self.amqp_broker(port_holder=self.port_holder) self.default_config = Config(self.broker) - self.agent = BrokerAgent(self.broker.connect()) + self.agent = self.broker.agent def sender(self, config, reply_to=None): cmd = ["qpid-send", @@ -224,7 +224,7 @@ class AmqpBrokerTest(BrokerTest): def incoming_link(self, mechanism): brokerB = self.amqp_broker() - agentB = BrokerAgent(brokerB.connect()) + agentB = brokerB.agent self.agent.create("queue", "q") agentB.create("queue", "q") self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":mechanism}) @@ -240,7 +240,7 @@ class AmqpBrokerTest(BrokerTest): def test_outgoing_link(self): brokerB = self.amqp_broker() - agentB = BrokerAgent(brokerB.connect()) + agentB = brokerB.agent self.agent.create("queue", "q") agentB.create("queue", "q") self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"}) @@ -250,7 +250,7 @@ class AmqpBrokerTest(BrokerTest): def test_relay(self): brokerB = self.amqp_broker() - agentB = BrokerAgent(brokerB.connect()) + agentB = brokerB.agent agentB.create("queue", "q") self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"}) #send to q on broker B through brokerA diff --git a/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py index 3c62740a62..37c12601be 100644 --- a/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py +++ b/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py @@ -17,15 +17,20 @@ # under the License. # +import os + from brokertest import EXPECT_EXIT_OK from store_test import StoreTest, Qmf, store_args from qpid.messaging import * +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # FIXME aconway 2014-04-04: Tests fail with SWIG client. + class ExchangeQueueTests(StoreTest): """ Simple tests of the broker exchange and queue types """ - + def test_direct_exchange(self): """Test Direct exchange.""" broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK) @@ -34,11 +39,11 @@ class ExchangeQueueTests(StoreTest): broker.send_message("a", msg1) broker.send_message("b", msg2) broker.terminate() - + broker = self.broker(store_args(), name="test_direct_exchange") self.check_message(broker, "a", msg1, True) self.check_message(broker, "b", msg2, True) - + def test_topic_exchange(self): """Test Topic exchange.""" broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK) @@ -56,17 +61,17 @@ class ExchangeQueueTests(StoreTest): msg2 = Message("Message2", durable=True, correlation_id="Msg0004") snd2.send(msg2) broker.terminate() - + broker = self.broker(store_args(), name="test_topic_exchange") self.check_message(broker, "a", msg1, True) self.check_message(broker, "b", msg1, True) self.check_messages(broker, "c", [msg1, msg2], True) self.check_message(broker, "d", msg2, True) self.check_message(broker, "e", msg2, True) - - + + def test_legacy_lvq(self): - """Test legacy LVQ.""" + """Test legacy LVQ.""" broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"}) ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"}) @@ -77,7 +82,7 @@ class ExchangeQueueTests(StoreTest): broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1], xprops="arguments:{\"qpid.last_value_queue\":True}") broker.terminate() - + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False) # Add more messages while subscriber is active (no replacement): @@ -89,11 +94,11 @@ class ExchangeQueueTests(StoreTest): broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn) ssn.acknowledge() broker.terminate() - + broker = self.broker(store_args(), name="test_lvq") self.check_messages(broker, "lvq-test", [ma4, mc4], True) - - + + def test_fanout_exchange(self): """Test Fanout Exchange""" broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK) @@ -107,7 +112,7 @@ class ExchangeQueueTests(StoreTest): msg2 = Message("Msg2", durable=True, correlation_id="Msg0002") snd.send(msg2) broker.terminate() - + broker = self.broker(store_args(), name="test_fanout_exchange") self.check_messages(broker, "q1", [msg1, msg2], True) self.check_messages(broker, "q2", [msg1, msg2], True) @@ -124,18 +129,18 @@ class ExchangeQueueTests(StoreTest): m2 = rcv.fetch() ssn.acknowledge(message=m2, disposition=Disposition(REJECTED)) broker.terminate() - + broker = self.broker(store_args(), name="test_message_reject") qmf = Qmf(broker) assert qmf.queue_message_count("tmr") == 0 - + def test_route(self): """ Test the recovery of a route (link and bridge objects.""" broker = self.broker(store_args(), name="test_route", expect=EXPECT_EXIT_OK) qmf = Qmf(broker) qmf_broker_obj = qmf.get_objects("broker")[0] - + # create a "link" link_args = {"host":"a.fake.host.com", "port":9999, "durable":True, "authMechanism":"PLAIN", "username":"guest", "password":"guest", @@ -143,16 +148,16 @@ class ExchangeQueueTests(StoreTest): result = qmf_broker_obj.create("link", "test-link", link_args, False) self.assertEqual(result.status, 0, result) link = qmf.get_objects("link")[0] - + # create bridge bridge_args = {"link":"test-link", "src":"amq.direct", "dest":"amq.fanout", "key":"my-key", "durable":True} result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, False); self.assertEqual(result.status, 0, result) bridge = qmf.get_objects("bridge")[0] - + broker.terminate() - + # recover the link and bridge broker = self.broker(store_args(), name="test_route") qmf = Qmf(broker) @@ -189,7 +194,7 @@ class AlternateExchangePropertyTests(StoreTest): self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"), "Alternate exchange property not found or is incorrect on exchange \"testExch\".") qmf.close() - + def test_queue(self): """Queue alternate exchange property persistexchangeNamece test""" broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK) @@ -226,7 +231,7 @@ class RedeliveredTests(StoreTest): msg = Message(msg_content, durable=True) broker.send_message("testQueue", msg) broker.terminate() - + broker = self.broker(store_args(), name="test_broker_recovery") rcv_msg = broker.get_message("testQueue") self.assertEqual(msg_content, rcv_msg.content) diff --git a/qpid/cpp/src/tests/legacystore/python_tests/resize.py b/qpid/cpp/src/tests/legacystore/python_tests/resize.py index 469e0f6730..e719b755da 100644 --- a/qpid/cpp/src/tests/legacystore/python_tests/resize.py +++ b/qpid/cpp/src/tests/legacystore/python_tests/resize.py @@ -26,8 +26,11 @@ from qpid.datatypes import uuid4 from store_test import StoreTest, store_args from qpid.messaging import Message +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. + class ResizeTest(StoreTest): - + resize_tool = os.getenv("QPID_STORE_RESIZE_TOOL", "qpid-store-resize") print resize_tool def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size, exp_fail): @@ -47,21 +50,21 @@ class ResizeTest(StoreTest): finally: p.stdout.close() return res - + def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files, resize_file_size, init_num_files = 8, init_file_size = 24, exp_fail = False, wait_time = None): # Using a sender will force the creation of an empty persistent queue which is needed for some tests broker = self.broker(store_args(), name="broker", expect=EXPECT_EXIT_OK, wait=wait_time) ssn = broker.connect().session() snd = ssn.sender("%s; {create:always, node:{durable:True}}" % queue_name) - + msgs = [] for index in range(0, num_msgs): msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(), correlation_id="msg-%04d"%index) msgs.append(msg) snd.send(msg) broker.terminate() - + res = self._resize_store(os.path.join(self.dir, "broker", "rhm", "jrnl"), queue_name, resize_num_files, resize_file_size, exp_fail) if res != 0: @@ -70,95 +73,95 @@ class ResizeTest(StoreTest): self.fail("ERROR: Resize operation failed with return code %d" % res) elif exp_fail: self.fail("ERROR: Resize operation succeeded, but a failure was expected") - + broker = self.broker(store_args(), name="broker") self.check_messages(broker, queue_name, msgs, True) - - # TODO: Check the physical files to check number and size are as expected. + + # TODO: Check the physical files to check number and size are as expected. class SimpleTest(ResizeTest): """ Simple tests of the resize utility for resizing a journal to larger and smaller sizes. """ - + def test_empty_store_same(self): self._resize_test(queue_name = "empty_store_same", num_msgs = 0, msg_size = 0, init_num_files = 8, init_file_size = 24, resize_num_files = 8, resize_file_size = 24) - + def test_empty_store_up(self): self._resize_test(queue_name = "empty_store_up", num_msgs = 0, msg_size = 0, init_num_files = 8, init_file_size = 24, resize_num_files = 16, resize_file_size = 48) - + def test_empty_store_down(self): self._resize_test(queue_name = "empty_store_down", num_msgs = 0, msg_size = 0, init_num_files = 8, init_file_size = 24, resize_num_files = 6, resize_file_size = 12) - -# TODO: Put into long tests, make sure there is > 128GB free disk space + +# TODO: Put into long tests, make sure there is > 128GB free disk space # def test_empty_store_max(self): # self._resize_test(queue_name = "empty_store_max", # num_msgs = 0, msg_size = 0, # init_num_files = 8, init_file_size = 24, # resize_num_files = 64, resize_file_size = 32768, # wait_time = 120) - + def test_empty_store_min(self): self._resize_test(queue_name = "empty_store_min", num_msgs = 0, msg_size = 0, init_num_files = 8, init_file_size = 24, resize_num_files = 4, resize_file_size = 1) - + def test_basic_up(self): self._resize_test(queue_name = "basic_up", num_msgs = 100, msg_size = 10000, init_num_files = 8, init_file_size = 24, resize_num_files = 16, resize_file_size = 48) - + def test_basic_down(self): self._resize_test(queue_name = "basic_down", num_msgs = 100, msg_size = 10000, init_num_files = 8, init_file_size = 24, resize_num_files = 4, resize_file_size = 15) - + def test_basic_low(self): self._resize_test(queue_name = "basic_low", num_msgs = 100, msg_size = 10000, init_num_files = 8, init_file_size = 24, resize_num_files = 4, resize_file_size = 4, exp_fail = True) - + def test_basic_under(self): self._resize_test(queue_name = "basic_under", num_msgs = 100, msg_size = 10000, init_num_files = 8, init_file_size = 24, resize_num_files = 4, resize_file_size = 3, exp_fail = True) - + def test_very_large_msg_up(self): self._resize_test(queue_name = "very_large_msg_up", num_msgs = 4, msg_size = 2000000, init_num_files = 8, init_file_size = 24, resize_num_files = 16, resize_file_size = 48) - + def test_very_large_msg_down(self): self._resize_test(queue_name = "very_large_msg_down", num_msgs = 4, msg_size = 2000000, init_num_files = 16, init_file_size = 64, resize_num_files = 16, resize_file_size = 48) - + def test_very_large_msg_low(self): self._resize_test(queue_name = "very_large_msg_low", num_msgs = 4, msg_size = 2000000, init_num_files = 8, init_file_size = 24, resize_num_files = 7, resize_file_size = 20, exp_fail = True) - + def test_very_large_msg_under(self): self._resize_test(queue_name = "very_large_msg_under", num_msgs = 4, msg_size = 2000000, diff --git a/qpid/cpp/src/tests/legacystore/python_tests/store_test.py b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py index 2fcab4e38e..cc846aefd4 100644 --- a/qpid/cpp/src/tests/legacystore/python_tests/store_test.py +++ b/qpid/cpp/src/tests/legacystore/python_tests/store_test.py @@ -22,10 +22,13 @@ from brokertest import BrokerTest from qpid.messaging import Empty from qmf.console import Session - +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. + + def store_args(store_dir = None): """Return the broker args necessary to load the async store""" - assert BrokerTest.store_lib + assert BrokerTest.store_lib if store_dir == None: return [] return ["--store-dir", store_dir] @@ -51,7 +54,7 @@ class Qmf: else: amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable, arguments=arguments) - + def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None): """Add a new queue""" amqp_session = self.__broker.getAmqpSession() @@ -62,7 +65,7 @@ class Qmf: durable=durable, arguments=arguments) else: amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments) - + def delete_queue(self, queue_name): """Delete an existing queue""" amqp_session = self.__broker.getAmqpSession() @@ -84,24 +87,24 @@ class Qmf: return found except Exception: return False - + def query_exchange(self, exchange_name, alt_exchange_name=None): """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known value.""" return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name) - + def query_queue(self, queue_name, alt_exchange_name=None): """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known value.""" return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name) - + def queue_message_count(self, queue_name): """Query the number of messages on a queue""" queue_list = self.__session.getObjects(_class="queue", _name=queue_name) if len(queue_list): return queue_list[0].msgDepth - + def queue_empty(self, queue_name): """Check if a queue is empty (has no messages waiting)""" return self.queue_message_count(queue_name) == 0 @@ -109,7 +112,7 @@ class Qmf: def get_objects(self, target_class, target_package="org.apache.qpid.broker"): return self.__session.getObjects(_class=target_class, _package=target_package) - + def close(self): self.__session.delBroker(self.__broker) self.__session = None @@ -119,7 +122,7 @@ class StoreTest(BrokerTest): """ This subclass of BrokerTest adds some convenience test/check functions """ - + def _chk_empty(self, queue, receiver): """Check if a queue is empty (has no more messages)""" try: @@ -141,9 +144,9 @@ class StoreTest(BrokerTest): else: buff += chr(ord('a') + (index % 26)) return buff + msg - + # Functions for formatting address strings - + @staticmethod def _fmt_csv(string_list, list_braces = None): """Format a list using comma-separation. Braces are optionally added.""" @@ -163,16 +166,16 @@ class StoreTest(BrokerTest): if list_braces != None: str_ += list_braces[1] return str_ - + def _fmt_map(self, string_list): """Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map element('key:val').""" - return self._fmt_csv(string_list, list_braces="{}") - + return self._fmt_csv(string_list, list_braces="{}") + def _fmt_list(self, string_list): """Format a list [l1, l2, l3, ...] from a string list.""" - return self._fmt_csv(string_list, list_braces="[]") - + return self._fmt_csv(string_list, list_braces="[]") + def addr_fmt(self, node_name, **kwargs): """Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address string.""" @@ -190,12 +193,12 @@ class StoreTest(BrokerTest): x_declare_list = kwargs.get("x_declare_list", []) x_bindings_list = kwargs.get("x_bindings_list", []) x_subscribe_list = kwargs.get("x_subscribe_list", []) - + node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0) link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or len(x_bindings_list) > 0 or len(x_subscribe_list) > 0) assert not (node_flag and link_flag) - + opt_str_list = [] if create_policy != None: opt_str_list.append("create: %s" % create_policy) @@ -231,7 +234,7 @@ class StoreTest(BrokerTest): if len(opt_str_list) > 0: addr_str += "; %s" % self._fmt_map(opt_str_list) return addr_str - + def snd_addr(self, node_name, **kwargs): """ Create a send (node) address""" # Get keyword args @@ -245,7 +248,7 @@ class StoreTest(BrokerTest): ftd_size = kwargs.get("ftd_size") policy = kwargs.get("policy", "flow-to-disk") exchage_type = kwargs.get("exchage_type") - + create_policy = None if auto_create: create_policy = "always" @@ -265,10 +268,10 @@ class StoreTest(BrokerTest): x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy)) if exchage_type != None: x_declare_list.append("type: %s" % exchage_type) - + return self.addr_fmt(node_name, topic=topic, create_policy=create_policy, delete_policy=delete_policy, node_type=node_type, durable=durable, x_declare_list=x_declare_list) - + def rcv_addr(self, node_name, **kwargs): """ Create a receive (link) address""" # Get keyword args @@ -282,7 +285,7 @@ class StoreTest(BrokerTest): ftd_count = kwargs.get("ftd_count") ftd_size = kwargs.get("ftd_size") policy = kwargs.get("policy", "flow-to-disk") - + create_policy = None if auto_create: create_policy = "always" @@ -291,7 +294,7 @@ class StoreTest(BrokerTest): delete_policy = "always" mode = None if browse: - mode = "browse" + mode = "browse" x_declare_list = ["\"exclusive\": %s" % exclusive] if ftd_count != None or ftd_size != None: queue_policy = ["\'qpid.policy_type\': %s" % policy] @@ -308,11 +311,11 @@ class StoreTest(BrokerTest): return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True, link_name=link_name, durable=durable, x_declare_list=x_declare_list, x_bindings_list=x_bindings_list, link_reliability=reliability) - + def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False): """Check that a message is on a queue by dequeuing it and comparing it to the expected message""" return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse) - + def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False, emtpy_flag=False): """Check that messages is on a queue by dequeuing them and comparing them to the expected messages""" @@ -341,8 +344,8 @@ class StoreTest(BrokerTest): if transactional: ssn.commit() return ssn - - + + # Functions for finding strings in the broker log file (or other files) @staticmethod @@ -353,7 +356,7 @@ class StoreTest(BrokerTest): return file_handle.read() finally: file_handle.close() - + def _get_hits(self, broker, search): """Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple queues)""" @@ -361,9 +364,9 @@ class StoreTest(BrokerTest): hits = [] for hit in search.findall(self._read_file(broker.log)): if hit not in hits: - hits.append(hit) + hits.append(hit) return hits - + def _reconsile_hits(self, broker, ftd_msgs, release_hits): """Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining release_hits.""" @@ -382,35 +385,33 @@ class StoreTest(BrokerTest): for hit in release_hits: err += " %s\n" % hit self.assert_(False, err) - + def check_msg_release(self, broker, ftd_msgs): """ Check for 'Content released' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content released$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - + def check_msg_release_on_commit(self, broker, ftd_msgs): """ Check for 'Content released on commit' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content released on commit$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - + def check_msg_release_on_recover(self, broker, ftd_msgs): """ Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content released after recovery$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - + def check_msg_block(self, broker, ftd_msgs): """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content release blocked$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - + def check_msg_block_on_commit(self, broker, ftd_msgs): """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs""" hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: " "Content release blocked on commit$", re.MULTILINE)) self._reconsile_hits(broker, ftd_msgs, hits) - - diff --git a/qpid/cpp/src/tests/qpidd_qmfv2_tests.py b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py index 55497ccc03..2b45cb6eea 100755 --- a/qpid/cpp/src/tests/qpidd_qmfv2_tests.py +++ b/qpid/cpp/src/tests/qpidd_qmfv2_tests.py @@ -30,6 +30,8 @@ from qpid.messaging import Message try: import qmf.console except: print "Cannot import module qmf.console, skipping tests"; exit(0); +import qpid.messaging, brokertest +brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client. class ConsoleTest(BrokerTest): """ diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index df410ec188..678f7aa223 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -50,11 +50,11 @@ export QPID_TESTS_PY=$QPID_TESTS/src/py export QPID_TOOLS=$top_srcdir/../tools export QMF_LIB=$top_srcdir/../extras/qmf/src/py export PYTHON_COMMANDS=$QPID_TOOLS/src/py -export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$QPID_TESTS_PY:$QMF_LIB:$PYTHONPATH +export PYTHONPATH_SWIG=$pythonswigdir:$pythonswiglibdir +export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$QPID_TESTS_PY:$QMF_LIB:$PYTHONPATH_SWIG:$PYTHONPATH export QPID_CONFIG_EXEC=$PYTHON_COMMANDS/qpid-config export QPID_ROUTE_EXEC=$PYTHON_COMMANDS/qpid-route export QPID_HA_EXEC=$PYTHON_COMMANDS/qpid-ha -export PYTHONPATH_SWIG=$pythonswigdir:$pythonswiglibdir export PYTHONSWIGMODULE=$pythonswigdir/qpid_messaging.py # Executables export QPIDD_EXEC=$top_builddir/src/qpidd diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py index e7be98c486..f6243bbba5 100644 --- a/qpid/tools/src/py/qpidtoollibs/broker.py +++ b/qpid/tools/src/py/qpidtoollibs/broker.py @@ -17,7 +17,7 @@ # under the License. # -from qpid.messaging import Message +import sys from qpidtoollibs.disp import TimeLong try: from uuid import uuid4 @@ -26,13 +26,16 @@ except ImportError: class BrokerAgent(object): """ - Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection. + Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection + or qpid_messaging.Connection """ def __init__(self, conn): + # Use the Message class from the same module as conn which could be qpid.messaging + # or qpid_messaging + self.message_class = sys.modules[conn.__class__.__module__].Message self.conn = conn self.sess = self.conn.session() - self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ - str(uuid4()) + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}}" % str(uuid4()) self.reply_rx = self.sess.receiver(self.reply_to) self.reply_rx.capacity = 10 self.tx = self.sess.sender("qmf.default.direct/broker") @@ -55,8 +58,9 @@ class BrokerAgent(object): '_method_name' : method, '_arguments' : arguments} - message = Message(content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") + message = self.message_class( + content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") self.tx.send(message) response = self.reply_rx.fetch(timeout) self.sess.acknowledge() @@ -72,8 +76,9 @@ class BrokerAgent(object): 'x-amqp-0-10.app-id' : 'qmf2'} correlator = str(self.next_correlator) self.next_correlator += 1 - message = Message(content, reply_to=self.reply_to, correlation_id=correlator, - properties=props, subject="broker") + message = self.message_class( + content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") self.tx.send(message) return correlator @@ -259,7 +264,7 @@ class BrokerAgent(object): 'options': options} self._method('delete', args) - def bind(self, exchange, queue, key, options={}, **kwargs): + def bind(self, exchange, queue, key="", options={}, **kwargs): properties = options for k,v in kwargs.items(): properties[k] = v |