summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-02-11 12:11:03 +0000
committerAidan Skinner <aidan@apache.org>2008-02-11 12:11:03 +0000
commit72f22f941942f2bc811caf7c1d1aca7dd8457984 (patch)
tree2db6dfac1159ab74b100054009c6eff83ec24a99 /qpid/java/client
parentfa99183557ada9117f23290cdee0e5b05004bc47 (diff)
downloadqpid-python-72f22f941942f2bc811caf7c1d1aca7dd8457984.tar.gz
Merged revisions 615958,615968,616353,616396,616402,616404,616445,616454,616507,616511,616542,616545,616715,616736,616927,616929,617188,617286,617303,617305,617320-617321,617510-617511,617513,617524,617527,617533,617543,617556,617582,617590,617592,617594,617596-617597,617607,617662,618412,618428,618436-618437,618450,618462,618519,618770,618982,618984,618986,618989,619012,619043,619148,619182,619189,619192,619200,619204,619424,619538,619604,619611,619626,619636,619646,619888,619903,619941 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk ................ r615958 | aconway | 2008-01-28 17:17:06 +0000 (Mon, 28 Jan 2008) | 1 line Convert ClientSessionTest to boost. ................ r615968 | aconway | 2008-01-28 17:42:22 +0000 (Mon, 28 Jan 2008) | 1 line Added disabled test and FIXME note to fix client-side race. ................ r616353 | aconway | 2008-01-29 14:48:59 +0000 (Tue, 29 Jan 2008) | 9 lines Deleted unused classes, adjusted files that still mention them. D src/qpid/framing/ChannelAdapter.cpp D src/qpid/framing/ChannelAdapter.h D src/qpid/framing/HandlerUpdater.h D src/tests/BrokerChannelTest.cpp D src/tests/MockChannel.h ................ r616396 | aconway | 2008-01-29 15:45:29 +0000 (Tue, 29 Jan 2008) | 3 lines Provide public read-access to IListNode pointers, so frame handlers can use then to find the next frame. ................ r616402 | aconway | 2008-01-29 15:48:35 +0000 (Tue, 29 Jan 2008) | 2 lines Remove references to defunct broker::ChannelHandler ................ r616404 | aconway | 2008-01-29 15:49:55 +0000 (Tue, 29 Jan 2008) | 2 lines Log peer address with SEND/RECV messages. ................ r616445 | aidan | 2008-01-29 16:38:53 +0000 (Tue, 29 Jan 2008) | 3 lines Initialized merge tracking via "svnmerge" with revisions "1-616438" from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ................ r616454 | aidan | 2008-01-29 17:25:32 +0000 (Tue, 29 Jan 2008) | 13 lines Merged revisions 579126,579137 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r579126 | rgreig | 2007-09-25 09:42:53 +0100 (Tue, 25 Sep 2007) | 2 lines QPID-582: fix some 1.6 compile errors ........ r579137 | rgreig | 2007-09-25 10:00:34 +0100 (Tue, 25 Sep 2007) | 2 lines QPID-582 fix Java 6 compile errors ........ ................ r616507 | aconway | 2008-01-29 20:29:46 +0000 (Tue, 29 Jan 2008) | 2 lines Re-enabled build of cluster code when openais is installed. ................ r616511 | aconway | 2008-01-29 20:39:26 +0000 (Tue, 29 Jan 2008) | 1 line Added Observer to SessionManager for cluster use. ................ r616542 | rajith | 2008-01-29 22:24:40 +0000 (Tue, 29 Jan 2008) | 1 line added support to notify connection.close to the JMSExceptionListener ................ r616545 | rajith | 2008-01-29 22:30:20 +0000 (Tue, 29 Jan 2008) | 1 line I accidently commited some modifications done for a quick test, I reversed those changes ................ r616715 | arnaudsimon | 2008-01-30 10:38:11 +0000 (Wed, 30 Jan 2008) | 1 line changed default port value for tcp ................ r616736 | gsim | 2008-01-30 12:18:53 +0000 (Wed, 30 Jan 2008) | 3 lines Parse out the userid and password from the response; a small step on the road to authentication. ................ r616927 | aconway | 2008-01-30 22:32:17 +0000 (Wed, 30 Jan 2008) | 4 lines Remove Socket param from Connection constructor, pass a string id instead. ................ r616929 | aconway | 2008-01-30 22:34:50 +0000 (Wed, 30 Jan 2008) | 6 lines From Ted Ross, https://issues.apache.org/jira/browse/QPID-767 Bugfix: --load-dir rejected path-name-elements beginning with or ending with '.' (boost 1.33 only) ................ r617188 | gsim | 2008-01-31 18:50:46 +0000 (Thu, 31 Jan 2008) | 4 lines Make ports accesible through socket interface. Add local port to each logged frame in the client Connector ................ r617286 | aconway | 2008-01-31 23:14:49 +0000 (Thu, 31 Jan 2008) | 3 lines Generate URLs for local host. ................ r617303 | rhs | 2008-01-31 23:50:43 +0000 (Thu, 31 Jan 2008) | 1 line fixed svn:executable and svn:eol-style ................ r617305 | rhs | 2008-01-31 23:52:15 +0000 (Thu, 31 Jan 2008) | 1 line avoid use of Throwable.initCause(e) that is illegal with java 1.6 ................ r617320 | rhs | 2008-02-01 01:03:46 +0000 (Fri, 01 Feb 2008) | 1 line ant build system ................ r617321 | rhs | 2008-02-01 01:25:27 +0000 (Fri, 01 Feb 2008) | 1 line futzed with deps for faster build, and fixed bug in manifest generation ................ r617510 | rhs | 2008-02-01 14:49:33 +0000 (Fri, 01 Feb 2008) | 1 line added junit dependency ................ r617511 | rhs | 2008-02-01 14:53:52 +0000 (Fri, 01 Feb 2008) | 1 line fixed typo ................ r617513 | rhs | 2008-02-01 14:59:49 +0000 (Fri, 01 Feb 2008) | 1 line more dependency fixes ................ r617524 | rhs | 2008-02-01 15:46:44 +0000 (Fri, 01 Feb 2008) | 1 line added support for running one test, and made test output pretty ................ r617527 | rhs | 2008-02-01 15:55:30 +0000 (Fri, 01 Feb 2008) | 1 line less confusing options ................ r617533 | aconway | 2008-02-01 16:03:02 +0000 (Fri, 01 Feb 2008) | 9 lines Added cluster URL configuration, defaults to all interfaces. src/qpid/Plugin.h - added doxygen src/qpid/Url.cpp,.h - cache string rep, op==, istream/ostream ops. src/qpid/broker/Broker.h,.cpp - removed getUrl() src/qpid/cluster/Cluster.h,.cpp - use Url class src/qpid/cluster/ClusterPlugin.cpp - added --url configuration. ................ r617543 | rhs | 2008-02-01 16:16:22 +0000 (Fri, 01 Feb 2008) | 1 line more dependency fixes ................ r617556 | rhs | 2008-02-01 16:58:16 +0000 (Fri, 01 Feb 2008) | 1 line fixed build order ................ r617582 | aconway | 2008-02-01 18:02:42 +0000 (Fri, 01 Feb 2008) | 8 lines Cluster code fixed for changes in codebase. - Using SessionManager::Observer - Better ais test setup, only need to be member of ais group. - Update cluster_client - SessionState holds handler chains. - Cluster frames include next handler ptr. ................ r617590 | gsim | 2008-02-01 18:21:01 +0000 (Fri, 01 Feb 2008) | 3 lines Initial cut of inter-broker bridging ................ r617592 | gsim | 2008-02-01 18:26:08 +0000 (Fri, 01 Feb 2008) | 3 lines Oops, missed makefile in last commit. ................ r617594 | gsim | 2008-02-01 18:27:23 +0000 (Fri, 01 Feb 2008) | 3 lines Missed a couple of new files in previous commit. ................ r617596 | gsim | 2008-02-01 18:28:14 +0000 (Fri, 01 Feb 2008) | 3 lines Use 'guest' as default uid and password for tests. ................ r617597 | aconway | 2008-02-01 18:28:46 +0000 (Fri, 01 Feb 2008) | 1 line svn:ignore properties. ................ r617607 | gsim | 2008-02-01 18:54:27 +0000 (Fri, 01 Feb 2008) | 3 lines Remove includes to files no longer generated. ................ r617662 | rhs | 2008-02-01 21:43:20 +0000 (Fri, 01 Feb 2008) | 1 line added a detailed help target and cleaned up descriptions ................ r618412 | cctrieloff | 2008-02-04 20:11:39 +0000 (Mon, 04 Feb 2008) | 1 line add interval pu/sub options to test many concurrent connections ................ r618428 | cctrieloff | 2008-02-04 20:38:19 +0000 (Mon, 04 Feb 2008) | 1 line Turn management on by defualt ................ r618436 | rhs | 2008-02-04 21:05:39 +0000 (Mon, 04 Feb 2008) | 1 line set up logging for tests ................ r618437 | rhs | 2008-02-04 21:06:35 +0000 (Mon, 04 Feb 2008) | 1 line added exception callback to ConnectonDelegate ................ r618450 | rhs | 2008-02-04 21:43:01 +0000 (Mon, 04 Feb 2008) | 1 line clear the report directory before generating test report ................ r618462 | rhs | 2008-02-04 22:11:22 +0000 (Mon, 04 Feb 2008) | 1 line added a TransportException ................ r618519 | rajith | 2008-02-05 02:37:13 +0000 (Tue, 05 Feb 2008) | 2 lines Added code to connect the network error exceptions to the JMS Exception listener. ................ r618770 | aconway | 2008-02-05 20:44:14 +0000 (Tue, 05 Feb 2008) | 16 lines Added testSendToSelf for https://bugzilla.redhat.com/show_bug.cgi?id=410551 M src/tests/ClientSessionTest.cpp Disabled management for BrokerFixture - management singleton assumes only one broker per process, causes shutdown races with fixtures. M src/tests/BrokerFixture.h Made Timer::stop() idempotent M src/qpid/broker/Timer.cpp M src/qpid/broker/Timer.h Added STL-style size() and empty() M src/qpid/sys/BlockingQueue.h M src/qpid/client/LocalQueue.cpp M src/qpid/client/LocalQueue.h ................ r618982 | arnaudsimon | 2008-02-06 12:47:27 +0000 (Wed, 06 Feb 2008) | 1 line Changed for using AUTO_ACK session ................ r618984 | arnaudsimon | 2008-02-06 12:52:19 +0000 (Wed, 06 Feb 2008) | 1 line Byte message were losing their payload see QPI-779 ................ r618986 | arnaudsimon | 2008-02-06 12:56:20 +0000 (Wed, 06 Feb 2008) | 1 line QPID-777 and QPID-778 ................ r618989 | arnaudsimon | 2008-02-06 13:40:38 +0000 (Wed, 06 Feb 2008) | 1 line Changed session mode to AUTO_ACK so mesages are removed from broker between two tests. ................ r619012 | arnaudsimon | 2008-02-06 15:14:42 +0000 (Wed, 06 Feb 2008) | 1 line Changed for using Window mode see QPID-778 ................ r619043 | arnaudsimon | 2008-02-06 16:00:22 +0000 (Wed, 06 Feb 2008) | 1 line Added close logic for releasing pre-fetched messages, see QPID-778 ................ r619148 | aconway | 2008-02-06 20:49:05 +0000 (Wed, 06 Feb 2008) | 8 lines Replaced --enable-cluster option with --with-cpg to enable/disable CPG. make rpmbuild uses --with-cpg, will fail unless openais is installed. Normal builds respect explicit --with/--without, or use CPG if installed when neither is specified. ................ r619182 | rhs | 2008-02-06 22:13:31 +0000 (Wed, 06 Feb 2008) | 1 line added close notification ................ r619189 | rajith | 2008-02-06 22:36:02 +0000 (Wed, 06 Feb 2008) | 2 lines Added code to pass in the throwable to the closedListener so that it can be included in the JMS Exception thrown via the ExceptionListener ................ r619192 | rajith | 2008-02-06 22:38:12 +0000 (Wed, 06 Feb 2008) | 2 lines Removed the System.outs from the code. ................ r619200 | aconway | 2008-02-06 22:49:10 +0000 (Wed, 06 Feb 2008) | 12 lines From Ted Ross, https://issues.apache.org/jira/browse/QPID-780 Implementation of --data-dir for qpidd. Additions by myself: - set QPID_DATA_DIR= in test env so existing tests can run with no data dir. - src/Makefile.am and qpidc.spec.in install /var/lib/qpidd directory. NOTE: qpidd with no optoins will now FAIL if it cannot write /var/lib/qpidd. Start it with --data-dir= or set QPID_DATA_DIR= in your environement to run with no data directory. ................ r619204 | rhs | 2008-02-06 22:50:54 +0000 (Wed, 06 Feb 2008) | 1 line fixed log level defaults ................ r619424 | aconway | 2008-02-07 14:25:32 +0000 (Thu, 07 Feb 2008) | 2 lines Quote unprintable control characters in log output. ................ r619538 | rhs | 2008-02-07 18:15:20 +0000 (Thu, 07 Feb 2008) | 1 line added test for exception listener; fixed NPE ................ r619604 | aconway | 2008-02-07 19:51:01 +0000 (Thu, 07 Feb 2008) | 3 lines qpidc.spec.in: Build --without-cpg, no point adding the dependency till cluster functionality is available. ................ r619611 | aconway | 2008-02-07 20:02:14 +0000 (Thu, 07 Feb 2008) | 2 lines Disable QPID_DATA_DIR for verify tests. ................ r619626 | nsantos | 2008-02-07 20:18:54 +0000 (Thu, 07 Feb 2008) | 1 line create missing dir ................ r619636 | aconway | 2008-02-07 21:06:01 +0000 (Thu, 07 Feb 2008) | 2 lines Removed signal-unsafe code from shutdown handler. ................ r619646 | aconway | 2008-02-07 21:31:21 +0000 (Thu, 07 Feb 2008) | 3 lines Clean shutdown of broker: Moved signal unsafe code from Broker::shutdown to ~Broker, moved shutdown logging from shutdown handler to main() in qpidd.cpp ................ r619888 | rhs | 2008-02-08 13:50:55 +0000 (Fri, 08 Feb 2008) | 1 line simplied QpidTestCase's interface for running non vm brokers and set up build system to run cpp tests easily ................ r619903 | aconway | 2008-02-08 15:01:30 +0000 (Fri, 08 Feb 2008) | 11 lines Refactored verify scripts, added verify for python Examples. To verify an example: <qpid-trunk>/bin/verify <example-dir> See comments in bin/verify for more details. Changes: - Each example dir has its own verify script and verify.in. - Added sys.stdout.flush() to som python examples so verify can tell when they're ready. - Made python examples svn:executable. - C++ examples/Makefile.am runs python examples ................ r619941 | aconway | 2008-02-08 17:02:55 +0000 (Fri, 08 Feb 2008) | 3 lines Added verify scripts to run mixed python/cpp examples. bin/verify_all runs all examples. ................ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@620468 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/build.xml37
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java23
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java169
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java143
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java89
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java165
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java28
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java32
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java3
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java62
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java22
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java6
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java12
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java4
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java6
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java188
30 files changed, 646 insertions, 439 deletions
diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml
new file mode 100644
index 0000000000..847c38b3eb
--- /dev/null
+++ b/qpid/java/client/build.xml
@@ -0,0 +1,37 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="AMQ Client" default="build">
+
+ <property name="module.depends" value="common"/>
+ <property name="module.test.depends" value="broker"/>
+
+ <import file="../module.xml"/>
+
+ <property name="output.dir" value="${module.precompiled}/org/apache/qpidity/filter/selector"/>
+
+ <target name="precompile">
+ <mkdir dir="${output.dir}"/>
+ <javacc target="src/main/grammar/SelectorParser.jj"
+ outputdirectory="${output.dir}"
+ javacchome="${project.root}/lib"/>
+ </target>
+
+</project>
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java
index 9cbd40f9ea..b5f8dfda9b 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java
@@ -25,6 +25,8 @@ import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -80,6 +82,14 @@ public class Producer
// create the connection
Connection connection = conFac.createConnection();
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ });
+
// Create a session on the connection
// This session is a default choice of non-transacted and uses the auto acknowledge feature of a session.
System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index c2fb1b897a..43509e66c2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -116,7 +116,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private String _virtualHost;
- private ExceptionListener _exceptionListener;
+ protected ExceptionListener _exceptionListener;
private ConnectionListener _connectionListener;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 78090b45ad..bf1ed49492 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -11,11 +11,13 @@ import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.Session;
import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.ClosedListener;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate
+public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener
{
/**
* This class logger.
@@ -109,6 +111,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate
}
_qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(),
_conn.getUsername(), _conn.getPassword());
+ _qpidConnection.setClosedListener(this);
}
catch (QpidException e)
{
@@ -138,4 +141,22 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate
}
}
+
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode());
+ }
+ if (_conn._exceptionListener != null)
+ {
+ JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode()));
+ if (t != null)
+ {
+ ex.initCause(t);
+ }
+
+ _conn._exceptionListener.onException(ex);
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 5983440f47..9a8470bece 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -20,6 +20,43 @@
*/
package org.apache.qpid.client;
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
@@ -44,7 +81,6 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -53,44 +89,6 @@ import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
*
* <p/><table id="crc"><caption>CRC Card</caption>
@@ -185,14 +183,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
* keeps a record of subscriptions which have been created in the current instance. It does not remember
* subscriptions between executions of the client.
*/
- private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
+ protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
/**
* Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
+ protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
@@ -200,7 +198,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
*/
- private final FlowControllingBlockingQueue _queue;
+ protected final FlowControllingBlockingQueue _queue;
/**
* Holds the highest received delivery tag.
@@ -279,10 +277,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
protected final boolean _immediatePrefetch;
/** Indicates that warnings should be generated on violations of the strict AMQP. */
- private final boolean _strictAMQP;
+ protected final boolean _strictAMQP;
/** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */
- private final boolean _strictAMQPFATAL;
+ protected final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
/** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
@@ -518,8 +516,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
if (_logger.isInfoEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- _logger.info("Closing session: " + this + ":"
- + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ _logger.info("Closing session: " + this); // + ":"
+ // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
synchronized (_connection.getFailoverMutex())
@@ -781,6 +779,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
false, false);
}
+ public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ false, false);
+ }
+
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -831,70 +837,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
false);
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
-
- checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName = ((AMQTopic) topic).getRoutingKey();
- }
- else
- {
- topicName = new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
- return subscriber;
- }
+ public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
/** Note, currently this does not handle reuse of the same name with different topics correctly. */
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
@@ -1387,7 +1330,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
{
// in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
// in 0-9 we used the cleaner addition of a new sync recover method with its own ok
- if(getProtocolVersion().equals(ProtocolVersion.v8_0))
+ if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
@@ -1985,7 +1928,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
- private AMQTopic checkValidTopic(Topic topic) throws JMSException
+ protected AMQTopic checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
{
@@ -2353,7 +2296,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
*
* @todo Be aware of possible changes to parameter order as versions change.
*/
- private void deleteQueue(final AMQShortString queueName) throws JMSException
+ protected void deleteQueue(final AMQShortString queueName) throws JMSException
{
try
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index a63d94b4ca..dd2b9a2389 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.ErrorCode;
@@ -38,13 +39,12 @@ import org.apache.qpidity.transport.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
-import javax.jms.Destination;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
-import java.util.HashMap;
+import java.util.Iterator;
/**
* This is a 0.10 Session
@@ -146,6 +146,25 @@ public class AMQSession_0_10 extends AMQSession
//------- overwritten methods of class AMQSession
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ checkNotClosed();
+ checkValidTopic(topic);
+ if( _subscriptions.containsKey(name))
+ {
+ _subscriptions.get(name).close();
+ }
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
+ BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
+
/**
* Acknowledge one or many messages.
*
@@ -223,6 +242,25 @@ public class AMQSession_0_10 extends AMQSession
}
/**
+ * We need to release message that may be pre-fetched in the local queue
+ *
+ * @throws JMSException
+ */
+ public void close() throws JMSException
+ {
+ super.close();
+ // We need to release pre-fetched messages
+ Iterator messages=_queue.iterator();
+ while (messages.hasNext())
+ {
+ UnprocessedMessage message=(UnprocessedMessage) messages.next();
+ messages.remove();
+ rejectMessage(message, true);
+ }
+ }
+
+
+ /**
* Commit the receipt and the delivery of all messages exchanged by this session resources.
*/
public void sendCommit() throws AMQException, FailoverException
@@ -359,9 +397,17 @@ public class AMQSession_0_10 extends AMQSession
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
- getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+ getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
+ if(consumer.isStrated())
+ {
+ // set the flow
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+
+ }
getQpidSession().sync();
getCurrentException();
}
@@ -462,11 +508,11 @@ public class AMQSession_0_10 extends AMQSession
//only set if msg list is null
try
{
- if (consumer.getMessageListener() != null)
- {
+ // if (consumer.getMessageListener() != null)
+ // {
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE,
MAX_PREFETCH);
- }
+ // }
getQpidSession()
.messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
}
@@ -546,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession
*/
private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
{
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
synchronized (this)
{
@@ -579,8 +625,7 @@ public class AMQSession_0_10 extends AMQSession
void start() throws AMQException
{
-
- super.suspendChannel(false);
+ suspendChannel(false);
for(BasicMessageConsumer c: _consumers.values())
{
c.start();
@@ -592,16 +637,19 @@ public class AMQSession_0_10 extends AMQSession
}
}
- void stop() throws AMQException
+
+
+
+ void stop() throws AMQException
{
super.stop();
- for(BasicMessageConsumer c: _consumers.values())
+ for(BasicMessageConsumer c: _consumers.values())
{
c.stop();
}
}
- synchronized void startDistpatcherIfNecessary()
+ synchronized void startDistpatcherIfNecessary()
{
// If IMMEDIATE_PREFETCH is not set then we need to start fetching
if (!_immediatePrefetch)
@@ -622,4 +670,71 @@ public class AMQSession_0_10 extends AMQSession
startDistpatcherIfNecessary(false);
}
+
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+
+ checkNotClosed();
+ AMQTopic origTopic=checkValidTopic(topic);
+ AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
+
+ TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName=((AMQTopic) topic).getRoutingKey();
+ }
+ else
+ {
+ topicName=new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+
+ subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 87c9c95c55..28585b79be 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -21,9 +21,8 @@
package org.apache.qpid.client;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
@@ -132,7 +131,7 @@ public class AMQSession_0_8 extends AMQSession
{
// We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
- BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+ BasicRecoverBody body = getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
_connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
_logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
}
@@ -142,12 +141,12 @@ public class AMQSession_0_8 extends AMQSession
// in 0-9 we used the cleaner addition of a new sync recover method with its own ok
if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+ BasicRecoverBody body = getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
}
else if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v0_9))
{
- BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
+ BasicRecoverSyncBody body = ((MethodRegistry_0_9)getProtocolHandler().getMethodRegistry()).createBasicRecoverSyncBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
}
else
@@ -166,7 +165,7 @@ public class AMQSession_0_8 extends AMQSession
_logger.debug("Rejecting delivery tag:" + deliveryTag);
}
- AMQFrame basicRejectBody = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue).generateFrame(_channelId);
+ AMQFrame basicRejectBody = getProtocolHandler().getMethodRegistry().createBasicRejectBody(deliveryTag, requeue).generateFrame(_channelId);
_connection.getProtocolHandler().writeFrame(basicRejectBody);
}
@@ -182,7 +181,7 @@ public class AMQSession_0_8 extends AMQSession
{
public AMQMethodEvent execute() throws AMQException, FailoverException
{
- AMQFrame boundFrame = getMethodRegistry().createExchangeBoundBody
+ AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
(exchangeName, routingKey, queueName).generateFrame(_channelId);
return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
@@ -225,7 +224,7 @@ public class AMQSession_0_8 extends AMQSession
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
// TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame jmsConsume = getMethodRegistry().createBasicConsumeBody(getTicket(),
+ AMQFrame jmsConsume = getProtocolHandler().getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
tag,
consumer.isNoLocal(),
@@ -247,7 +246,7 @@ public class AMQSession_0_8 extends AMQSession
public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
final boolean nowait) throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null).
+ AMQFrame exchangeDeclare = getProtocolHandler().getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null).
generateFrame(_channelId);
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
@@ -255,14 +254,14 @@ public class AMQSession_0_8 extends AMQSession
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
{
- AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId);
+ AMQFrame queueDeclare = getProtocolHandler().getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId);
protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
{
- QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(),
+ QueueDeleteBody body = getProtocolHandler().getMethodRegistry().createQueueDeleteBody(getTicket(),
queueName,
false,
false,
@@ -311,4 +310,70 @@ public class AMQSession_0_8 extends AMQSession
return new AMQTemporaryQueue(this);
}
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+
+ checkNotClosed();
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic) topic).getRoutingKey();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+
+ subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
index a036287f1f..88f7e550f4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
@@ -71,6 +71,13 @@ public class AMQTopic extends AMQDestination implements Topic
queueName, isDurable);
}
+ protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName, boolean isDurable)
+ {
+ super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable );
+ }
+
+
public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
throws JMSException
{
@@ -79,6 +86,13 @@ public class AMQTopic extends AMQDestination implements Topic
true);
}
+ public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+ throws JMSException
+ {
+ return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false,
+ getDurableTopicQueueName(subscriptionName, connection), false);
+ }
+
public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
{
return new AMQShortString(connection.getClientID() + ":" + subscriptionName);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 87eebdfce6..206e705ecf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -85,7 +85,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
protected MessageFactoryRegistry _messageFactory;
- private final AMQSession _session;
+ protected final AMQSession _session;
protected AMQProtocolHandler _protocolHandler;
@@ -434,7 +434,23 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
- public abstract Object getMessageFromQueue(long l) throws InterruptedException;
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ Object o;
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else if (l < 0)
+ {
+ o = _synchronousQueue.poll();
+ }
+ else
+ {
+ o = _synchronousQueue.take();
+ }
+ return o;
+ }
private boolean closeOnAutoClose() throws JMSException
{
@@ -1107,6 +1123,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
// do nothing as this is a 0_10 feature
}
+ public boolean isStrated()
+ {
+ // do nothing as this is a 0_10 feature
+ return false;
+ }
+
public AMQShortString getQueuename()
{
return _queuename;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 80b63c75c8..3534bade61 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -19,10 +19,7 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
@@ -41,8 +38,8 @@ import javax.jms.JMSException;
import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.Iterator;
/**
* This is a 0.10 message consumer.
@@ -50,15 +47,6 @@ import java.util.concurrent.atomic.AtomicLong;
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
implements org.apache.qpidity.nclient.util.MessageListener
{
- /**
- * A counter for keeping the number of available messages for this consumer
- */
- private final AtomicLong _messageCounter = new AtomicLong(0);
-
- /**
- * Number of received message so far
- */
- private final AtomicLong _messagesReceived = new AtomicLong(0);
/**
* This class logger
@@ -117,6 +105,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
// ----- Interface org.apache.qpidity.client.util.MessageListener
/**
+ *
+ * This is invoked by the session thread when emptying the session message queue.
+ * We first check if the message is valid (match the selector) and then deliver it to the
+ * message listener or to the sync consumer queue.
+ *
* @param jmsMessage this message has already been processed so can't redo preDeliver
* @param channelId
*/
@@ -136,12 +129,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
catch (Exception e1)
{
- // the receiver may be waiting for a message
- if (_messageCounter.get() >= 0)
- {
- _messageCounter.decrementAndGet();
- _synchronousQueue.add(new NullTocken());
- }
// we should silently log thie exception as it only hanppens when the connection is closed
_logger.error("Exception when receiving message", e1);
}
@@ -152,20 +139,15 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
}
+
+
+ /**
+ * This method is invoked by the transport layer when a message is delivered for this
+ * consumer. The message is transformed and pass to the session.
+ * @param message an 0.10 message
+ */
public void onMessage(Message message)
{
- if (isMessageListenerSet())
- {
- _messagesReceived.incrementAndGet();
- if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
- {
- // require more credit
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _messagesReceived.set(0);
- }
- }
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
String consumerTag = getConsumerTag().toString();
@@ -207,8 +189,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
newMessage.setReplyToURL(replyToUrl);
}
newMessage.setContentHeader(headers);
- // increase the counter of messages
- _messageCounter.incrementAndGet();
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -246,6 +226,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
//{
super.postDeliver(msg);
//}
+
+
}
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
@@ -351,50 +333,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
}
messageOk = acquireMessage(message);
}
- if (!messageOk)
- {
- requestCreditIfCreditMode();
- }
return messageOk;
}
- private void requestCreditIfCreditMode()
- {
- try
- {
- // the current message received is not good, so we need to get a message.
- if (getMessageListener() == null)
- {
- int oldval = _messageCounter.intValue();
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- 1);
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if (_messageCounter.intValue() <= oldval)
- {
- // we haven't received a message so tell the receiver to return null
- _synchronousQueue.add(new NullTocken());
- }
- else
- {
- _messageCounter.decrementAndGet();
- }
- }
- // we now need to check if we have received a message
-
- }
- catch (Exception e)
- {
- _logger.error(
- "Error getting message listener, couldn't request credit after releasing a message that failed the selector test",
- e);
- }
- }
/**
* Acknowledge a message
@@ -469,16 +410,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
super.setMessageListener(messageListener);
if (messageListener == null)
{
- _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+ /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString());
_0_10session.getQpidSession()
.messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
+ */
}
else
{
+ //TODO: empty the list of sync messages.
if (_connection.started())
{
_0_10session.getQpidSession()
@@ -490,66 +433,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
- _messagesReceived.set(0);
- ;
- }
- }
- }
-
- public Object getMessageFromQueue(long l) throws InterruptedException
- {
- if (!_isStarted)
- {
- return null;
- }
- Object o;
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
- if (l == 0)
- {
- o = _synchronousQueue.take();
- }
- else
- {
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else
- {
- o = _synchronousQueue.poll();
- }
- if (o == null)
- {
- _logger.debug("Message Didn't arrive in time, checking if one is inflight");
- // checking if one is inflight
- _0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if (_messageCounter.get() > 0)
- {
- o = _synchronousQueue.take();
- }
}
}
- if (o instanceof NullTocken)
- {
- o = null;
- }
- return o;
}
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ public boolean isStrated()
{
- _messageCounter.decrementAndGet();
- super.preApplicationProcessing(jmsMsg);
- }
-
- private class NullTocken
- {
-
+ return _isStarted;
}
public void start()
@@ -561,4 +451,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
{
_isStarted = false;
}
+
+ public void close() throws JMSException
+ {
+ super.close();
+ // release message that may be staged
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
+ {
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
+ }
+ }
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cb21f71c1a..1635c51573 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -86,22 +86,5 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
}
-
- public Object getMessageFromQueue(long l) throws InterruptedException
- {
- Object o;
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else if (l < 0)
- {
- o = _synchronousQueue.poll();
- }
- else
- {
- o = _synchronousQueue.take();
- }
- return o;
- }
+
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index eed9f21090..7eb56acb27 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -739,4 +739,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_consumer = basicMessageConsumer;
}
+ public void receivedFromServer()
+ {
+ _changedData = false;
+ }
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 083912d8ac..4eeb702703 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -140,7 +140,9 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
props.setType(mprop.getType());
props.setUserId(mprop.getUserId());
props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders()));
- return createMessage(messageNbr, data, exchange, routingKey, props);
+ AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);
+ message.receivedFromServer();
+ return message;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index c3a879177c..51a052bed5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -49,11 +49,33 @@ public class Client implements org.apache.qpidity.nclient.Connection
ConnectionDelegate connectionDelegate = new ConnectionDelegate()
{
+ private boolean receivedClose = false;
+
public SessionDelegate getSessionDelegate()
{
return new ClientSessionDelegate();
}
+ public void exception(Throwable t)
+ {
+ if (_closedListner != null)
+ {
+ _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t);
+ }
+ else
+ {
+ throw new RuntimeException("connection closed",t);
+ }
+ }
+
+ public void closed()
+ {
+ if (_closedListner != null && !this.receivedClose)
+ {
+ _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null);
+ }
+ }
+
@Override public void connectionClose(Channel context, ConnectionClose connectionClose)
{
ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode());
@@ -67,8 +89,10 @@ public class Client implements org.apache.qpidity.nclient.Connection
}
else
{
- _closedListner.onClosed(errorCode, connectionClose.getReplyText());
+ _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null);
}
+
+ this.receivedClose = true;
}
};
@@ -79,6 +103,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
{
+ System.out.println("Using NIO");
if( _logger.isDebugEnabled())
{
_logger.debug("using NIO");
@@ -180,6 +205,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
public void setClosedListener(ClosedListener closedListner)
{
+
_closedListner = closedListner;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
index 5ca598d412..c0c6978a14 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
@@ -32,8 +32,8 @@ public interface ClosedListener
* informs the connection's ExceptionListener
* @param errorCode TODO
* @param reason TODO
- *
+ * @param t TODO
* @see Connection
*/
- public void onClosed(ErrorCode errorCode, String reason);
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t);
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
index 7eb482c26b..008b85e98a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
@@ -1,5 +1,7 @@
package org.apache.qpidity.nclient;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -16,17 +18,17 @@ public class JMSTestCase
try
{
- javax.jms.Connection con = new AMQConnection("qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672");
+ javax.jms.Connection con = new AMQConnection("qpid:password=pass;username=name@tcp:localhost:5672");
con.start();
javax.jms.Session ssn = con.createSession(false, 1);
javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test");
javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
- javax.jms.MessageProducer prod = ssn.createProducer(dest);
+ //javax.jms.MessageProducer prod = ssn.createProducer(dest);
- //javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receive();
- /* cons.setMessageListener(new MessageListener()
+ javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive();
+ cons.setMessageListener(new MessageListener()
{
public void onMessage(Message m)
{
@@ -41,9 +43,25 @@ public class JMSTestCase
}
}
- });*/
+ });
- javax.jms.TextMessage msg = ssn.createTextMessage();
+ con.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace();
+ }
+ });
+
+ System.out.println("Waiting");
+ while (m == null)
+ {
+
+ }
+
+ System.out.println("Exiting");
+
+ /*javax.jms.TextMessage msg = ssn.createTextMessage();
msg.setText("This is a test message");
msg.setBooleanProperty("targetMessage", false);
prod.send(msg);
@@ -60,7 +78,7 @@ public class JMSTestCase
else
{
System.out.println("message is not null" + m);
- }
+ }*/
}
catch(Exception e)
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
index 0a25ea3961..5fe38889d9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
@@ -189,7 +189,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
void notifyException(QpidException ex)
{
- _exceptionListner.onClosed(null, null);
+ _exceptionListner.onClosed(null, null, null);
}
Map<String,MessagePartListener> getMessageListerners()
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
index 96ec98a45a..810082fdd3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
@@ -40,7 +40,7 @@ public class DemoClient
Session ssn = conn.createSession(50000);
ssn.setClosedListener(new ClosedListener()
{
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
index 17081265aa..6a80575f4b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
@@ -43,7 +43,7 @@ public class LargeMsgDemoClient
Session ssn = conn.createSession(50000);
ssn.setClosedListener(new ClosedListener()
{
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
index 54765ea72e..3ddded2051 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
@@ -124,7 +124,7 @@ public class BasicInteropTest implements ClosedListener
session.sync();
}
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
System.out.println("------- Broker Notified an error --------");
System.out.println("------- " + errorCode + " --------");
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
index 5dc3b34a61..5694075ef4 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
@@ -1163,7 +1163,7 @@ public class SessionImpl implements Session
*/
private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
{
- public void onClosed(ErrorCode errorCode, String reason)
+ public void onClosed(ErrorCode errorCode, String reason, Throwable t)
{
synchronized (this)
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
index e2d3832cc9..3cef57f90d 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java
@@ -82,8 +82,9 @@ public class ConnectionStartTest extends QpidTestCase
protected void tearDown() throws Exception
{
_connection.close();
+ super.tearDown();
}
-
+
public void testSimpleReceiveConnection()
{
try
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
new file mode 100644
index 0000000000..ccf16a0b6e
--- /dev/null
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.connection;
+
+import org.apache.qpid.testutil.QpidTestCase;
+
+import org.apache.qpid.util.concurrent.Condition;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+/**
+ * ExceptionListenerTest
+ *
+ */
+
+public class ExceptionListenerTest extends QpidTestCase
+{
+
+ public void testBrokerDeath() throws Exception
+ {
+ Connection conn = getConnection("guest", "guest");
+
+ conn.start();
+
+ final Condition fired = new Condition();
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ fired.set();
+ }
+ });
+
+ stopBroker();
+
+ if (!fired.get(3000))
+ {
+ fail("exception listener was not fired");
+ }
+ }
+
+}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
index b2bf7fd7e4..f43ccaf0ff 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java
@@ -98,7 +98,7 @@ public class Client implements MessageListener
if (_count < _expected)
{
- wait(1000000000);
+ wait(60000);
}
if (_count < _expected)
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index 21890e1ae9..5ebde71d6c 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -20,13 +20,12 @@
*/
package org.apache.qpid.test.unit.close;
-import junit.framework.TestCase;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.testutil.QpidClientConnection;
+import org.apache.qpid.testutil.QpidTestCase;
import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
@@ -41,7 +40,7 @@ import javax.jms.Session;
import java.util.concurrent.atomic.AtomicInteger;
-public class MessageRequeueTest extends TestCase
+public class MessageRequeueTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class);
@@ -64,7 +63,8 @@ public class MessageRequeueTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- conn = new QpidClientConnection(BROKER);
+
+ conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
@@ -78,7 +78,6 @@ public class MessageRequeueTest extends TestCase
protected void tearDown() throws Exception
{
- super.tearDown();
if (!passed) // clean up
{
@@ -91,6 +90,7 @@ public class MessageRequeueTest extends TestCase
conn.disconnect();
}
+ super.tearDown();
}
/**
@@ -125,7 +125,7 @@ public class MessageRequeueTest extends TestCase
if (messageLog[msgindex] != 0)
{
_logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag()
- + ") more than once.");
+ + ") more than once.");
}
if (_logger.isInfoEnabled())
@@ -144,16 +144,18 @@ public class MessageRequeueTest extends TestCase
msg = consumer.receive(1000);
}
- _logger.info("consuming done.");
+ _logger.info("consuming done.");
conn.getSession().commit();
consumer.close();
- assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
int index = 0;
StringBuilder list = new StringBuilder();
list.append("Failed to receive:");
int failed = 0;
+ _logger.info("consumed: " + messagesReceived);
+
+ assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
// wit 0_10 we can have a delivery tag of 0
if (conn.isBroker08())
{
@@ -174,7 +176,7 @@ public class MessageRequeueTest extends TestCase
assertEquals(list.toString(), 0, failed);
}
- _logger.info("consumed: " + messagesReceived);
+
conn.disconnect();
passed = true;
}
@@ -208,7 +210,7 @@ public class MessageRequeueTest extends TestCase
}
catch (InterruptedException e)
{
- fail("Uanble to join to Consumer theads");
+ fail("Unable to join to Consumer theads");
}
_logger.info("consumer 1 count is " + c1.getCount());
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index fab419e831..b905591f19 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -106,7 +106,7 @@ public class TopicSessionTest extends QpidTestCase
AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown));
AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown));
- TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0");
TopicPublisher publisher = session1.createPublisher(null);
@@ -144,11 +144,11 @@ public class TopicSessionTest extends QpidTestCase
AMQConnection con1 = (AMQConnection) getConnection("guest", "guest");
AMQTopic topic = new AMQTopic(con1, "MyTopic3");
- TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = session1.createPublisher(topic);
AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE);
+ TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0");
con2.start();
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 2dfca12f87..da8e189c84 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -467,6 +467,18 @@ public class CommitRollbackTest extends QpidTestCase
}
result = _consumer.receive(1000);
+
+ if (isBroker08())
+ {
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ // assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+ }
+ else
+ {
+ assertNull("test message was consumed and not rolled back, but is redelivered", result);
+ }
+
+ result = _consumer.receive(1000);
assertNull("test message should be null:" + result, result);
_session.commit();
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 93f80645d5..1339cf9060 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -89,7 +89,7 @@ public class TransactedTest extends QpidTestCase
prepCon = (AMQConnection) getConnection("guest", "guest");
_logger.info("Create prep session");
- prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ prepSession = prepCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
_logger.info("Create prep producer to Q1");
prepProducer1 = prepSession.createProducer(queue1);
@@ -100,7 +100,7 @@ public class TransactedTest extends QpidTestCase
_logger.info("Create test connection");
testCon = (AMQConnection) getConnection("guest", "guest");
_logger.info("Create test session");
- testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ testSession = testCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE);
_logger.info("Create test consumer of q2");
testConsumer2 = testSession.createConsumer(queue2);
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
index f9c830ddae..e99a51e1c7 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -81,11 +81,8 @@ public class QpidClientConnection extends QpidTestCase implements ExceptionListe
String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
try
{
- AMQConnectionFactory factory = new AMQConnectionFactory(brokerUrl);
_logger.info("connecting to Qpid :" + brokerUrl);
- //connection = factory.createConnection();
- setUp();
- connection = getConnection("guest", "guest") ;
+ connection = getConnection("guest", "guest") ;
// register exception listener
connection.setExceptionListener(this);
@@ -112,7 +109,6 @@ public class QpidClientConnection extends QpidTestCase implements ExceptionListe
connection.close();
connected = false;
_logger.info("disconnected");
- tearDown();
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
index 8ab381db32..e7c09fca65 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java
@@ -21,8 +21,8 @@ import junit.framework.TestCase;
import javax.jms.Connection;
import javax.naming.InitialContext;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
+import java.io.InputStream;
+import java.io.IOException;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.AMQConnection;
@@ -39,117 +39,101 @@ import org.slf4j.LoggerFactory;
public class QpidTestCase extends TestCase
{
- /* this clas logger */
private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class);
- /* Test properties */
- private static final String SHEL = "broker_shel";
- private static final String BROKER_PATH = "broker_path";
- private static final String BROKER_PARAM = "broker_param";
- private static final String BROKER_VERSION = "broker_version";
- public static final String BROKER_08 = "08";
- private static final String BROKER_VM = "vm";
- private static final String EXT_BROKER = "ext" ;
- /**
- * The process where the remote broker is running.
- */
- private Process _brokerProcess;
+ // system properties
+ private static final String BROKER = "broker";
+ private static final String BROKER_VERSION = "broker.version";
+
+ // values
+ private static final String VM = "vm";
+ private static final String EXTERNAL = "external";
+ private static final String VERSION_08 = "0-8";
+ private static final String VERSION_010 = "0-10";
- /* The test property values */
- // The default broker is an in-VM one
- private String _shel = BROKER_VM;
- private String _brokerPath = "";
- private String _brokerParams = "";
- private String _brokerVersion = "08" ;
+ private String _broker = System.getProperty(BROKER, VM);
+ private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08);
+
+ private Process _brokerProcess;
- /* The broker communication objects */
private InitialContext _initialContext;
private AMQConnectionFactory _connectionFactory;
- //--------- JUnit support
-
protected void setUp() throws Exception
{
super.setUp();
- // get the propeties if they are set
- if (System.getProperties().containsKey(BROKER_VERSION ))
- {
- _brokerVersion = System.getProperties().getProperty(BROKER_VERSION );
- }
- if (System.getProperties().containsKey(SHEL))
- {
- _shel = System.getProperties().getProperty(SHEL);
- }
- if (System.getProperties().containsKey(BROKER_PATH))
- {
- _brokerPath = System.getProperties().getProperty(BROKER_PATH);
- }
- if (System.getProperties().containsKey(BROKER_PARAM))
- {
- _brokerParams = System.getProperties().getProperty(BROKER_PARAM);
- }
- if (!_shel.equals(BROKER_VM) && ! _shel.equals(EXT_BROKER) )
- {
- // start a new broker
- startBroker();
- }
- else if ( ! _shel.equals(EXT_BROKER) )
- {
- // create an in_VM broker
- TransportConnection.createVMBroker(1);
- }
- _logger.info("=========================================");
- _logger.info("broker version " + _brokerVersion + " ==== " + _shel + " " + _brokerPath + " " + _brokerParams);
+ startBroker();
}
- /**
- * This method _is invoked after each test case.
- *
- * @throws Exception
- */
protected void tearDown() throws Exception
{
- killBroker();
- super.tearDown();
+ stopBroker();
+ super.tearDown();
}
- public void killBroker()
+ public void startBroker() throws Exception
{
- _logger.info("Kill broker");
- if (_brokerProcess != null)
+ if (_broker.equals(VM))
{
- // destroy the currently running broker
- _brokerProcess.destroy();
- _brokerProcess = null;
+ // create an in_VM broker
+ TransportConnection.createVMBroker(1);
}
- else if ( ! _shel.equals(EXT_BROKER))
- {
- TransportConnection.killAllVMBrokers();
+ else if (!_broker.equals(EXTERNAL))
+ {
+ _logger.info("starting broker: " + _broker);
+ ProcessBuilder pb = new ProcessBuilder(_broker.split("\\s+"));
+ pb.redirectErrorStream(true);
+ _brokerProcess = pb.start();
+
+ new Thread()
+ {
+ private InputStream in = _brokerProcess.getInputStream();
+
+ public void run()
+ {
+ try
+ {
+ byte[] buf = new byte[4*1024];
+ int n;
+ while ((n = in.read(buf)) != -1)
+ {
+ System.out.write(buf, 0, n);
+ }
+ }
+ catch (IOException e)
+ {
+ _logger.info("redirector", e);
+ }
+ }
+ }.start();
+
+ Thread.sleep(1000);
+
+ try
+ {
+ int exit = _brokerProcess.exitValue();
+ throw new RuntimeException("broker aborted: " + exit);
+ }
+ catch (IllegalThreadStateException e)
+ {
+ // this is expect if the broker started succesfully
+ }
}
}
- //--------- Util method
-
- /**
- * This method starts a remote server by spawning an external process.
- *
- * @throws Exception If the broker cannot be started
- */
- public void startBroker() throws Exception
+ public void stopBroker() throws Exception
{
- _logger.info("Starting broker: " + _shel + " " + _brokerPath + " " + _brokerParams + "");
- Runtime rt = Runtime.getRuntime();
- _brokerProcess = rt.exec(_shel + " " + _brokerPath + " " + _brokerParams + "");
- BufferedReader reader = new BufferedReader(new InputStreamReader(_brokerProcess.getInputStream()));
- if (reader.ready())
+ _logger.info("stopping broker: " + _broker);
+ if (_brokerProcess != null)
{
- //bad, we had an error starting the broker
- throw new Exception("Problem when starting the broker: " + reader.readLine());
+ _brokerProcess.destroy();
+ _brokerProcess.waitFor();
+ _logger.info("broker exited: " + _brokerProcess.exitValue());
+ _brokerProcess = null;
}
- // We need to wait for th ebroker to start ideally we would need to ping it
- synchronized(this)
+ else if (_broker.equals(VM))
{
- this.wait(1000);
+ TransportConnection.killAllVMBrokers();
}
}
@@ -159,28 +143,18 @@ public class QpidTestCase extends TestCase
*/
public boolean isBroker08()
{
- return _brokerVersion.equals(BROKER_08);
+ return _brokerVersion.equals(VERSION_08);
}
- /**
- * Stop the currently running broker.
- */
- public void stopBroker()
+ public boolean isBroker010()
{
- _logger.info("Stopping broker");
- // stooping the broker
- if (_brokerProcess != null)
- {
- _brokerProcess.destroy();
- }
- _initialContext = null;
- _connectionFactory = null;
+ return _brokerVersion.equals(VERSION_010);
}
- public void shutdownServer() throws Exception
+ public void shutdownServer() throws Exception
{
- killBroker();
- setUp();
+ stopBroker();
+ startBroker();
}
/**
* we assume that the environment is correctly set
@@ -228,7 +202,7 @@ public class QpidTestCase extends TestCase
{
_logger.info("get Connection");
Connection con;
- if (_shel.equals(BROKER_VM))
+ if (_broker.equals(VM))
{
con = new AMQConnection("vm://:1", username, password, "Test", "test");
}
@@ -243,7 +217,7 @@ public class QpidTestCase extends TestCase
{
_logger.info("get Connection");
Connection con;
- if (_shel.equals(BROKER_VM))
+ if (_broker.equals(VM))
{
con = new AMQConnection("vm://:1", username, password, id, "test");
}
@@ -254,8 +228,4 @@ public class QpidTestCase extends TestCase
return con;
}
- public void testfoo()
- {
- //do nothing, just to avoid maven to report an error
- }
}