summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-03-15 14:34:02 +0000
committerAlan Conway <aconway@apache.org>2011-03-15 14:34:02 +0000
commitb51071c7876f42a6ddc44beddfb9593b915ae1b0 (patch)
tree9aba7efa175d5d41a846c34205ac297598156402
parente1db85e48dc991e6ed96d307b7db8e93b5eec697 (diff)
downloadqpid-python-b51071c7876f42a6ddc44beddfb9593b915ae1b0.tar.gz
Merge branch 'trunk' into qpid-2920, trunk at r1081631
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1081801 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am9
-rw-r--r--qpid/cpp/src/qpid/BufferRef.h70
-rw-r--r--qpid/cpp/src/qpid/Modules.cpp3
-rw-r--r--qpid/cpp/src/qpid/RefCountedBuffer.cpp22
-rw-r--r--qpid/cpp/src/qpid/RefCountedBuffer.h61
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp34
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.h32
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.cpp5
-rw-r--r--qpid/cpp/src/qpid/cluster/Event.h12
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark44
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark68
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp2
-rw-r--r--qpid/cpp/src/tests/qpid-send.cpp2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java599
15 files changed, 528 insertions, 436 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index f27f69d182..d7be7121c1 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -176,7 +176,7 @@ nobase_include_HEADERS += \
../include/qpid/sys/posix/Time.h \
../include/qpid/sys/posix/check.h
-if HAVE_EPOLL
+if HAVE_EPOLL
poller = qpid/sys/epoll/EpollPoller.cpp
endif
@@ -195,7 +195,7 @@ libqpidcommon_la_SOURCES += $(poller) $(systeminfo)
posix_broker_src = \
qpid/broker/posix/BrokerDefaults.cpp
-lib_LTLIBRARIES = libqpidtypes.la libqpidcommon.la libqpidbroker.la libqpidclient.la libqpidmessaging.la
+lib_LTLIBRARIES = libqpidtypes.la libqpidcommon.la libqpidbroker.la libqpidclient.la libqpidmessaging.la
# Definitions for client and daemon plugins
PLUGINLDFLAGS=-no-undefined -module -avoid-version
@@ -203,7 +203,7 @@ confdir=$(sysconfdir)/qpid
dmoduledir=$(libdir)/qpid/daemon
cmoduledir=$(libdir)/qpid/client
dmodule_LTLIBRARIES =
-cmodule_LTLIBRARIES =
+cmodule_LTLIBRARIES =
include cluster.mk
include acl.mk
@@ -341,6 +341,7 @@ libqpidcommon_la_SOURCES += \
qpid/RefCounted.h \
qpid/RefCountedBuffer.cpp \
qpid/RefCountedBuffer.h \
+ qpid/BufferRef.h \
qpid/Sasl.h \
qpid/SaslFactory.cpp \
qpid/SaslFactory.h \
@@ -894,6 +895,6 @@ dist-hook: $(BUILT_SOURCES)
install-data-local:
$(mkinstalldirs) $(DESTDIR)/$(localstatedir)/lib/qpidd
-# Support for pkg-config
+# Support for pkg-config
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = qpid.pc
diff --git a/qpid/cpp/src/qpid/BufferRef.h b/qpid/cpp/src/qpid/BufferRef.h
new file mode 100644
index 0000000000..bfe1f9ebaa
--- /dev/null
+++ b/qpid/cpp/src/qpid/BufferRef.h
@@ -0,0 +1,70 @@
+#ifndef QPID_BUFFERREF_H
+#define QPID_BUFFERREF_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+/** Template for mutable or const buffer references */
+template <class T> class BufferRefT {
+ public:
+ BufferRefT() : begin_(0), end_(0) {}
+
+ BufferRefT(boost::intrusive_ptr<RefCounted> c, T* begin, T* end) :
+ counter(c), begin_(begin), end_(end) {}
+
+ template <class U> BufferRefT(const BufferRefT<U>& other) :
+ counter(other.counter), begin_(other.begin_), end_(other.end_) {}
+
+ T* begin() const { return begin_; }
+ T* end() const { return end_; }
+
+ /** Return a sub-buffer of the current buffer */
+ BufferRefT sub_buffer(T* begin, T* end) {
+ assert(begin_ <= begin && begin <= end_);
+ assert(begin_ <= end && end <= end_);
+ assert(begin <= end);
+ return BufferRefT(counter, begin, end);
+ }
+
+ private:
+ boost::intrusive_ptr<RefCounted> counter;
+ T* begin_;
+ T* end_;
+};
+
+/**
+ * Reference to a mutable ref-counted buffer.
+ */
+typedef BufferRefT<char> BufferRef;
+
+/**
+ * Reference to a const ref-counted buffer.
+ */
+typedef BufferRefT<const char> ConstBufferRef;
+
+} // namespace qpid
+
+#endif /*!QPID_BUFFERREF_H*/
diff --git a/qpid/cpp/src/qpid/Modules.cpp b/qpid/cpp/src/qpid/Modules.cpp
index 8f58df6ed1..727e05d212 100644
--- a/qpid/cpp/src/qpid/Modules.cpp
+++ b/qpid/cpp/src/qpid/Modules.cpp
@@ -64,7 +64,6 @@ void tryShlib(const char* libname_, bool noThrow) {
if (!isShlibName(libname)) libname += suffix();
try {
sys::Shlib shlib(libname);
- QPID_LOG (info, "Loaded Module: " << libname);
}
catch (const std::exception& /*e*/) {
if (!noThrow)
@@ -82,7 +81,7 @@ void loadModuleDir (std::string dirname, bool isDefault)
return;
throw Exception ("Directory not found: " + dirname);
}
- if (!fs::is_directory(dirPath))
+ if (!fs::is_directory(dirPath))
{
throw Exception ("Invalid value for module-dir: " + dirname + " is not a directory");
}
diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.cpp b/qpid/cpp/src/qpid/RefCountedBuffer.cpp
index 9b8f1ebd5e..40d620f7ad 100644
--- a/qpid/cpp/src/qpid/RefCountedBuffer.cpp
+++ b/qpid/cpp/src/qpid/RefCountedBuffer.cpp
@@ -24,30 +24,20 @@
namespace qpid {
-RefCountedBuffer::RefCountedBuffer() : count(0) {}
-
-void RefCountedBuffer::destroy() const {
+void RefCountedBuffer::released() const {
this->~RefCountedBuffer();
::delete[] reinterpret_cast<const char*>(this);
}
-char* RefCountedBuffer::addr() const {
- return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer));
-}
-
-RefCountedBuffer::pointer RefCountedBuffer::create(size_t n) {
+BufferRef RefCountedBuffer::create(size_t n) {
char* store=::new char[n+sizeof(RefCountedBuffer)];
new(store) RefCountedBuffer;
- return pointer(reinterpret_cast<RefCountedBuffer*>(store));
+ char* start = store+sizeof(RefCountedBuffer);
+ return BufferRef(
+ boost::intrusive_ptr<RefCounted>(reinterpret_cast<RefCountedBuffer*>(store)),
+ start, start+n);
}
-RefCountedBuffer::pointer::pointer() {}
-RefCountedBuffer::pointer::pointer(RefCountedBuffer* x) : p(x) {}
-RefCountedBuffer::pointer::pointer(const pointer& x) : p(x.p) {}
-RefCountedBuffer::pointer::~pointer() {}
-RefCountedBuffer::pointer& RefCountedBuffer::pointer::operator=(const RefCountedBuffer::pointer& x) { p = x.p; return *this; }
-
-char* RefCountedBuffer::pointer::cp() const { return p ? p->get() : 0; }
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.h b/qpid/cpp/src/qpid/RefCountedBuffer.h
index 75a23862be..f0ea86130b 100644
--- a/qpid/cpp/src/qpid/RefCountedBuffer.h
+++ b/qpid/cpp/src/qpid/RefCountedBuffer.h
@@ -22,68 +22,23 @@
*
*/
-#include <boost/utility.hpp>
-#include <boost/detail/atomic_count.hpp>
-#include <boost/intrusive_ptr.hpp>
+#include <qpid/RefCounted.h>
+#include <qpid/BufferRef.h>
namespace qpid {
/**
- * Reference-counted byte buffer.
- * No alignment guarantees.
+ * Reference-counted byte buffer. No alignment guarantees.
*/
-class RefCountedBuffer : boost::noncopyable {
- mutable boost::detail::atomic_count count;
- RefCountedBuffer();
- void destroy() const;
- char* addr() const;
-
-public:
- /** Smart char pointer to a reference counted buffer */
- class pointer {
- boost::intrusive_ptr<RefCountedBuffer> p;
- char* cp() const;
- pointer(RefCountedBuffer* x);
- friend class RefCountedBuffer;
-
- public:
- pointer();
- pointer(const pointer&);
- ~pointer();
- pointer& operator=(const pointer&);
-
- char* get() { return cp(); }
- operator char*() { return cp(); }
- char& operator*() { return *cp(); }
- char& operator[](size_t i) { return cp()[i]; }
-
- const char* get() const { return cp(); }
- operator const char*() const { return cp(); }
- const char& operator*() const { return *cp(); }
- const char& operator[](size_t i) const { return cp()[i]; }
- };
-
+class RefCountedBuffer : public RefCounted {
+ public:
/** Create a reference counted buffer of size n */
- static pointer create(size_t n);
-
- /** Get a pointer to the start of the buffer. */
- char* get() { return addr(); }
- const char* get() const { return addr(); }
- char& operator[](size_t i) { return get()[i]; }
- const char& operator[](size_t i) const { return get()[i]; }
+ static BufferRef create(size_t n);
- void addRef() const { ++count; }
- void release() const { if (--count==0) destroy(); }
- long refCount() { return count; }
+ protected:
+ void released() const;
};
} // namespace qpid
-// intrusive_ptr support.
-namespace boost {
-inline void intrusive_ptr_add_ref(const qpid::RefCountedBuffer* p) { p->addRef(); }
-inline void intrusive_ptr_release(const qpid::RefCountedBuffer* p) { p->release(); }
-}
-
-
#endif /*!QPID_REFCOUNTEDBUFFER_H*/
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index f3baf00d1f..3b295d1f86 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,7 +38,7 @@ namespace _qmf = qmf::org::apache::qpid::broker;
// - excessive string copying: should be 0 copy, match from original buffer.
// - match/lookup: use descision tree or other more efficient structure.
-namespace
+namespace
{
const std::string STAR("*");
const std::string HASH("#");
@@ -110,7 +110,7 @@ public:
// Iterate over a string of '.'-separated tokens.
struct TopicExchange::TokenIterator {
typedef pair<const char*,const char*> Token;
-
+
TokenIterator(const char* b, const char* e) : end(e), token(make_pair(b, find(b,e,'.'))) {}
TokenIterator(const string& key) : end(&key[0]+key.size()), token(make_pair(&key[0], find(&key[0],end,'.'))) {}
@@ -221,7 +221,7 @@ TopicExchange::TopicExchange(const std::string& _name, bool _durable,
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args)
{
- ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
+ ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind);
string fedTags(args ? args->getAsString(qpidFedTags) : "");
string fedOrigin(args ? args->getAsString(qpidFedOrigin) : "");
@@ -282,6 +282,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
}
}
+ cc.clearCache(); // clear the cache before we IVE route.
routeIVE();
if (propagate)
propagateFedOp(routingKey, fedTags, fedOp, fedOrigin);
@@ -289,7 +290,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
}
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* /*args*/){
- ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
+ ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit.
RWlock::ScopedWlock l(lock);
string routingKey = normalize(constRoutingKey);
BindingKey* bk = bindingTree.getBindingKey(routingKey);
@@ -336,23 +337,24 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, const Fiel
{
// Note: PERFORMANCE CRITICAL!!!
BindingList b;
- std::map<std::string, BindingList>::iterator it;
- { // only lock the cache for read
+ std::map<std::string, BindingList>::iterator it;
+ { // only lock the cache for read
RWlock::ScopedRlock cl(cacheLock);
- it = bindingCache.find(routingKey);
- }
+ it = bindingCache.find(routingKey);
+ if (it != bindingCache.end()) {
+ b = it->second;
+ }
+ }
PreRoute pr(msg, this);
- if (it == bindingCache.end()) // no cache hit
+ if (!b.get()) // no cache hit
{
RWlock::ScopedRlock l(lock);
b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >);
BindingsFinderIter bindingsFinder(b);
bindingTree.iterateMatch(routingKey, bindingsFinder);
- RWlock::ScopedWlock cwl(cacheLock);
- bindingCache[routingKey] = b; // update cache
- }else {
- b = it->second;
- }
+ RWlock::ScopedWlock cwl(cacheLock);
+ bindingCache[routingKey] = b; // update cache
+ }
doRoute(msg, b);
}
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h
index 3d2b3a95a9..e751a2b7c7 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.h
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.h
@@ -136,18 +136,26 @@ class TopicExchange : public virtual Exchange {
unsigned long nBindings;
qpid::sys::RWlock lock; // protects bindingTree and nBindings
qpid::sys::RWlock cacheLock; // protects cache
- std::map<std::string, BindingList> bindingCache; // cache of matched routes.
- class ClearCache {
- private:
- qpid::sys::RWlock* cacheLock;
- std::map<std::string, BindingList>* bindingCache;
- public:
- ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),bindingCache(bc) {};
- ~ClearCache(){
- qpid::sys::RWlock::ScopedWlock l(*cacheLock);
- bindingCache->clear();
- };
- };
+ std::map<std::string, BindingList> bindingCache; // cache of matched routes.
+ class ClearCache {
+ private:
+ qpid::sys::RWlock* cacheLock;
+ std::map<std::string, BindingList>* bindingCache;
+ bool cleared;
+ public:
+ ClearCache(qpid::sys::RWlock* l, std::map<std::string, BindingList>* bc): cacheLock(l),
+ bindingCache(bc),cleared(false) {};
+ void clearCache() {
+ qpid::sys::RWlock::ScopedWlock l(*cacheLock);
+ if (!cleared) {
+ bindingCache->clear();
+ cleared =true;
+ }
+ };
+ ~ClearCache(){
+ clearCache();
+ };
+ };
bool isBound(Queue::shared_ptr queue, const std::string& pattern);
class ReOriginIter;
diff --git a/qpid/cpp/src/qpid/cluster/Event.cpp b/qpid/cpp/src/qpid/cluster/Event.cpp
index cd775ce2f1..da2bc89d8c 100644
--- a/qpid/cpp/src/qpid/cluster/Event.cpp
+++ b/qpid/cpp/src/qpid/cluster/Event.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/RefCountedBuffer.h"
#include "qpid/assert.h"
#include <ostream>
#include <iterator>
diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h
index c2dca073d1..13283edff7 100644
--- a/qpid/cpp/src/qpid/cluster/Event.h
+++ b/qpid/cpp/src/qpid/cluster/Event.h
@@ -23,7 +23,7 @@
*/
#include "qpid/cluster/types.h"
-#include "qpid/RefCountedBuffer.h"
+#include "qpid/BufferRef.h"
#include "qpid/framing/AMQFrame.h"
#include <sys/uio.h> // For iovec
#include <iosfwd>
@@ -88,12 +88,12 @@ class Event : public EventHeader {
static Event control(const framing::AMQFrame&, const ConnectionId&);
// Data excluding header.
- char* getData() { return store + HEADER_SIZE; }
- const char* getData() const { return store + HEADER_SIZE; }
+ char* getData() { return store.begin() + HEADER_SIZE; }
+ const char* getData() const { return store.begin() + HEADER_SIZE; }
// Store including header
- char* getStore() { return store; }
- const char* getStore() const { return store; }
+ char* getStore() { return store.begin(); }
+ const char* getStore() const { return store.begin(); }
const framing::AMQFrame& getFrame() const;
@@ -104,7 +104,7 @@ class Event : public EventHeader {
private:
void encodeHeader() const;
- RefCountedBuffer::pointer store;
+ BufferRef store;
mutable framing::AMQFrame frame;
};
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index 4408e63866..5f0e020475 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,21 +19,37 @@
#
# Benchmark script for comparing cluster performance.
-#PORT=":5555"
-BROKER=`echo $HOSTS | awk '{print $1}'` # Single broker
-BROKERS=`echo $HOSTS | sed "s/\>/$PORT/g;s/ /,/g"` # Broker URL list
-COUNT=100000
-RATE=20000 # Rate to throttle senders for latency results
-run_test() { echo $*; "$@"; echo; echo; echo; }
-# Thruput, unshared queue
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp -m $COUNT
+# Default values
+PORT="5672"
+BROKERS=`echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g"` # Broker URL list
+COUNT=10000
+FLOW=100 # Flow control limit on queue depth for latency.
+REPEAT=10
+SCALE=10
-# Latency
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --connection-options '{tcp-nodelay:true}' -m `expr $COUNT / 2` --send-rate $RATE
+while getopts "p:c:f:r:t:b:" opt; do
+ case $opt in
+ p) PORT=$OPTARG;;
+ c) COUNT=$OPTARG;;
+ f) FLOW=$OPTARG;;
+ r) REPEAT=$OPTARG;;
+ s) SCALE=$OPTARG;;
+ b) BROKERS=$OPTARG;;
+ *) echo "Unknown option"; exit 1;;
+ esac
+done
+
+BROKER=`echo $HOSTS | sed 's/,.*//'` # First broker
+
+run_test() { echo $*; shift; "$@"; echo; echo; echo; }
# Multiple pubs/subs connect via multiple brokers (active-active)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKERS --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT
# Multiple pubs/subs connect via single broker (active-passive)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -s$SCALE -r$SCALE -m $COUNT
+
+# Latency
+run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW
+
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index 1f77226b4d..6138108558 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -77,6 +77,20 @@ def ssh_command(host, command):
"""Convert command into an ssh command on host with quoting"""
return ["ssh", host] + [posix_quote(arg) for arg in command]
+class Clients:
+ def __init__(self): self.clients=[]
+
+ def add(self, client):
+ self.clients.append(client)
+ return client
+
+ def kill(self):
+ for c in self.clients:
+ try: c.kill()
+ except: pass
+
+clients = Clients()
+
def start_receive(queue, index, opts, ready_queue, broker, host):
address_opts=["create:receiver"] + opts.receive_option
if opts.durable: address_opts += ["node:{durable:true}"]
@@ -101,7 +115,7 @@ def start_receive(queue, index, opts, ready_queue, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return Popen(command, stdout=PIPE)
+ return clients.add(Popen(command, stdout=PIPE))
def start_send(queue, opts, broker, host):
address="%s;{%s}"%(queue,",".join(opts.send_option))
@@ -122,7 +136,7 @@ def start_send(queue, opts, broker, host):
if opts.connection_options:
command += ["--connection-options",opts.connection_options]
if host: command = ssh_command(host, command)
- return Popen(command, stdout=PIPE)
+ return clients.add(Popen(command, stdout=PIPE))
def first_line(p):
out,err=p.communicate()
@@ -133,7 +147,11 @@ def delete_queues(queues, broker):
c = qpid.messaging.Connection(broker)
c.open()
for q in queues:
- try: s = c.session().sender("%s;{delete:always}"%(q))
+ try:
+ s = c.session()
+ snd = s.sender("%s;{delete:always}"%(q))
+ snd.close()
+ s.sync()
except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
c.close()
@@ -145,7 +163,6 @@ def print_header(timestamp):
def parse(parser, lines): # Parse sender/receiver output
for l in lines:
fn_val = zip(parser, l)
-
return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
def parse_senders(senders):
@@ -156,11 +173,12 @@ def parse_receivers(receivers):
def print_data(send_stats, recv_stats):
for send,recv in map(None, send_stats, recv_stats):
- if send: print send[0],
+ line=""
+ if send: line += "%d"%send[0]
if recv:
- print "\t\t%d"%recv[0],
- if len(recv) == 4: print "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]),
- print
+ line += "\t\t%d"%recv[0]
+ if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+ print line
def print_summary(send_stats, recv_stats):
def avg(s): sum(s) / len(s)
@@ -184,11 +202,11 @@ class ReadyReceiver:
self.receiver = self.connection.session().receiver(
"%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
self.receiver.session.sync()
- self.timeout=2
+ self.timeout=10
def wait(self, receivers):
try:
- for i in xrange(len(receivers)): self.receiver.fetch(self.timeout)
+ for i in receivers: self.receiver.fetch(self.timeout)
self.connection.close()
except qpid.messaging.Empty:
for r in receivers:
@@ -221,20 +239,22 @@ def main():
receive_out = ""
ready_queue="%s-ready"%(opts.queue_name)
queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
- for i in xrange(opts.repeat):
- delete_queues(queues, opts.broker[0])
- ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
- receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.receivers)]
- ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
- senders = [start_send(q, opts,brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.senders)]
- if opts.report_header and i == 0: print_header(opts.timestamp)
- send_stats=parse_senders(senders)
- recv_stats=parse_receivers(receivers)
- if opts.summarize: print_summary(send_stats, recv_stats)
- else: print_data(send_stats, recv_stats)
- delete_queues(queues, opts.broker[0])
+ try:
+ for i in xrange(opts.repeat):
+ delete_queues(queues, opts.broker[0])
+ ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+ receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.receivers)]
+ ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+ senders = [start_send(q, opts,brokers.next(), client_hosts.next())
+ for q in queues for j in xrange(opts.senders)]
+ if opts.report_header and i == 0: print_header(opts.timestamp)
+ send_stats=parse_senders(senders)
+ recv_stats=parse_receivers(receivers)
+ if opts.summarize: print_summary(send_stats, recv_stats)
+ else: print_data(send_stats, recv_stats)
+ delete_queues(queues, opts.broker[0])
+ finally: clients.kill() # No strays
if __name__ == "__main__": main()
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 5a85da4fd2..9c713e872a 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -262,7 +262,7 @@ int main(int argc, char ** argv)
return 0;
}
} catch(const std::exception& error) {
- std::cerr << "Failure: " << error.what() << std::endl;
+ std::cerr << "qpid-receive: " << error.what() << std::endl;
connection.close();
return 1;
}
diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp
index 15fa284c48..ef5e98e2a0 100644
--- a/qpid/cpp/src/tests/qpid-send.cpp
+++ b/qpid/cpp/src/tests/qpid-send.cpp
@@ -368,7 +368,7 @@ int main(int argc, char ** argv)
return 0;
}
} catch(const std::exception& error) {
- std::cout << "Failed: " << error.what() << std::endl;
+ std::cerr << "qpid-send: " << error.what() << std::endl;
connection.close();
return 1;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 5c2949960c..5fc8d43e94 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1225,7 +1225,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
else
{
AMQQueue queue = new AMQQueue(queueName);
- queue.setCreate(AddressOption.ALWAYS);
return queue;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 0480ea4cab..30dc30cd81 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -1,6 +1,6 @@
package org.apache.qpid.test.client.destination;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.qpid.test.client.destination;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
@@ -58,7 +58,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(AddressBasedDestinationTest.class);
private Connection _connection;
-
+
@Override
public void setUp() throws Exception
{
@@ -66,20 +66,20 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
_connection = getConnection() ;
_connection.start();
}
-
+
@Override
public void tearDown() throws Exception
{
_connection.close();
super.tearDown();
}
-
+
public void testCreateOptions() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageProducer prod;
MessageConsumer cons;
-
+
// default (create never, assert never) -------------------
// create never --------------------------------------------
String addr1 = "ADDR:testQueue1";
@@ -93,7 +93,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
try
{
prod = jmsSession.createProducer(dest);
@@ -103,22 +103,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
assertFalse("Queue should not be created",(
(AMQSession_0_10)jmsSession).isQueueExist(dest, (QueueNode)dest.getSourceNode() ,true));
-
-
+
+
// create always -------------------------------------------
addr1 = "ADDR:testQueue1; { create: always }";
dest = new AMQAnyDestination(addr1);
- cons = jmsSession.createConsumer(dest);
-
+ cons = jmsSession.createConsumer(dest);
+
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
-
+
// create receiver -----------------------------------------
addr1 = "ADDR:testQueue2; { create: receiver }";
dest = new AMQAnyDestination(addr1);
@@ -131,32 +131,32 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
assertFalse("Queue should not be created",(
(AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
-
- cons = jmsSession.createConsumer(dest);
-
+
+
+ cons = jmsSession.createConsumer(dest);
+
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
-
+
// create never --------------------------------------------
addr1 = "ADDR:testQueue3; { create: never }";
dest = new AMQAnyDestination(addr1);
try
{
- cons = jmsSession.createConsumer(dest);
+ cons = jmsSession.createConsumer(dest);
}
catch(JMSException e)
{
assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
try
{
prod = jmsSession.createProducer(dest);
@@ -166,17 +166,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " +
"doesn't resolve to an exchange or a queue"));
}
-
+
assertFalse("Queue should not be created",(
(AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
+
// create sender ------------------------------------------
addr1 = "ADDR:testQueue3; { create: sender }";
dest = new AMQAnyDestination(addr1);
-
+
try
{
- cons = jmsSession.createConsumer(dest);
+ cons = jmsSession.createConsumer(dest);
}
catch(JMSException e)
{
@@ -185,162 +185,162 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
(AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
+
prod = jmsSession.createProducer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), dest.getSourceNode().getDeclareArgs()));
-
+
}
-
+
// todo add tests for delete options
-
+
public void testCreateQueue() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
String addr = "ADDR:my-queue/hello; " +
- "{" +
+ "{" +
"create: always, " +
- "node: " +
- "{" +
+ "node: " +
+ "{" +
"durable: true ," +
"x-declare: " +
- "{" +
+ "{" +
"auto-delete: true," +
- "arguments: {" +
+ "arguments: {" +
"'qpid.max_size': 1000," +
"'qpid.max_count': 100" +
- "}" +
- "}, " +
- "x-bindings: [{exchange : 'amq.direct', key : test}, " +
+ "}" +
+ "}, " +
+ "x-bindings: [{exchange : 'amq.direct', key : test}, " +
"{exchange : 'amq.fanout'}," +
"{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}," +
"{exchange : 'amq.topic', key : 'a.#'}" +
- "]," +
-
+ "]," +
+
"}" +
"}";
AMQDestination dest = new AMQAnyDestination(addr);
- MessageConsumer cons = jmsSession.createConsumer(dest);
-
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
dest.getAddressName(),"test", null));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
dest.getAddressName(),null, null));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
- dest.getAddressName(),"a.#", null));
-
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ dest.getAddressName(),"a.#", null));
+
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-match","any");
args.put("dep","sales");
args.put("loc","CA");
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, args));
-
+
}
-
+
public void testCreateExchange() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
- String addr = "ADDR:my-exchange/hello; " +
- "{ " +
- "create: always, " +
- "node: " +
+
+ String addr = "ADDR:my-exchange/hello; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
"{" +
"type: topic, " +
"x-declare: " +
- "{ " +
- "type:direct, " +
+ "{ " +
+ "type:direct, " +
"auto-delete: true, " +
- "arguments: {" +
+ "arguments: {" +
"'qpid.msg_sequence': 1, " +
"'qpid.ive': 1" +
"}" +
"}" +
"}" +
"}";
-
+
AMQDestination dest = new AMQAnyDestination(addr);
- MessageConsumer cons = jmsSession.createConsumer(dest);
-
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+
assertTrue("Exchange not created as expected",(
(AMQSession_0_10)jmsSession).isExchangeExist(dest, (ExchangeNode)dest.getTargetNode() , true));
-
+
// The existence of the queue is implicitly tested here
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
+ (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
dest.getQueueName(),"hello", Collections.<String, Object>emptyMap()));
-
+
// The client should be able to query and verify the existence of my-exchange (QPID-2774)
dest = new AMQAnyDestination("ADDR:my-exchange; {create: never}");
- cons = jmsSession.createConsumer(dest);
+ cons = jmsSession.createConsumer(dest);
}
-
+
public void testBindQueueWithArgs() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
String headersBinding = "{exchange: 'amq.match', arguments: {x-match: any, dep: sales, loc: CA}}";
-
- String addr = "ADDR:my-queue/hello; " +
- "{ " +
+
+ String addr = "ADDR:my-queue/hello; " +
+ "{ " +
"create: always, " +
- "node: " +
- "{" +
+ "node: " +
+ "{" +
"durable: true ," +
- "x-declare: " +
- "{ " +
+ "x-declare: " +
+ "{ " +
"auto-delete: true," +
"arguments: {'qpid.max_count': 100}" +
"}, " +
"x-bindings: [{exchange : 'amq.direct', key : test}, " +
- "{exchange : 'amq.topic', key : 'a.#'}," +
- headersBinding +
+ "{exchange : 'amq.topic', key : 'a.#'}," +
+ headersBinding +
"]" +
"}" +
"}";
AMQDestination dest = new AMQAnyDestination(addr);
- MessageConsumer cons = jmsSession.createConsumer(dest);
-
+ MessageConsumer cons = jmsSession.createConsumer(dest);
+
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
-
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
- dest.getAddressName(),"test", null));
-
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ dest.getAddressName(),"test", null));
+
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
dest.getAddressName(),"a.#", null));
-
+
Address a = Address.parse(headersBinding);
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, a.getOptions()));
}
-
+
/**
* Test goal: Verifies the capacity property in address string is handled properly.
* Test strategy:
@@ -348,22 +348,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
* Creates consumer with client ack.
* Sends 15 messages to the queue, tries to receive 10.
* Tries to receive the 11th message and checks if its null.
- *
- * Since capacity is 10 and we haven't acked any messages,
+ *
+ * Since capacity is 10 and we haven't acked any messages,
* we should not have received the 11th.
- *
+ *
* Acks the 10th message and verifies we receive the rest of the msgs.
*/
public void testCapacity() throws Exception
{
verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: 10}}");
}
-
+
public void testSourceAndTargetCapacity() throws Exception
{
verifyCapacity("ADDR:my-queue; {create: always, link:{capacity: {source:10, target:15} }}");
}
-
+
private void verifyCapacity(String address) throws Exception
{
if (!isCppBroker())
@@ -371,24 +371,24 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
_logger.info("Not C++ broker, exiting test");
return;
}
-
+
Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
-
+
AMQDestination dest = new AMQAnyDestination(address);
- MessageConsumer cons = jmsSession.createConsumer(dest);
+ MessageConsumer cons = jmsSession.createConsumer(dest);
MessageProducer prod = jmsSession.createProducer(dest);
-
+
for (int i=0; i< 15; i++)
{
prod.send(jmsSession.createTextMessage("msg" + i) );
}
-
+
for (int i=0; i< 9; i++)
{
cons.receive();
}
Message msg = cons.receive(RECEIVE_TIMEOUT);
- assertNotNull("Should have received the 10th message",msg);
+ assertNotNull("Should have received the 10th message",msg);
assertNull("Shouldn't have received the 11th message as capacity is 10",cons.receive(RECEIVE_TIMEOUT));
msg.acknowledge();
for (int i=11; i<16; i++)
@@ -396,48 +396,48 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("Should have received the " + i + "th message as we acked the last 10",cons.receive(RECEIVE_TIMEOUT));
}
}
-
+
/**
* Test goal: Verifies if the new address format based destinations
* can be specified and loaded correctly from the properties file.
- *
+ *
*/
public void testLoadingFromPropertiesFile() throws Exception
{
- Hashtable<String,String> map = new Hashtable<String,String>();
- map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " +
+ Hashtable<String,String> map = new Hashtable<String,String>();
+ map.put("destination.myQueue1", "ADDR:my-queue/hello; {create: always, node: " +
"{x-declare: {auto-delete: true, arguments : {'qpid.max_size': 1000}}}}");
-
+
map.put("destination.myQueue2", "ADDR:my-queue2; { create: receiver }");
map.put("destination.myQueue3", "BURL:direct://amq.direct/my-queue3?routingkey='test'");
-
+
PropertiesFileInitialContextFactory props = new PropertiesFileInitialContextFactory();
Context ctx = props.getInitialContext(map);
-
- AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1");
+
+ AMQDestination dest1 = (AMQDestination)ctx.lookup("myQueue1");
AMQDestination dest2 = (AMQDestination)ctx.lookup("myQueue2");
AMQDestination dest3 = (AMQDestination)ctx.lookup("myQueue3");
-
+
Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons1 = jmsSession.createConsumer(dest1);
+ MessageConsumer cons1 = jmsSession.createConsumer(dest1);
MessageConsumer cons2 = jmsSession.createConsumer(dest2);
MessageConsumer cons3 = jmsSession.createConsumer(dest3);
-
+
assertTrue("Destination1 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true));
-
+ (AMQSession_0_10)jmsSession).isQueueExist(dest1,(QueueNode)dest1.getSourceNode(), true));
+
assertTrue("Destination1 was not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest1.getAddressName(),dest1.getAddressName(), null));
-
+
assertTrue("Destination2 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true));
-
+ (AMQSession_0_10)jmsSession).isQueueExist(dest2,(QueueNode)dest2.getSourceNode(), true));
+
assertTrue("Destination2 was not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession_0_10)jmsSession).isQueueBound("",
dest2.getAddressName(),dest2.getAddressName(), null));
-
+
MessageProducer producer = jmsSession.createProducer(dest3);
producer.send(jmsSession.createTextMessage("Hello"));
TextMessage msg = (TextMessage)cons3.receive(1000);
@@ -448,64 +448,64 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
* Test goal: Verifies the subject can be overridden using "qpid.subject" message property.
* Test strategy: Creates and address with a default subject "topic1"
* Creates a message with "qpid.subject"="topic2" and sends it.
- * Verifies that the message goes to "topic2" instead of "topic1".
+ * Verifies that the message goes to "topic2" instead of "topic1".
*/
public void testOverridingSubject() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
-
+
AMQDestination topic1 = new AMQAnyDestination("ADDR:amq.topic/topic1; {link:{name: queue1}}");
-
+
MessageProducer prod = jmsSession.createProducer(topic1);
-
+
Message m = jmsSession.createTextMessage("Hello");
m.setStringProperty("qpid.subject", "topic2");
-
+
MessageConsumer consForTopic1 = jmsSession.createConsumer(topic1);
MessageConsumer consForTopic2 = jmsSession.createConsumer(new AMQAnyDestination("ADDR:amq.topic/topic2; {link:{name: queue2}}"));
-
+
prod.send(m);
Message msg = consForTopic1.receive(1000);
assertNull("message shouldn't have been sent to topic1",msg);
-
+
msg = consForTopic2.receive(1000);
- assertNotNull("message should have been sent to topic2",msg);
-
+ assertNotNull("message should have been sent to topic2",msg);
+
}
-
+
/**
- * Test goal: Verifies that and address based destination can be used successfully
+ * Test goal: Verifies that and address based destination can be used successfully
* as a reply to.
*/
public void testAddressBasedReplyTo() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
String addr = "ADDR:amq.direct/x512; {create: receiver, " +
- "link : {name : 'MY.RESP.QUEUE', " +
+ "link : {name : 'MY.RESP.QUEUE', " +
"x-declare : { auto-delete: true, exclusive: true, " +
"arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring }} } }";
-
+
Destination replyTo = new AMQAnyDestination(addr);
Destination dest =new AMQAnyDestination("ADDR:amq.direct/Hello");
-
- MessageConsumer cons = jmsSession.createConsumer(dest);
+
+ MessageConsumer cons = jmsSession.createConsumer(dest);
MessageProducer prod = jmsSession.createProducer(dest);
Message m = jmsSession.createTextMessage("Hello");
m.setJMSReplyTo(replyTo);
prod.send(m);
-
+
Message msg = cons.receive(1000);
assertNotNull("consumer should have received the message",msg);
-
+
MessageConsumer replyToCons = jmsSession.createConsumer(replyTo);
MessageProducer replyToProd = jmsSession.createProducer(msg.getJMSReplyTo());
replyToProd.send(jmsSession.createTextMessage("reply"));
-
+
Message replyToMsg = replyToCons.receive(1000);
- assertNotNull("The reply to consumer should have got the message",replyToMsg);
+ assertNotNull("The reply to consumer should have got the message",replyToMsg);
}
-
+
/**
* Test goal: Verifies that session.createQueue method
* works as expected both with the new and old addressing scheme.
@@ -513,46 +513,61 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void testSessionCreateQueue() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
// Using the BURL method
Destination queue = ssn.createQueue("my-queue");
- MessageProducer prod = ssn.createProducer(queue);
+ MessageProducer prod = ssn.createProducer(queue);
MessageConsumer cons = ssn.createConsumer(queue);
assertTrue("my-queue was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
"my-queue","my-queue", null));
-
+
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
-
+
// Using the ADDR method
+ // default case
queue = ssn.createQueue("ADDR:my-queue2");
- prod = ssn.createProducer(queue);
+ try
+ {
+ prod = ssn.createProducer(queue);
+ fail("The client should throw an exception, since there is no queue present in the broker");
+ }
+ catch(Exception e)
+ {
+ String s = "The name 'my-queue2' supplied in the address " +
+ "doesn't resolve to an exchange or a queue";
+ assertEquals(s,e.getCause().getCause().getMessage());
+ }
+
+ // explicit create case
+ queue = ssn.createQueue("ADDR:my-queue2; {create: sender}");
+ prod = ssn.createProducer(queue);
cons = ssn.createConsumer(queue);
assertTrue("my-queue2 was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("",
+ (AMQSession_0_10)ssn).isQueueBound("",
"my-queue2","my-queue2", null));
-
+
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
-
+
// Using the ADDR method to create a more complicated queue
String addr = "ADDR:amq.direct/x512; {create: receiver, " +
- "link : {name : 'MY.RESP.QUEUE', " +
+ "link : {name : 'MY.RESP.QUEUE', " +
"x-declare : { auto-delete: true, exclusive: true, " +
"arguments : {'qpid.max_size': 1000, 'qpid.policy_type': ring} } } }";
queue = ssn.createQueue(addr);
-
- prod = ssn.createProducer(queue);
+
+ prod = ssn.createProducer(queue);
cons = ssn.createConsumer(queue);
assertTrue("MY.RESP.QUEUE was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ (AMQSession_0_10)ssn).isQueueBound("amq.direct",
"MY.RESP.QUEUE","x512", null));
cons.close();
}
-
+
/**
* Test goal: Verifies that session.creatTopic method
* works as expected both with the new and old addressing scheme.
@@ -560,71 +575,71 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void testSessionCreateTopic() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
// Using the BURL method
Topic topic = ssn.createTopic("ACME");
- MessageProducer prod = ssn.createProducer(topic);
+ MessageProducer prod = ssn.createProducer(topic);
MessageConsumer cons = ssn.createConsumer(topic);
-
+
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
-
+
// Using the ADDR method
topic = ssn.createTopic("ADDR:ACME");
- prod = ssn.createProducer(topic);
+ prod = ssn.createProducer(topic);
cons = ssn.createConsumer(topic);
-
+
prod.send(ssn.createTextMessage("test"));
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
-
- String addr = "ADDR:vehicles/bus; " +
- "{ " +
- "create: always, " +
- "node: " +
+
+ String addr = "ADDR:vehicles/bus; " +
+ "{ " +
+ "create: always, " +
+ "node: " +
"{" +
"type: topic, " +
"x-declare: " +
- "{ " +
- "type:direct, " +
+ "{ " +
+ "type:direct, " +
"auto-delete: true, " +
- "arguments: {" +
+ "arguments: {" +
"'qpid.msg_sequence': 1, " +
- "'qpid.ive': 1" +
+ "'qpid.ive': 1" +
"}" +
"}" +
"}, " +
"link: {name : my-topic, " +
"x-bindings: [{exchange : 'vehicles', key : car}, " +
- "{exchange : 'vehicles', key : van}]" +
- "}" +
+ "{exchange : 'vehicles', key : van}]" +
+ "}" +
"}";
-
+
// Using the ADDR method to create a more complicated topic
topic = ssn.createTopic(addr);
- prod = ssn.createProducer(topic);
+ prod = ssn.createProducer(topic);
cons = ssn.createConsumer(topic);
-
+
assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
"my-topic","bus", null));
-
+
assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
"my-topic","car", null));
-
+
assertTrue("The queue was not bound to vehicle exchange using van as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession_0_10)ssn).isQueueBound("vehicles",
"my-topic","van", null));
-
+
Message msg = ssn.createTextMessage("test");
msg.setStringProperty("qpid.subject", "van");
prod.send(msg);
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
}
-
+
/**
* Test Goal : Verify the default subjects used for each exchange type.
* The default for amq.topic is "#" and for the rest it's ""
@@ -632,92 +647,92 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void testDefaultSubjects() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
MessageConsumer queueCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.direct"));
MessageConsumer topicCons = ssn.createConsumer(new AMQAnyDestination("ADDR:amq.topic"));
-
+
MessageProducer queueProducer = ssn.createProducer(new AMQAnyDestination("ADDR:amq.direct"));
MessageProducer topicProducer1 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/usa.weather"));
MessageProducer topicProducer2 = ssn.createProducer(new AMQAnyDestination("ADDR:amq.topic/sales"));
-
+
queueProducer.send(ssn.createBytesMessage());
assertNotNull("The consumer subscribed to amq.direct " +
"with empty binding key should have received the message ",queueCons.receive(1000));
-
+
topicProducer1.send(ssn.createTextMessage("25c"));
assertEquals("The consumer subscribed to amq.topic " +
"with '#' binding key should have received the message ",
((TextMessage)topicCons.receive(1000)).getText(),"25c");
-
+
topicProducer2.send(ssn.createTextMessage("1000"));
assertEquals("The consumer subscribed to amq.topic " +
"with '#' binding key should have received the message ",
((TextMessage)topicCons.receive(1000)).getText(),"1000");
}
-
+
/**
* Test Goal : Verify that 'mode : browse' works as expected using a regular consumer.
* This indirectly tests ring queues as well.
*/
public void testBrowseMode() throws Exception
{
-
+
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-
+
String addr = "ADDR:my-ring-queue; {create: always, mode: browse, " +
"node: {x-bindings: [{exchange : 'amq.direct', key : test}], " +
"x-declare:{arguments : {'qpid.policy_type':ring, 'qpid.max_count':2}}}}";
-
+
Destination dest = ssn.createQueue(addr);
MessageConsumer browseCons = ssn.createConsumer(dest);
MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test"));
-
+
prod.send(ssn.createTextMessage("Test1"));
prod.send(ssn.createTextMessage("Test2"));
-
+
TextMessage msg = (TextMessage)browseCons.receive(1000);
assertEquals("Didn't receive the first message",msg.getText(),"Test1");
-
+
msg = (TextMessage)browseCons.receive(1000);
assertEquals("Didn't receive the first message",msg.getText(),"Test2");
-
- browseCons.close();
+
+ browseCons.close();
prod.send(ssn.createTextMessage("Test3"));
browseCons = ssn.createConsumer(dest);
-
+
msg = (TextMessage)browseCons.receive(1000);
assertEquals("Should receive the second message again",msg.getText(),"Test2");
-
+
msg = (TextMessage)browseCons.receive(1000);
assertEquals("Should receive the third message since it's a ring queue",msg.getText(),"Test3");
-
+
assertNull("Should not receive anymore messages",browseCons.receive(500));
}
-
+
/**
* Test Goal : When the same destination is used when creating two consumers,
- * If the type == topic, verify that unique subscription queues are created,
+ * If the type == topic, verify that unique subscription queues are created,
* unless subscription queue has a name.
- *
+ *
* If the type == queue, same queue should be shared.
*/
public void testSubscriptionForSameDestination() throws Exception
{
- Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination dest = ssn.createTopic("ADDR:amq.topic/foo");
MessageConsumer consumer1 = ssn.createConsumer(dest);
MessageConsumer consumer2 = ssn.createConsumer(dest);
MessageProducer prod = ssn.createProducer(dest);
-
+
prod.send(ssn.createTextMessage("A"));
TextMessage m = (TextMessage)consumer1.receive(1000);
assertEquals("Consumer1 should recieve message A",m.getText(),"A");
m = (TextMessage)consumer2.receive(1000);
assertEquals("Consumer2 should recieve message A",m.getText(),"A");
-
+
consumer1.close();
consumer2.close();
-
+
dest = ssn.createTopic("ADDR:amq.topic/foo; { link: {name: my-queue}}");
consumer1 = ssn.createConsumer(dest);
try
@@ -726,60 +741,60 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber");
}
catch(Exception e)
- {
+ {
}
_connection.close();
-
+
_connection = getConnection() ;
_connection.start();
- ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
dest = ssn.createTopic("ADDR:my_queue; {create: always}");
consumer1 = ssn.createConsumer(dest);
consumer2 = ssn.createConsumer(dest);
prod = ssn.createProducer(dest);
-
+
prod.send(ssn.createTextMessage("A"));
- Message m1 = consumer1.receive(1000);
+ Message m1 = consumer1.receive(1000);
Message m2 = consumer2.receive(1000);
-
+
if (m1 != null)
{
- assertNull("Only one consumer should receive the message",m2);
+ assertNull("Only one consumer should receive the message",m2);
}
else
{
- assertNotNull("Only one consumer should receive the message",m2);
+ assertNotNull("Only one consumer should receive the message",m2);
}
}
-
+
public void testXBindingsWithoutExchangeName() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
String addr = "ADDR:MRKT; " +
"{" +
- "create: receiver," +
+ "create: receiver," +
"node : {type: topic, x-declare: {type: topic} }," +
"link:{" +
"name: my-topic," +
"x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]" +
"}" +
"}";
-
+
// Using the ADDR method to create a more complicated topic
MessageConsumer cons = ssn.createConsumer(new AMQAnyDestination(addr));
-
+
assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession_0_10)ssn).isQueueBound("MRKT",
"my-topic","NYSE.#", null));
-
+
assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession_0_10)ssn).isQueueBound("MRKT",
"my-topic","NASDAQ.#", null));
-
+
assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession_0_10)ssn).isQueueBound("MRKT",
"my-topic","CNTL.#", null));
-
+
MessageProducer prod = ssn.createProducer(ssn.createTopic(addr));
Message msg = ssn.createTextMessage("test");
msg.setStringProperty("qpid.subject", "NASDAQ.ABCD");
@@ -787,7 +802,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull("consumer should receive a message",cons.receive(1000));
cons.close();
}
-
+
public void testXSubscribeOverrides() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -800,57 +815,57 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
fail("An exception should be thrown as 'my-queue' already have an exclusive subscriber");
}
catch(Exception e)
- {
+ {
}
}
-
+
public void testQueueReceiversAndTopicSubscriber() throws Exception
{
Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
-
+
QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver receiver = qSession.createReceiver(queue);
-
+
TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = tSession.createSubscriber(topic);
-
+
Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
prod1.send(ssn.createTextMessage("test1"));
-
+
MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
prod2.send(ssn.createTextMessage("test2"));
-
+
Message msg1 = receiver.receive();
assertNotNull(msg1);
assertEquals("test1",((TextMessage)msg1).getText());
-
+
Message msg2 = sub.receive();
assertNotNull(msg2);
- assertEquals("test2",((TextMessage)msg2).getText());
+ assertEquals("test2",((TextMessage)msg2).getText());
}
-
+
public void testDurableSubscriber() throws Exception
{
- Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = ssn.createTopic("news.us");
-
+
MessageConsumer cons = ssn.createDurableSubscriber(topic, "my-sub");
MessageProducer prod = ssn.createProducer(topic);
-
+
Message m = ssn.createTextMessage("A");
prod.send(m);
Message msg = cons.receive(1000);
assertNotNull(msg);
assertEquals("A",((TextMessage)msg).getText());
}
-
+
public void testDeleteOptions() throws Exception
{
Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons;
-
+
// default (create never, assert never) -------------------
// create never --------------------------------------------
String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
@@ -864,11 +879,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
fail("Exception should not be thrown. Exception thrown is : " + e);
}
-
+
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
-
-
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
dest = new AMQAnyDestination(addr2);
try
@@ -880,11 +895,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
fail("Exception should not be thrown. Exception thrown is : " + e);
}
-
+
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
-
String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
dest = new AMQAnyDestination(addr3);
try
@@ -897,45 +912,45 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
fail("Exception should not be thrown. Exception thrown is : " + e);
}
-
+
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
-
}
-
+
/**
* Test Goals : 1. Test if the client sets the correct accept mode for unreliable
* and at-least-once.
* 2. Test default reliability modes for Queues and Topics.
* 3. Test if an exception is thrown if exactly-once is used.
* 4. Test if an exception is thrown if at-least-once is used with topics.
- *
+ *
* Test Strategy: For goal #1 & #2
* For unreliable and at-least-once the test tries to receives messages
* in client_ack mode but does not ack the messages.
* It will then close the session, recreate a new session
* and will then try to verify the queue depth.
* For unreliable the messages should have been taken off the queue.
- * For at-least-once the messages should be put back onto the queue.
- *
+ * For at-least-once the messages should be put back onto the queue.
+ *
*/
-
+
public void testReliabilityOptions() throws Exception
{
String addr1 = "ADDR:testQueue1;{create: always, delete : receiver, link : {reliability : unreliable}}";
acceptModeTest(addr1,0);
-
+
String addr2 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : at-least-once}}";
acceptModeTest(addr2,2);
-
+
// Default accept-mode for topics
- acceptModeTest("ADDR:amq.topic/test",0);
-
+ acceptModeTest("ADDR:amq.topic/test",0);
+
// Default accept-mode for queues
acceptModeTest("ADDR:testQueue1;{create: always}",2);
-
- String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";
+
+ String addr3 = "ADDR:testQueue2;{create: always, delete : receiver, link : {reliability : exactly-once}}";
try
{
AMQAnyDestination dest = new AMQAnyDestination(addr3);
@@ -945,8 +960,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
assertTrue(e.getCause().getMessage().contains("The reliability mode 'exactly-once' is not yet supported"));
}
-
- String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";
+
+ String addr4 = "ADDR:amq.topic/test;{link : {reliability : at-least-once}}";
try
{
AMQAnyDestination dest = new AMQAnyDestination(addr4);
@@ -959,34 +974,50 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertTrue(e.getCause().getMessage().contains("AT-LEAST-ONCE is not yet supported for Topics"));
}
}
-
+
private void acceptModeTest(String address, int expectedQueueDepth) throws Exception
{
Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
MessageConsumer cons;
MessageProducer prod;
-
+
AMQDestination dest = new AMQAnyDestination(address);
cons = ssn.createConsumer(dest);
prod = ssn.createProducer(dest);
-
+
for (int i=0; i < expectedQueueDepth; i++)
{
prod.send(ssn.createTextMessage("Msg" + i));
}
-
+
for (int i=0; i < expectedQueueDepth; i++)
{
Message msg = cons.receive(1000);
assertNotNull(msg);
assertEquals("Msg" + i,((TextMessage)msg).getText());
}
-
+
ssn.close();
ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
- long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);
- assertEquals(expectedQueueDepth,queueDepth);
+ long queueDepth = ((AMQSession) ssn).getQueueDepth(dest);
+ assertEquals(expectedQueueDepth,queueDepth);
+ cons.close();
+ prod.close();
+ }
+
+ public void testDestinationOnSend() throws Exception
+ {
+ Session ssn = _connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons = ssn.createConsumer(ssn.createTopic("amq.topic/test"));
+ MessageProducer prod = ssn.createProducer(null);
+
+ Queue queue = ssn.createQueue("amq.topic/test");
+ prod.send(queue,ssn.createTextMessage("A"));
+
+ Message msg = cons.receive(1000);
+ assertNotNull(msg);
+ assertEquals("A",((TextMessage)msg).getText());
+ prod.close();
cons.close();
- prod.close();
}
}