diff options
author | Alan Conway <aconway@apache.org> | 2011-03-15 14:34:02 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-03-15 14:34:02 +0000 |
commit | b51071c7876f42a6ddc44beddfb9593b915ae1b0 (patch) | |
tree | 9aba7efa175d5d41a846c34205ac297598156402 | |
parent | e1db85e48dc991e6ed96d307b7db8e93b5eec697 (diff) | |
download | qpid-python-b51071c7876f42a6ddc44beddfb9593b915ae1b0.tar.gz |
Merge branch 'trunk' into qpid-2920, trunk at r1081631
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1081801 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/Makefile.am | 9 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/BufferRef.h | 70 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/Modules.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/RefCountedBuffer.cpp | 22 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/RefCountedBuffer.h | 61 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TopicExchange.cpp | 34 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/TopicExchange.h | 32 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Event.h | 12 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cluster-benchmark | 44 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 68 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-receive.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-send.cpp | 2 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 1 | ||||
-rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java | 599 |
15 files changed, 528 insertions, 436 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index f27f69d182..d7be7121c1 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -176,7 +176,7 @@ nobase_include_HEADERS += \ ../include/qpid/sys/posix/Time.h \ ../include/qpid/sys/posix/check.h -if HAVE_EPOLL +if HAVE_EPOLL poller = qpid/sys/epoll/EpollPoller.cpp endif @@ -195,7 +195,7 @@ libqpidcommon_la_SOURCES += $(poller) $(systeminfo) posix_broker_src = \ qpid/broker/posix/BrokerDefaults.cpp -lib_LTLIBRARIES = libqpidtypes.la libqpidcommon.la libqpidbroker.la libqpidclient.la libqpidmessaging.la +lib_LTLIBRARIES = libqpidtypes.la libqpidcommon.la libqpidbroker.la libqpidclient.la libqpidmessaging.la # Definitions for client and daemon plugins PLUGINLDFLAGS=-no-undefined -module -avoid-version @@ -203,7 +203,7 @@ confdir=$(sysconfdir)/qpid dmoduledir=$(libdir)/qpid/daemon cmoduledir=$(libdir)/qpid/client dmodule_LTLIBRARIES = -cmodule_LTLIBRARIES = +cmodule_LTLIBRARIES = include cluster.mk include acl.mk @@ -341,6 +341,7 @@ libqpidcommon_la_SOURCES += \ qpid/RefCounted.h \ qpid/RefCountedBuffer.cpp \ qpid/RefCountedBuffer.h \ + qpid/BufferRef.h \ qpid/Sasl.h \ qpid/SaslFactory.cpp \ qpid/SaslFactory.h \ @@ -894,6 +895,6 @@ dist-hook: $(BUILT_SOURCES) install-data-local: $(mkinstalldirs) $(DESTDIR)/$(localstatedir)/lib/qpidd -# Support for pkg-config +# Support for pkg-config pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = qpid.pc diff --git a/qpid/cpp/src/qpid/BufferRef.h b/qpid/cpp/src/qpid/BufferRef.h new file mode 100644 index 0000000000..bfe1f9ebaa --- /dev/null +++ b/qpid/cpp/src/qpid/BufferRef.h @@ -0,0 +1,70 @@ +#ifndef QPID_BUFFERREF_H +#define QPID_BUFFERREF_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include <boost/intrusive_ptr.hpp> + +namespace qpid { + +/** Template for mutable or const buffer references */ +template <class T> class BufferRefT { + public: + BufferRefT() : begin_(0), end_(0) {} + + BufferRefT(boost::intrusive_ptr<RefCounted> c, T* begin, T* end) : + counter(c), begin_(begin), end_(end) {} + + template <class U> BufferRefT(const BufferRefT<U>& other) : + counter(other.counter), begin_(other.begin_), end_(other.end_) {} + + T* begin() const { return begin_; } + T* end() const { return end_; } + + /** Return a sub-buffer of the current buffer */ + BufferRefT sub_buffer(T* begin, T* end) { + assert(begin_ <= begin && begin <= end_); + assert(begin_ <= end && end <= end_); + assert(begin <= end); + return BufferRefT(counter, begin, end); + } + + private: + boost::intrusive_ptr<RefCounted> counter; + T* begin_; + T* end_; +}; + +/** + * Reference to a mutable ref-counted buffer. + */ +typedef BufferRefT<char> BufferRef; + +/** + * Reference to a const ref-counted buffer. + */ +typedef BufferRefT<const char> ConstBufferRef; + +} // namespace qpid + +#endif /*!QPID_BUFFERREF_H*/ diff --git a/qpid/cpp/src/qpid/Modules.cpp b/qpid/cpp/src/qpid/Modules.cpp index 8f58df6ed1..727e05d212 100644 --- a/qpid/cpp/src/qpid/Modules.cpp +++ b/qpid/cpp/src/qpid/Modules.cpp @@ -64,7 +64,6 @@ void tryShlib(const char* libname_, bool noThrow) { if (!isShlibName(libname)) libname += suffix(); try { sys::Shlib shlib(libname); - QPID_LOG (info, "Loaded Module: " << libname); } catch (const std::exception& /*e*/) { if (!noThrow) @@ -82,7 +81,7 @@ void loadModuleDir (std::string dirname, bool isDefault) return; throw Exception ("Directory not found: " + dirname); } - if (!fs::is_directory(dirPath)) + if (!fs::is_directory(dirPath)) { throw Exception ("Invalid value for module-dir: " + dirname + " is not a directory"); } diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.cpp b/qpid/cpp/src/qpid/RefCountedBuffer.cpp index 9b8f1ebd5e..40d620f7ad 100644 --- a/qpid/cpp/src/qpid/RefCountedBuffer.cpp +++ b/qpid/cpp/src/qpid/RefCountedBuffer.cpp @@ -24,30 +24,20 @@ namespace qpid { -RefCountedBuffer::RefCountedBuffer() : count(0) {} - -void RefCountedBuffer::destroy() const { +void RefCountedBuffer::released() const { this->~RefCountedBuffer(); ::delete[] reinterpret_cast<const char*>(this); } -char* RefCountedBuffer::addr() const { - return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer)); -} - -RefCountedBuffer::pointer RefCountedBuffer::create(size_t n) { +BufferRef RefCountedBuffer::create(size_t n) { char* store=::new char[n+sizeof(RefCountedBuffer)]; new(store) RefCountedBuffer; - return pointer(reinterpret_cast<RefCountedBuffer*>(store)); + char* start = store+sizeof(RefCountedBuffer); + return BufferRef( + boost::intrusive_ptr<RefCounted>(reinterpret_cast<RefCountedBuffer*>(store)), + start, start+n); } -RefCountedBuffer::pointer::pointer() {} -RefCountedBuffer::pointer::pointer(RefCountedBuffer* x) : p(x) {} -RefCountedBuffer::pointer::pointer(const pointer& x) : p(x.p) {} -RefCountedBuffer::pointer::~pointer() {} -RefCountedBuffer::pointer& RefCountedBuffer::pointer::operator=(const RefCountedBuffer::pointer& x) { p = x.p; return *this; } - -char* RefCountedBuffer::pointer::cp() const { return p ? p->get() : 0; } } // namespace qpid diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.h b/qpid/cpp/src/qpid/RefCountedBuffer.h index 75a23862be..f0ea86130b 100644 --- a/qpid/cpp/src/qpid/RefCountedBuffer.h +++ b/qpid/cpp/src/qpid/RefCountedBuffer.h @@ -22,68 +22,23 @@ * */ -#include <boost/utility.hpp> -#include <boost/detail/atomic_count.hpp> -#include <boost/intrusive_ptr.hpp> +#include <qpid/RefCounted.h> +#include <qpid/BufferRef.h> namespace qpid { /** - * Reference-counted byte buffer. - * No alignment guarantees. + * Reference-counted byte buffer. No alignment guarantees. */ -class RefCountedBuffer : boost::noncopyable { - mutable boost::detail::atomic_count count; - RefCountedBuffer(); - void destroy() const; - char* addr() const; - -public: - /** Smart char pointer to a reference counted buffer */ - class pointer { - boost::intrusive_ptr<RefCountedBuffer> p; - char* cp() const; - pointer(RefCountedBuffer* x); - friend class RefCountedBuffer; - - public: - pointer(); - pointer(const pointer&); - ~pointer(); - pointer& operator=(const pointer&); - - char* get() { return cp(); } - operator char*() { return cp(); } - char& operator*() { return *cp(); } - char& operator[](size_t i) { return cp()[i]; } - - const char* get() const { return cp(); } - operator const char*() const { return cp(); } - const char& operator*() const { return *cp(); } - const char& operator[](size_t i) const { return cp()[i]; } - }; - +class RefCountedBuffer : public RefCounted { + public: /** Create a reference counted buffer of size n */ - static pointer create(size_t n); - - /** Get a pointer to the start of the buffer. */ - char* get() { return addr(); } - const char* get() const { return addr(); } - char& operator[](size_t i) { return get()[i]; } - const char& operator[](size_t i) const { return get()[i]; } + static BufferRef create(size_t n); - void addRef() const { ++count; } - void release() const { if (--count==0) destroy(); } - long refCount() { return count; } + protected: + void released() const; }; } // namespace qpid -// intrusive_ptr support. -namespace boost { -inline void intrusive_ptr_add_ref(const qpid::RefCountedBuffer* p) { p->addRef(); } -inline void intrusive_ptr_release(const qpid::RefCountedBuffer* p) { p->release(); } -} - - #endif /*!QPID_REFCOUNTEDBUFFER_H*/ diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index f3baf00d1f..3b295d1f86 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -38,7 +38,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; // - excessive string copying: should be 0 copy, match from original buffer. // - match/lookup: use descision tree or other more efficient structure. -namespace +namespace { const std::string STAR("*"); const std::string HASH("#"); @@ -110,7 +110,7 @@ public: // Iterate over a string of '.'-separated tokens. struct TopicExchange::TokenIterator { typedef pair<const char*,const char*> Token; - + TokenIterator(const char* b, const char* e) : end(e), token(make_pair(b, find(b,e,'.'))) {} TokenIterator(const string& key) : end(&key[0]+key.size()), token(make_pair(&key[0], find(&key[0],end,'.'))) {} @@ -221,7 +221,7 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable, bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args) { - ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. + ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); string fedTags(args ? args->getAsString(qpidFedTags) : ""); string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); @@ -282,6 +282,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons } } + cc.clearCache(); // clear the cache before we IVE route. routeIVE(); if (propagate) propagateFedOp(routingKey, fedTags, fedOp, fedOrigin); @@ -289,7 +290,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons } bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){ - ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. + ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. RWlock::ScopedWlock l(lock); string routingKey = normalize(constRoutingKey); BindingKey* bk = bindingTree.getBindingKey(routingKey); @@ -336,23 +337,24 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel { // Note: PERFORMANCE CRITICAL!!! BindingList b; - std::map<std::string, BindingList>::iterator it; - { // only lock the cache for read + std::map<std::string, BindingList>::iterator it; + { // only lock the cache for read RWlock::ScopedRlock cl(cacheLock); - it = bindingCache.find(routingKey); - } + it = bindingCache.find(routingKey); + if (it != bindingCache.end()) { + b = it->second; + } + } PreRoute pr(msg, this); - if (it == bindingCache.end()) // no cache hit + if (!b.get()) // no cache hit { RWlock::ScopedRlock l(lock); b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); BindingsFinderIter bindingsFinder(b); bindingTree.iterateMatch(routingKey, bindingsFinder); - RWlock::ScopedWlock cwl(cacheLock); - bindingCache[routingKey] = b; // update cache - }else { - b = it->second; - } + RWlock::ScopedWlock cwl(cacheLock); + bindingCache[routingKey] = b; // update cache + } doRoute(msg, b); } diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h index 3d2b3a95a9..e751a2b7c7 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.h +++ b/qpid/cpp/src/qpid/broker/TopicExchange.h @@ -136,18 +136,26 @@ class TopicExchange : public virtual Exchange { unsigned long nBindings; qpid::sys::RWlock lock; // protects bindingTree and nBindings qpid::sys::RWlock cacheLock; // protects cache - std::map<std::string, BindingList> bindingCache; // cache of matched routes. - class ClearCache { - private: - qpid::sys::RWlock* cacheLock; - std::map<std::string, BindingList>* bindingCache; - public: - ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),bindingCache(bc) {}; - ~ClearCache(){ - qpid::sys::RWlock::ScopedWlock l(*cacheLock); - bindingCache->clear(); - }; - }; + std::map<std::string, BindingList> bindingCache; // cache of matched routes. + class ClearCache { + private: + qpid::sys::RWlock* cacheLock; + std::map<std::string, BindingList>* bindingCache; + bool cleared; + public: + ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l), + bindingCache(bc),cleared(false) {}; + void clearCache() { + qpid::sys::RWlock::ScopedWlock l(*cacheLock); + if (!cleared) { + bindingCache->clear(); + cleared =true; + } + }; + ~ClearCache(){ + clearCache(); + }; + }; bool isBound(Queue::shared_ptr queue, const std::string& pattern); class ReOriginIter; diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp index cd775ce2f1..da2bc89d8c 100644 --- a/qpid/cpp/src/qpid/cluster/Event.cpp +++ b/qpid/cpp/src/qpid/cluster/Event.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,6 +23,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/RefCountedBuffer.h" #include "qpid/assert.h" #include <ostream> #include <iterator> diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h index c2dca073d1..13283edff7 100644 --- a/qpid/cpp/src/qpid/cluster/Event.h +++ b/qpid/cpp/src/qpid/cluster/Event.h @@ -23,7 +23,7 @@ */ #include "qpid/cluster/types.h" -#include "qpid/RefCountedBuffer.h" +#include "qpid/BufferRef.h" #include "qpid/framing/AMQFrame.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -88,12 +88,12 @@ class Event : public EventHeader { static Event control(const framing::AMQFrame&, const ConnectionId&); // Data excluding header. - char* getData() { return store + HEADER_SIZE; } - const char* getData() const { return store + HEADER_SIZE; } + char* getData() { return store.begin() + HEADER_SIZE; } + const char* getData() const { return store.begin() + HEADER_SIZE; } // Store including header - char* getStore() { return store; } - const char* getStore() const { return store; } + char* getStore() { return store.begin(); } + const char* getStore() const { return store.begin(); } const framing::AMQFrame& getFrame() const; @@ -104,7 +104,7 @@ class Event : public EventHeader { private: void encodeHeader() const; - RefCountedBuffer::pointer store; + BufferRef store; mutable framing::AMQFrame frame; }; diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark index 4408e63866..5f0e020475 100755 --- a/qpid/cpp/src/tests/qpid-cluster-benchmark +++ b/qpid/cpp/src/tests/qpid-cluster-benchmark @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,21 +19,37 @@ # # Benchmark script for comparing cluster performance. -#PORT=":5555" -BROKER=`echo $HOSTS | awk '{print $1}'` # Single broker -BROKERS=`echo $HOSTS | sed "s/\>/$PORT/g;s/ /,/g"` # Broker URL list -COUNT=100000 -RATE=20000 # Rate to throttle senders for latency results -run_test() { echo $*; "$@"; echo; echo; echo; } -# Thruput, unshared queue -run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp -m $COUNT +# Default values +PORT="5672" +BROKERS=`echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g"` # Broker URL list +COUNT=10000 +FLOW=100 # Flow control limit on queue depth for latency. +REPEAT=10 +SCALE=10 -# Latency -run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --connection-options '{tcp-nodelay:true}' -m `expr $COUNT / 2` --send-rate $RATE +while getopts "p:c:f:r:t:b:" opt; do + case $opt in + p) PORT=$OPTARG;; + c) COUNT=$OPTARG;; + f) FLOW=$OPTARG;; + r) REPEAT=$OPTARG;; + s) SCALE=$OPTARG;; + b) BROKERS=$OPTARG;; + *) echo "Unknown option"; exit 1;; + esac +done + +BROKER=`echo $HOSTS | sed 's/,.*//'` # First broker + +run_test() { echo $*; shift; "$@"; echo; echo; echo; } # Multiple pubs/subs connect via multiple brokers (active-active) -run_test qpid-cpp-benchmark --repeat 10 -b $BROKERS --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10` +run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT # Multiple pubs/subs connect via single broker (active-passive) -run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10` +run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT + +# Latency +run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW + diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index 1f77226b4d..6138108558 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -77,6 +77,20 @@ def ssh_command(host, command): """Convert command into an ssh command on host with quoting""" return ["ssh", host] + [posix_quote(arg) for arg in command] +class Clients: + def __init__(self): self.clients=[] + + def add(self, client): + self.clients.append(client) + return client + + def kill(self): + for c in self.clients: + try: c.kill() + except: pass + +clients = Clients() + def start_receive(queue, index, opts, ready_queue, broker, host): address_opts=["create:receiver"] + opts.receive_option if opts.durable: address_opts += ["node:{durable:true}"] @@ -101,7 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return Popen(command, stdout=PIPE) + return clients.add(Popen(command, stdout=PIPE)) def start_send(queue, opts, broker, host): address="%s;{%s}"%(queue,",".join(opts.send_option)) @@ -122,7 +136,7 @@ def start_send(queue, opts, broker, host): if opts.connection_options: command += ["--connection-options",opts.connection_options] if host: command = ssh_command(host, command) - return Popen(command, stdout=PIPE) + return clients.add(Popen(command, stdout=PIPE)) def first_line(p): out,err=p.communicate() @@ -133,7 +147,11 @@ def delete_queues(queues, broker): c = qpid.messaging.Connection(broker) c.open() for q in queues: - try: s = c.session().sender("%s;{delete:always}"%(q)) + try: + s = c.session() + snd = s.sender("%s;{delete:always}"%(q)) + snd.close() + s.sync() except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue" c.close() @@ -145,7 +163,6 @@ def print_header(timestamp): def parse(parser, lines): # Parse sender/receiver output for l in lines: fn_val = zip(parser, l) - return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] def parse_senders(senders): @@ -156,11 +173,12 @@ def parse_receivers(receivers): def print_data(send_stats, recv_stats): for send,recv in map(None, send_stats, recv_stats): - if send: print send[0], + line="" + if send: line += "%d"%send[0] if recv: - print "\t\t%d"%recv[0], - if len(recv) == 4: print "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]), - print + line += "\t\t%d"%recv[0] + if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) + print line def print_summary(send_stats, recv_stats): def avg(s): sum(s) / len(s) @@ -184,11 +202,11 @@ class ReadyReceiver: self.receiver = self.connection.session().receiver( "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) self.receiver.session.sync() - self.timeout=2 + self.timeout=10 def wait(self, receivers): try: - for i in xrange(len(receivers)): self.receiver.fetch(self.timeout) + for i in receivers: self.receiver.fetch(self.timeout) self.connection.close() except qpid.messaging.Empty: for r in receivers: @@ -221,20 +239,22 @@ def main(): receive_out = "" ready_queue="%s-ready"%(opts.queue_name) queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] - for i in xrange(opts.repeat): - delete_queues(queues, opts.broker[0]) - ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.receivers)] - ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. - senders = [start_send(q, opts,brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.senders)] - if opts.report_header and i == 0: print_header(opts.timestamp) - send_stats=parse_senders(senders) - recv_stats=parse_receivers(receivers) - if opts.summarize: print_summary(send_stats, recv_stats) - else: print_data(send_stats, recv_stats) - delete_queues(queues, opts.broker[0]) + try: + for i in xrange(opts.repeat): + delete_queues(queues, opts.broker[0]) + ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) + receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.receivers)] + ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. + senders = [start_send(q, opts,brokers.next(), client_hosts.next()) + for q in queues for j in xrange(opts.senders)] + if opts.report_header and i == 0: print_header(opts.timestamp) + send_stats=parse_senders(senders) + recv_stats=parse_receivers(receivers) + if opts.summarize: print_summary(send_stats, recv_stats) + else: print_data(send_stats, recv_stats) + delete_queues(queues, opts.broker[0]) + finally: clients.kill() # No strays if __name__ == "__main__": main() diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 5a85da4fd2..9c713e872a 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -262,7 +262,7 @@ int main(int argc, char ** argv) return 0; } } catch(const std::exception& error) { - std::cerr << "Failure: " << error.what() << std::endl; + std::cerr << "qpid-receive: " << error.what() << std::endl; connection.close(); return 1; } diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp index 15fa284c48..ef5e98e2a0 100644 --- a/qpid/cpp/src/tests/qpid-send.cpp +++ b/qpid/cpp/src/tests/qpid-send.cpp @@ -368,7 +368,7 @@ int main(int argc, char ** argv) return 0; } } catch(const std::exception& error) { - std::cout << "Failed: " << error.what() << std::endl; + std::cerr << "qpid-send: " << error.what() << std::endl; connection.close(); return 1; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 5c2949960c..5fc8d43e94 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1225,7 +1225,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic else { AMQQueue queue = new AMQQueue(queueName); - queue.setCreate(AddressOption.ALWAYS); return queue; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 0480ea4cab..30dc30cd81 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -1,6 +1,6 @@ package org.apache.qpid.test.client.destination; /* - * + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,16 +8,16 @@ package org.apache.qpid.test.client.destination; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * + * */ @@ -58,7 +58,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class); private Connection _connection; - + @Override public void setUp() throws Exception { @@ -66,20 +66,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase _connection = getConnection() ; _connection.start(); } - + @Override public void tearDown() throws Exception { _connection.close(); super.tearDown(); } - + public void testCreateOptions() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageProducer prod; MessageConsumer cons; - + // default (create never, assert never) ------------------- // create never -------------------------------------------- String addr1 = "ADDR:testQueue1"; @@ -93,7 +93,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + try { prod = jmsSession.createProducer(dest); @@ -103,22 +103,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true)); - - + + // create always ------------------------------------------- addr1 = "ADDR:testQueue1; { create: always }"; dest = new AMQAnyDestination(addr1); - cons = jmsSession.createConsumer(dest); - + cons = jmsSession.createConsumer(dest); + assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); - + // create receiver ----------------------------------------- addr1 = "ADDR:testQueue2; { create: receiver }"; dest = new AMQAnyDestination(addr1); @@ -131,32 +131,32 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - - - cons = jmsSession.createConsumer(dest); - + + + cons = jmsSession.createConsumer(dest); + assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); - + // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; dest = new AMQAnyDestination(addr1); try { - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } catch(JMSException e) { assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + try { prod = jmsSession.createProducer(dest); @@ -166,17 +166,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " + "doesn't resolve to an exchange or a queue")); } - + assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - + // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; dest = new AMQAnyDestination(addr1); - + try { - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } catch(JMSException e) { @@ -185,162 +185,162 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - + prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs())); - + } - + // todo add tests for delete options - + public void testCreateQueue() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String addr = "ADDR:my-queue/hello; " + - "{" + + "{" + "create: always, " + - "node: " + - "{" + + "node: " + + "{" + "durable: true ," + "x-declare: " + - "{" + + "{" + "auto-delete: true," + - "arguments: {" + + "arguments: {" + "'qpid.max_size': 1000," + "'qpid.max_count': 100" + - "}" + - "}, " + - "x-bindings: [{exchange : 'amq.direct', key : test}, " + + "}" + + "}, " + + "x-bindings: [{exchange : 'amq.direct', key : test}, " + "{exchange : 'amq.fanout'}," + "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," + "{exchange : 'amq.topic', key : 'a.#'}" + - "]," + - + "]," + + "}" + "}"; AMQDestination dest = new AMQAnyDestination(addr); - MessageConsumer cons = jmsSession.createConsumer(dest); - + MessageConsumer cons = jmsSession.createConsumer(dest); + assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", dest.getAddressName(),"test", null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", + (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", dest.getAddressName(),null, null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", - dest.getAddressName(),"a.#", null)); - + (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + dest.getAddressName(),"a.#", null)); + Map<String,Object> args = new HashMap<String,Object>(); args.put("x-match","any"); args.put("dep","sales"); args.put("loc","CA"); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, args)); - + } - + public void testCreateExchange() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - - String addr = "ADDR:my-exchange/hello; " + - "{ " + - "create: always, " + - "node: " + + + String addr = "ADDR:my-exchange/hello; " + + "{ " + + "create: always, " + + "node: " + "{" + "type: topic, " + "x-declare: " + - "{ " + - "type:direct, " + + "{ " + + "type:direct, " + "auto-delete: true, " + - "arguments: {" + + "arguments: {" + "'qpid.msg_sequence': 1, " + "'qpid.ive': 1" + "}" + "}" + "}" + "}"; - + AMQDestination dest = new AMQAnyDestination(addr); - MessageConsumer cons = jmsSession.createConsumer(dest); - + MessageConsumer cons = jmsSession.createConsumer(dest); + assertTrue("Exchange not created as expected",( (AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true)); - + // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", + (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", dest.getQueueName(),"hello", Collections.<String, Object>emptyMap())); - + // The client should be able to query and verify the existence of my-exchange (QPID-2774) dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}"); - cons = jmsSession.createConsumer(dest); + cons = jmsSession.createConsumer(dest); } - + public void testBindQueueWithArgs() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}"; - - String addr = "ADDR:my-queue/hello; " + - "{ " + + + String addr = "ADDR:my-queue/hello; " + + "{ " + "create: always, " + - "node: " + - "{" + + "node: " + + "{" + "durable: true ," + - "x-declare: " + - "{ " + + "x-declare: " + + "{ " + "auto-delete: true," + "arguments: {'qpid.max_count': 100}" + "}, " + "x-bindings: [{exchange : 'amq.direct', key : test}, " + - "{exchange : 'amq.topic', key : 'a.#'}," + - headersBinding + + "{exchange : 'amq.topic', key : 'a.#'}," + + headersBinding + "]" + "}" + "}"; AMQDestination dest = new AMQAnyDestination(addr); - MessageConsumer cons = jmsSession.createConsumer(dest); - + MessageConsumer cons = jmsSession.createConsumer(dest); + assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); - + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", - dest.getAddressName(),"test", null)); - + (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + dest.getAddressName(),"test", null)); + assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); - + Address a = Address.parse(headersBinding); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession_0_10)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, a.getOptions())); } - + /** * Test goal: Verifies the capacity property in address string is handled properly. * Test strategy: @@ -348,22 +348,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase * Creates consumer with client ack. * Sends 15 messages to the queue, tries to receive 10. * Tries to receive the 11th message and checks if its null. - * - * Since capacity is 10 and we haven't acked any messages, + * + * Since capacity is 10 and we haven't acked any messages, * we should not have received the 11th. - * + * * Acks the 10th message and verifies we receive the rest of the msgs. */ public void testCapacity() throws Exception { verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: 10}}"); } - + public void testSourceAndTargetCapacity() throws Exception { verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: {source:10, target:15} }}"); } - + private void verifyCapacity(String address) throws Exception { if (!isCppBroker()) @@ -371,24 +371,24 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase _logger.info("Not C++ broker, exiting test"); return; } - + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - + AMQDestination dest = new AMQAnyDestination(address); - MessageConsumer cons = jmsSession.createConsumer(dest); + MessageConsumer cons = jmsSession.createConsumer(dest); MessageProducer prod = jmsSession.createProducer(dest); - + for (int i=0; i< 15; i++) { prod.send(jmsSession.createTextMessage("msg" + i) ); } - + for (int i=0; i< 9; i++) { cons.receive(); } Message msg = cons.receive(RECEIVE_TIMEOUT); - assertNotNull("Should have received the 10th message",msg); + assertNotNull("Should have received the 10th message",msg); assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT)); msg.acknowledge(); for (int i=11; i<16; i++) @@ -396,48 +396,48 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT)); } } - + /** * Test goal: Verifies if the new address format based destinations * can be specified and loaded correctly from the properties file. - * + * */ public void testLoadingFromPropertiesFile() throws Exception { - Hashtable<String,String> map = new Hashtable<String,String>(); - map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + + Hashtable<String,String> map = new Hashtable<String,String>(); + map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " + "{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}"); - + map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }"); map.put("destination.myQueue3", "BURL:direct://amq.direct/my-queue3?routingkey='test'"); - + PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory(); Context ctx = props.getInitialContext(map); - - AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1"); + + AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1"); AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2"); AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3"); - + Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - MessageConsumer cons1 = jmsSession.createConsumer(dest1); + MessageConsumer cons1 = jmsSession.createConsumer(dest1); MessageConsumer cons2 = jmsSession.createConsumer(dest2); MessageConsumer cons3 = jmsSession.createConsumer(dest3); - + assertTrue("Destination1 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true)); - + (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true)); + assertTrue("Destination1 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest1.getAddressName(),dest1.getAddressName(), null)); - + assertTrue("Destination2 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true)); - + (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true)); + assertTrue("Destination2 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession_0_10)jmsSession).isQueueBound("", dest2.getAddressName(),dest2.getAddressName(), null)); - + MessageProducer producer = jmsSession.createProducer(dest3); producer.send(jmsSession.createTextMessage("Hello")); TextMessage msg = (TextMessage)cons3.receive(1000); @@ -448,64 +448,64 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase * Test goal: Verifies the subject can be overridden using "qpid.subject" message property. * Test strategy: Creates and address with a default subject "topic1" * Creates a message with "qpid.subject"="topic2" and sends it. - * Verifies that the message goes to "topic2" instead of "topic1". + * Verifies that the message goes to "topic2" instead of "topic1". */ public void testOverridingSubject() throws Exception { Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - + AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}"); - + MessageProducer prod = jmsSession.createProducer(topic1); - + Message m = jmsSession.createTextMessage("Hello"); m.setStringProperty("qpid.subject", "topic2"); - + MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1); MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}")); - + prod.send(m); Message msg = consForTopic1.receive(1000); assertNull("message shouldn't have been sent to topic1",msg); - + msg = consForTopic2.receive(1000); - assertNotNull("message should have been sent to topic2",msg); - + assertNotNull("message should have been sent to topic2",msg); + } - + /** - * Test goal: Verifies that and address based destination can be used successfully + * Test goal: Verifies that and address based destination can be used successfully * as a reply to. */ public void testAddressBasedReplyTo() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String addr = "ADDR:amq.direct/x512; {create: receiver, " + - "link : {name : 'MY.RESP.QUEUE', " + + "link : {name : 'MY.RESP.QUEUE', " + "x-declare : { auto-delete: true, exclusive: true, " + "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring }} } }"; - + Destination replyTo = new AMQAnyDestination(addr); Destination dest =new AMQAnyDestination("ADDR:amq.direct/Hello"); - - MessageConsumer cons = jmsSession.createConsumer(dest); + + MessageConsumer cons = jmsSession.createConsumer(dest); MessageProducer prod = jmsSession.createProducer(dest); Message m = jmsSession.createTextMessage("Hello"); m.setJMSReplyTo(replyTo); prod.send(m); - + Message msg = cons.receive(1000); assertNotNull("consumer should have received the message",msg); - + MessageConsumer replyToCons = jmsSession.createConsumer(replyTo); MessageProducer replyToProd = jmsSession.createProducer(msg.getJMSReplyTo()); replyToProd.send(jmsSession.createTextMessage("reply")); - + Message replyToMsg = replyToCons.receive(1000); - assertNotNull("The reply to consumer should have got the message",replyToMsg); + assertNotNull("The reply to consumer should have got the message",replyToMsg); } - + /** * Test goal: Verifies that session.createQueue method * works as expected both with the new and old addressing scheme. @@ -513,46 +513,61 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testSessionCreateQueue() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + // Using the BURL method Destination queue = ssn.createQueue("my-queue"); - MessageProducer prod = ssn.createProducer(queue); + MessageProducer prod = ssn.createProducer(queue); MessageConsumer cons = ssn.createConsumer(queue); assertTrue("my-queue was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession_0_10)ssn).isQueueBound("amq.direct", "my-queue","my-queue", null)); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method + // default case queue = ssn.createQueue("ADDR:my-queue2"); - prod = ssn.createProducer(queue); + try + { + prod = ssn.createProducer(queue); + fail("The client should throw an exception, since there is no queue present in the broker"); + } + catch(Exception e) + { + String s = "The name 'my-queue2' supplied in the address " + + "doesn't resolve to an exchange or a queue"; + assertEquals(s,e.getCause().getCause().getMessage()); + } + + // explicit create case + queue = ssn.createQueue("ADDR:my-queue2; {create: sender}"); + prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); assertTrue("my-queue2 was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("", + (AMQSession_0_10)ssn).isQueueBound("", "my-queue2","my-queue2", null)); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method to create a more complicated queue String addr = "ADDR:amq.direct/x512; {create: receiver, " + - "link : {name : 'MY.RESP.QUEUE', " + + "link : {name : 'MY.RESP.QUEUE', " + "x-declare : { auto-delete: true, exclusive: true, " + "arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }"; queue = ssn.createQueue(addr); - - prod = ssn.createProducer(queue); + + prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); assertTrue("MY.RESP.QUEUE was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession_0_10)ssn).isQueueBound("amq.direct", "MY.RESP.QUEUE","x512", null)); cons.close(); } - + /** * Test goal: Verifies that session.creatTopic method * works as expected both with the new and old addressing scheme. @@ -560,71 +575,71 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testSessionCreateTopic() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + // Using the BURL method Topic topic = ssn.createTopic("ACME"); - MessageProducer prod = ssn.createProducer(topic); + MessageProducer prod = ssn.createProducer(topic); MessageConsumer cons = ssn.createConsumer(topic); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - + // Using the ADDR method topic = ssn.createTopic("ADDR:ACME"); - prod = ssn.createProducer(topic); + prod = ssn.createProducer(topic); cons = ssn.createConsumer(topic); - + prod.send(ssn.createTextMessage("test")); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); - - String addr = "ADDR:vehicles/bus; " + - "{ " + - "create: always, " + - "node: " + + + String addr = "ADDR:vehicles/bus; " + + "{ " + + "create: always, " + + "node: " + "{" + "type: topic, " + "x-declare: " + - "{ " + - "type:direct, " + + "{ " + + "type:direct, " + "auto-delete: true, " + - "arguments: {" + + "arguments: {" + "'qpid.msg_sequence': 1, " + - "'qpid.ive': 1" + + "'qpid.ive': 1" + "}" + "}" + "}, " + "link: {name : my-topic, " + "x-bindings: [{exchange : 'vehicles', key : car}, " + - "{exchange : 'vehicles', key : van}]" + - "}" + + "{exchange : 'vehicles', key : van}]" + + "}" + "}"; - + // Using the ADDR method to create a more complicated topic topic = ssn.createTopic(addr); - prod = ssn.createProducer(topic); + prod = ssn.createProducer(topic); cons = ssn.createConsumer(topic); - + assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","bus", null)); - + assertTrue("The queue was not bound to vehicle exchange using car as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","car", null)); - + assertTrue("The queue was not bound to vehicle exchange using van as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession_0_10)ssn).isQueueBound("vehicles", "my-topic","van", null)); - + Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "van"); prod.send(msg); assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } - + /** * Test Goal : Verify the default subjects used for each exchange type. * The default for amq.topic is "#" and for the rest it's "" @@ -632,92 +647,92 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testDefaultSubjects() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct")); MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic")); - + MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct")); MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather")); MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales")); - + queueProducer.send(ssn.createBytesMessage()); assertNotNull("The consumer subscribed to amq.direct " + "with empty binding key should have received the message ",queueCons.receive(1000)); - + topicProducer1.send(ssn.createTextMessage("25c")); assertEquals("The consumer subscribed to amq.topic " + "with '#' binding key should have received the message ", ((TextMessage)topicCons.receive(1000)).getText(),"25c"); - + topicProducer2.send(ssn.createTextMessage("1000")); assertEquals("The consumer subscribed to amq.topic " + "with '#' binding key should have received the message ", ((TextMessage)topicCons.receive(1000)).getText(),"1000"); } - + /** * Test Goal : Verify that 'mode : browse' works as expected using a regular consumer. * This indirectly tests ring queues as well. */ public void testBrowseMode() throws Exception { - + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - + String addr = "ADDR:my-ring-queue; {create: always, mode: browse, " + "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " + "x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2}}}}"; - + Destination dest = ssn.createQueue(addr); MessageConsumer browseCons = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test")); - + prod.send(ssn.createTextMessage("Test1")); prod.send(ssn.createTextMessage("Test2")); - + TextMessage msg = (TextMessage)browseCons.receive(1000); assertEquals("Didn't receive the first message",msg.getText(),"Test1"); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Didn't receive the first message",msg.getText(),"Test2"); - - browseCons.close(); + + browseCons.close(); prod.send(ssn.createTextMessage("Test3")); browseCons = ssn.createConsumer(dest); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Should receive the second message again",msg.getText(),"Test2"); - + msg = (TextMessage)browseCons.receive(1000); assertEquals("Should receive the third message since it's a ring queue",msg.getText(),"Test3"); - + assertNull("Should not receive anymore messages",browseCons.receive(500)); } - + /** * Test Goal : When the same destination is used when creating two consumers, - * If the type == topic, verify that unique subscription queues are created, + * If the type == topic, verify that unique subscription queues are created, * unless subscription queue has a name. - * + * * If the type == queue, same queue should be shared. */ public void testSubscriptionForSameDestination() throws Exception { - Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination dest = ssn.createTopic("ADDR:amq.topic/foo"); MessageConsumer consumer1 = ssn.createConsumer(dest); MessageConsumer consumer2 = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(dest); - + prod.send(ssn.createTextMessage("A")); TextMessage m = (TextMessage)consumer1.receive(1000); assertEquals("Consumer1 should recieve message A",m.getText(),"A"); m = (TextMessage)consumer2.receive(1000); assertEquals("Consumer2 should recieve message A",m.getText(),"A"); - + consumer1.close(); consumer2.close(); - + dest = ssn.createTopic("ADDR:amq.topic/foo; { link: {name: my-queue}}"); consumer1 = ssn.createConsumer(dest); try @@ -726,60 +741,60 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); } catch(Exception e) - { + { } _connection.close(); - + _connection = getConnection() ; _connection.start(); - ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); dest = ssn.createTopic("ADDR:my_queue; {create: always}"); consumer1 = ssn.createConsumer(dest); consumer2 = ssn.createConsumer(dest); prod = ssn.createProducer(dest); - + prod.send(ssn.createTextMessage("A")); - Message m1 = consumer1.receive(1000); + Message m1 = consumer1.receive(1000); Message m2 = consumer2.receive(1000); - + if (m1 != null) { - assertNull("Only one consumer should receive the message",m2); + assertNull("Only one consumer should receive the message",m2); } else { - assertNotNull("Only one consumer should receive the message",m2); + assertNotNull("Only one consumer should receive the message",m2); } } - + public void testXBindingsWithoutExchangeName() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); String addr = "ADDR:MRKT; " + "{" + - "create: receiver," + + "create: receiver," + "node : {type: topic, x-declare: {type: topic} }," + "link:{" + "name: my-topic," + "x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" + "}" + "}"; - + // Using the ADDR method to create a more complicated topic MessageConsumer cons = ssn.createConsumer(new AMQAnyDestination(addr)); - + assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","NYSE.#", null)); - + assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","NASDAQ.#", null)); - + assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession_0_10)ssn).isQueueBound("MRKT", "my-topic","CNTL.#", null)); - + MessageProducer prod = ssn.createProducer(ssn.createTopic(addr)); Message msg = ssn.createTextMessage("test"); msg.setStringProperty("qpid.subject", "NASDAQ.ABCD"); @@ -787,7 +802,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull("consumer should receive a message",cons.receive(1000)); cons.close(); } - + public void testXSubscribeOverrides() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); @@ -800,57 +815,57 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber"); } catch(Exception e) - { + { } } - + public void testQueueReceiversAndTopicSubscriber() throws Exception { Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}"); Topic topic = new AMQAnyDestination("ADDR:amq.topic/test"); - + QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE); QueueReceiver receiver = qSession.createReceiver(queue); - + TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE); TopicSubscriber sub = tSession.createSubscriber(topic); - + Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue")); prod1.send(ssn.createTextMessage("test1")); - + MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test")); prod2.send(ssn.createTextMessage("test2")); - + Message msg1 = receiver.receive(); assertNotNull(msg1); assertEquals("test1",((TextMessage)msg1).getText()); - + Message msg2 = sub.receive(); assertNotNull(msg2); - assertEquals("test2",((TextMessage)msg2).getText()); + assertEquals("test2",((TextMessage)msg2).getText()); } - + public void testDurableSubscriber() throws Exception { - Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Topic topic = ssn.createTopic("news.us"); - + MessageConsumer cons = ssn.createDurableSubscriber(topic, "my-sub"); MessageProducer prod = ssn.createProducer(topic); - + Message m = ssn.createTextMessage("A"); prod.send(m); Message msg = cons.receive(1000); assertNotNull(msg); assertEquals("A",((TextMessage)msg).getText()); } - + public void testDeleteOptions() throws Exception { Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); MessageConsumer cons; - + // default (create never, assert never) ------------------- // create never -------------------------------------------- String addr1 = "ADDR:testQueue1;{create: always, delete: always}"; @@ -864,11 +879,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); - - + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + + String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; dest = new AMQAnyDestination(addr2); try @@ -880,11 +895,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + - String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; dest = new AMQAnyDestination(addr3); try @@ -897,45 +912,45 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { fail("Exception should not be thrown. Exception thrown is : " + e); } - + assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + - } - + /** * Test Goals : 1. Test if the client sets the correct accept mode for unreliable * and at-least-once. * 2. Test default reliability modes for Queues and Topics. * 3. Test if an exception is thrown if exactly-once is used. * 4. Test if an exception is thrown if at-least-once is used with topics. - * + * * Test Strategy: For goal #1 & #2 * For unreliable and at-least-once the test tries to receives messages * in client_ack mode but does not ack the messages. * It will then close the session, recreate a new session * and will then try to verify the queue depth. * For unreliable the messages should have been taken off the queue. - * For at-least-once the messages should be put back onto the queue. - * + * For at-least-once the messages should be put back onto the queue. + * */ - + public void testReliabilityOptions() throws Exception { String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}"; acceptModeTest(addr1,0); - + String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}"; acceptModeTest(addr2,2); - + // Default accept-mode for topics - acceptModeTest("ADDR:amq.topic/test",0); - + acceptModeTest("ADDR:amq.topic/test",0); + // Default accept-mode for queues acceptModeTest("ADDR:testQueue1;{create: always}",2); - - String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; + + String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}"; try { AMQAnyDestination dest = new AMQAnyDestination(addr3); @@ -945,8 +960,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported")); } - - String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}"; + + String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}"; try { AMQAnyDestination dest = new AMQAnyDestination(addr4); @@ -959,34 +974,50 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics")); } } - + private void acceptModeTest(String address, int expectedQueueDepth) throws Exception { Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); MessageConsumer cons; MessageProducer prod; - + AMQDestination dest = new AMQAnyDestination(address); cons = ssn.createConsumer(dest); prod = ssn.createProducer(dest); - + for (int i=0; i < expectedQueueDepth; i++) { prod.send(ssn.createTextMessage("Msg" + i)); } - + for (int i=0; i < expectedQueueDepth; i++) { Message msg = cons.receive(1000); assertNotNull(msg); assertEquals("Msg" + i,((TextMessage)msg).getText()); } - + ssn.close(); ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); - long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); - assertEquals(expectedQueueDepth,queueDepth); + long queueDepth = ((AMQSession) ssn).getQueueDepth(dest); + assertEquals(expectedQueueDepth,queueDepth); + cons.close(); + prod.close(); + } + + public void testDestinationOnSend() throws Exception + { + Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); + MessageConsumer cons = ssn.createConsumer(ssn.createTopic("amq.topic/test")); + MessageProducer prod = ssn.createProducer(null); + + Queue queue = ssn.createQueue("amq.topic/test"); + prod.send(queue,ssn.createTextMessage("A")); + + Message msg = cons.receive(1000); + assertNotNull(msg); + assertEquals("A",((TextMessage)msg).getText()); + prod.close(); cons.close(); - prod.close(); } } |