diff options
author | Aidan Skinner <aidan@apache.org> | 2008-02-11 12:11:03 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-02-11 12:11:03 +0000 |
commit | 72f22f941942f2bc811caf7c1d1aca7dd8457984 (patch) | |
tree | 2db6dfac1159ab74b100054009c6eff83ec24a99 /qpid/java/client | |
parent | fa99183557ada9117f23290cdee0e5b05004bc47 (diff) | |
download | qpid-python-72f22f941942f2bc811caf7c1d1aca7dd8457984.tar.gz |
Merged revisions 615958,615968,616353,616396,616402,616404,616445,616454,616507,616511,616542,616545,616715,616736,616927,616929,617188,617286,617303,617305,617320-617321,617510-617511,617513,617524,617527,617533,617543,617556,617582,617590,617592,617594,617596-617597,617607,617662,618412,618428,618436-618437,618450,618462,618519,618770,618982,618984,618986,618989,619012,619043,619148,619182,619189,619192,619200,619204,619424,619538,619604,619611,619626,619636,619646,619888,619903,619941 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/trunk
................
r615958 | aconway | 2008-01-28 17:17:06 +0000 (Mon, 28 Jan 2008) | 1 line
Convert ClientSessionTest to boost.
................
r615968 | aconway | 2008-01-28 17:42:22 +0000 (Mon, 28 Jan 2008) | 1 line
Added disabled test and FIXME note to fix client-side race.
................
r616353 | aconway | 2008-01-29 14:48:59 +0000 (Tue, 29 Jan 2008) | 9 lines
Deleted unused classes, adjusted files that still mention them.
D src/qpid/framing/ChannelAdapter.cpp
D src/qpid/framing/ChannelAdapter.h
D src/qpid/framing/HandlerUpdater.h
D src/tests/BrokerChannelTest.cpp
D src/tests/MockChannel.h
................
r616396 | aconway | 2008-01-29 15:45:29 +0000 (Tue, 29 Jan 2008) | 3 lines
Provide public read-access to IListNode pointers, so frame handlers
can use then to find the next frame.
................
r616402 | aconway | 2008-01-29 15:48:35 +0000 (Tue, 29 Jan 2008) | 2 lines
Remove references to defunct broker::ChannelHandler
................
r616404 | aconway | 2008-01-29 15:49:55 +0000 (Tue, 29 Jan 2008) | 2 lines
Log peer address with SEND/RECV messages.
................
r616445 | aidan | 2008-01-29 16:38:53 +0000 (Tue, 29 Jan 2008) | 3 lines
Initialized merge tracking via "svnmerge" with revisions "1-616438" from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
................
r616454 | aidan | 2008-01-29 17:25:32 +0000 (Tue, 29 Jan 2008) | 13 lines
Merged revisions 579126,579137 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r579126 | rgreig | 2007-09-25 09:42:53 +0100 (Tue, 25 Sep 2007) | 2 lines
QPID-582: fix some 1.6 compile errors
........
r579137 | rgreig | 2007-09-25 10:00:34 +0100 (Tue, 25 Sep 2007) | 2 lines
QPID-582 fix Java 6 compile errors
........
................
r616507 | aconway | 2008-01-29 20:29:46 +0000 (Tue, 29 Jan 2008) | 2 lines
Re-enabled build of cluster code when openais is installed.
................
r616511 | aconway | 2008-01-29 20:39:26 +0000 (Tue, 29 Jan 2008) | 1 line
Added Observer to SessionManager for cluster use.
................
r616542 | rajith | 2008-01-29 22:24:40 +0000 (Tue, 29 Jan 2008) | 1 line
added support to notify connection.close to the JMSExceptionListener
................
r616545 | rajith | 2008-01-29 22:30:20 +0000 (Tue, 29 Jan 2008) | 1 line
I accidently commited some modifications done for a quick test, I reversed those changes
................
r616715 | arnaudsimon | 2008-01-30 10:38:11 +0000 (Wed, 30 Jan 2008) | 1 line
changed default port value for tcp
................
r616736 | gsim | 2008-01-30 12:18:53 +0000 (Wed, 30 Jan 2008) | 3 lines
Parse out the userid and password from the response; a small step on the road to authentication.
................
r616927 | aconway | 2008-01-30 22:32:17 +0000 (Wed, 30 Jan 2008) | 4 lines
Remove Socket param from Connection constructor, pass a string id instead.
................
r616929 | aconway | 2008-01-30 22:34:50 +0000 (Wed, 30 Jan 2008) | 6 lines
From Ted Ross, https://issues.apache.org/jira/browse/QPID-767
Bugfix: --load-dir rejected path-name-elements beginning with or ending with '.' (boost 1.33 only)
................
r617188 | gsim | 2008-01-31 18:50:46 +0000 (Thu, 31 Jan 2008) | 4 lines
Make ports accesible through socket interface.
Add local port to each logged frame in the client Connector
................
r617286 | aconway | 2008-01-31 23:14:49 +0000 (Thu, 31 Jan 2008) | 3 lines
Generate URLs for local host.
................
r617303 | rhs | 2008-01-31 23:50:43 +0000 (Thu, 31 Jan 2008) | 1 line
fixed svn:executable and svn:eol-style
................
r617305 | rhs | 2008-01-31 23:52:15 +0000 (Thu, 31 Jan 2008) | 1 line
avoid use of Throwable.initCause(e) that is illegal with java 1.6
................
r617320 | rhs | 2008-02-01 01:03:46 +0000 (Fri, 01 Feb 2008) | 1 line
ant build system
................
r617321 | rhs | 2008-02-01 01:25:27 +0000 (Fri, 01 Feb 2008) | 1 line
futzed with deps for faster build, and fixed bug in manifest generation
................
r617510 | rhs | 2008-02-01 14:49:33 +0000 (Fri, 01 Feb 2008) | 1 line
added junit dependency
................
r617511 | rhs | 2008-02-01 14:53:52 +0000 (Fri, 01 Feb 2008) | 1 line
fixed typo
................
r617513 | rhs | 2008-02-01 14:59:49 +0000 (Fri, 01 Feb 2008) | 1 line
more dependency fixes
................
r617524 | rhs | 2008-02-01 15:46:44 +0000 (Fri, 01 Feb 2008) | 1 line
added support for running one test, and made test output pretty
................
r617527 | rhs | 2008-02-01 15:55:30 +0000 (Fri, 01 Feb 2008) | 1 line
less confusing options
................
r617533 | aconway | 2008-02-01 16:03:02 +0000 (Fri, 01 Feb 2008) | 9 lines
Added cluster URL configuration, defaults to all interfaces.
src/qpid/Plugin.h - added doxygen
src/qpid/Url.cpp,.h - cache string rep, op==, istream/ostream ops.
src/qpid/broker/Broker.h,.cpp - removed getUrl()
src/qpid/cluster/Cluster.h,.cpp - use Url class
src/qpid/cluster/ClusterPlugin.cpp - added --url configuration.
................
r617543 | rhs | 2008-02-01 16:16:22 +0000 (Fri, 01 Feb 2008) | 1 line
more dependency fixes
................
r617556 | rhs | 2008-02-01 16:58:16 +0000 (Fri, 01 Feb 2008) | 1 line
fixed build order
................
r617582 | aconway | 2008-02-01 18:02:42 +0000 (Fri, 01 Feb 2008) | 8 lines
Cluster code fixed for changes in codebase.
- Using SessionManager::Observer
- Better ais test setup, only need to be member of ais group.
- Update cluster_client
- SessionState holds handler chains.
- Cluster frames include next handler ptr.
................
r617590 | gsim | 2008-02-01 18:21:01 +0000 (Fri, 01 Feb 2008) | 3 lines
Initial cut of inter-broker bridging
................
r617592 | gsim | 2008-02-01 18:26:08 +0000 (Fri, 01 Feb 2008) | 3 lines
Oops, missed makefile in last commit.
................
r617594 | gsim | 2008-02-01 18:27:23 +0000 (Fri, 01 Feb 2008) | 3 lines
Missed a couple of new files in previous commit.
................
r617596 | gsim | 2008-02-01 18:28:14 +0000 (Fri, 01 Feb 2008) | 3 lines
Use 'guest' as default uid and password for tests.
................
r617597 | aconway | 2008-02-01 18:28:46 +0000 (Fri, 01 Feb 2008) | 1 line
svn:ignore properties.
................
r617607 | gsim | 2008-02-01 18:54:27 +0000 (Fri, 01 Feb 2008) | 3 lines
Remove includes to files no longer generated.
................
r617662 | rhs | 2008-02-01 21:43:20 +0000 (Fri, 01 Feb 2008) | 1 line
added a detailed help target and cleaned up descriptions
................
r618412 | cctrieloff | 2008-02-04 20:11:39 +0000 (Mon, 04 Feb 2008) | 1 line
add interval pu/sub options to test many concurrent connections
................
r618428 | cctrieloff | 2008-02-04 20:38:19 +0000 (Mon, 04 Feb 2008) | 1 line
Turn management on by defualt
................
r618436 | rhs | 2008-02-04 21:05:39 +0000 (Mon, 04 Feb 2008) | 1 line
set up logging for tests
................
r618437 | rhs | 2008-02-04 21:06:35 +0000 (Mon, 04 Feb 2008) | 1 line
added exception callback to ConnectonDelegate
................
r618450 | rhs | 2008-02-04 21:43:01 +0000 (Mon, 04 Feb 2008) | 1 line
clear the report directory before generating test report
................
r618462 | rhs | 2008-02-04 22:11:22 +0000 (Mon, 04 Feb 2008) | 1 line
added a TransportException
................
r618519 | rajith | 2008-02-05 02:37:13 +0000 (Tue, 05 Feb 2008) | 2 lines
Added code to connect the network error exceptions to the JMS Exception listener.
................
r618770 | aconway | 2008-02-05 20:44:14 +0000 (Tue, 05 Feb 2008) | 16 lines
Added testSendToSelf for https://bugzilla.redhat.com/show_bug.cgi?id=410551
M src/tests/ClientSessionTest.cpp
Disabled management for BrokerFixture - management singleton assumes
only one broker per process, causes shutdown races with fixtures.
M src/tests/BrokerFixture.h
Made Timer::stop() idempotent
M src/qpid/broker/Timer.cpp
M src/qpid/broker/Timer.h
Added STL-style size() and empty()
M src/qpid/sys/BlockingQueue.h
M src/qpid/client/LocalQueue.cpp
M src/qpid/client/LocalQueue.h
................
r618982 | arnaudsimon | 2008-02-06 12:47:27 +0000 (Wed, 06 Feb 2008) | 1 line
Changed for using AUTO_ACK session
................
r618984 | arnaudsimon | 2008-02-06 12:52:19 +0000 (Wed, 06 Feb 2008) | 1 line
Byte message were losing their payload see QPI-779
................
r618986 | arnaudsimon | 2008-02-06 12:56:20 +0000 (Wed, 06 Feb 2008) | 1 line
QPID-777 and QPID-778
................
r618989 | arnaudsimon | 2008-02-06 13:40:38 +0000 (Wed, 06 Feb 2008) | 1 line
Changed session mode to AUTO_ACK so mesages are removed from broker between two tests.
................
r619012 | arnaudsimon | 2008-02-06 15:14:42 +0000 (Wed, 06 Feb 2008) | 1 line
Changed for using Window mode see QPID-778
................
r619043 | arnaudsimon | 2008-02-06 16:00:22 +0000 (Wed, 06 Feb 2008) | 1 line
Added close logic for releasing pre-fetched messages, see QPID-778
................
r619148 | aconway | 2008-02-06 20:49:05 +0000 (Wed, 06 Feb 2008) | 8 lines
Replaced --enable-cluster option with --with-cpg to enable/disable CPG.
make rpmbuild uses --with-cpg, will fail unless openais is installed.
Normal builds respect explicit --with/--without, or use CPG if
installed when neither is specified.
................
r619182 | rhs | 2008-02-06 22:13:31 +0000 (Wed, 06 Feb 2008) | 1 line
added close notification
................
r619189 | rajith | 2008-02-06 22:36:02 +0000 (Wed, 06 Feb 2008) | 2 lines
Added code to pass in the throwable to the closedListener so that it can be included in the JMS Exception thrown via the ExceptionListener
................
r619192 | rajith | 2008-02-06 22:38:12 +0000 (Wed, 06 Feb 2008) | 2 lines
Removed the System.outs from the code.
................
r619200 | aconway | 2008-02-06 22:49:10 +0000 (Wed, 06 Feb 2008) | 12 lines
From Ted Ross, https://issues.apache.org/jira/browse/QPID-780
Implementation of --data-dir for qpidd.
Additions by myself:
- set QPID_DATA_DIR= in test env so existing tests can run with no data dir.
- src/Makefile.am and qpidc.spec.in install /var/lib/qpidd directory.
NOTE: qpidd with no optoins will now FAIL if it cannot write /var/lib/qpidd.
Start it with --data-dir= or set QPID_DATA_DIR= in your environement to
run with no data directory.
................
r619204 | rhs | 2008-02-06 22:50:54 +0000 (Wed, 06 Feb 2008) | 1 line
fixed log level defaults
................
r619424 | aconway | 2008-02-07 14:25:32 +0000 (Thu, 07 Feb 2008) | 2 lines
Quote unprintable control characters in log output.
................
r619538 | rhs | 2008-02-07 18:15:20 +0000 (Thu, 07 Feb 2008) | 1 line
added test for exception listener; fixed NPE
................
r619604 | aconway | 2008-02-07 19:51:01 +0000 (Thu, 07 Feb 2008) | 3 lines
qpidc.spec.in: Build --without-cpg, no point adding the dependency till cluster
functionality is available.
................
r619611 | aconway | 2008-02-07 20:02:14 +0000 (Thu, 07 Feb 2008) | 2 lines
Disable QPID_DATA_DIR for verify tests.
................
r619626 | nsantos | 2008-02-07 20:18:54 +0000 (Thu, 07 Feb 2008) | 1 line
create missing dir
................
r619636 | aconway | 2008-02-07 21:06:01 +0000 (Thu, 07 Feb 2008) | 2 lines
Removed signal-unsafe code from shutdown handler.
................
r619646 | aconway | 2008-02-07 21:31:21 +0000 (Thu, 07 Feb 2008) | 3 lines
Clean shutdown of broker: Moved signal unsafe code from Broker::shutdown
to ~Broker, moved shutdown logging from shutdown handler to main() in qpidd.cpp
................
r619888 | rhs | 2008-02-08 13:50:55 +0000 (Fri, 08 Feb 2008) | 1 line
simplied QpidTestCase's interface for running non vm brokers and set up build system to run cpp tests easily
................
r619903 | aconway | 2008-02-08 15:01:30 +0000 (Fri, 08 Feb 2008) | 11 lines
Refactored verify scripts, added verify for python Examples.
To verify an example: <qpid-trunk>/bin/verify <example-dir>
See comments in bin/verify for more details.
Changes:
- Each example dir has its own verify script and verify.in.
- Added sys.stdout.flush() to som python examples so verify can tell when they're ready.
- Made python examples svn:executable.
- C++ examples/Makefile.am runs python examples
................
r619941 | aconway | 2008-02-08 17:02:55 +0000 (Fri, 08 Feb 2008) | 3 lines
Added verify scripts to run mixed python/cpp examples.
bin/verify_all runs all examples.
................
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@620468 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
30 files changed, 646 insertions, 439 deletions
diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml new file mode 100644 index 0000000000..847c38b3eb --- /dev/null +++ b/qpid/java/client/build.xml @@ -0,0 +1,37 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<project name="AMQ Client" default="build"> + + <property name="module.depends" value="common"/> + <property name="module.test.depends" value="broker"/> + + <import file="../module.xml"/> + + <property name="output.dir" value="${module.precompiled}/org/apache/qpidity/filter/selector"/> + + <target name="precompile"> + <mkdir dir="${output.dir}"/> + <javacc target="src/main/grammar/SelectorParser.jj" + outputdirectory="${output.dir}" + javacchome="${project.root}/lib"/> + </target> + +</project> diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java index 9cbd40f9ea..b5f8dfda9b 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java @@ -25,6 +25,8 @@ import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; @@ -80,6 +82,14 @@ public class Producer // create the connection Connection connection = conFac.createConnection(); + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + e.printStackTrace(); + } + }); + // Create a session on the connection // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index c2fb1b897a..43509e66c2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -116,7 +116,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private String _virtualHost; - private ExceptionListener _exceptionListener; + protected ExceptionListener _exceptionListener; private ConnectionListener _connectionListener; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 78090b45ad..bf1ed49492 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -11,11 +11,13 @@ import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.Session; import org.apache.qpidity.nclient.Client; +import org.apache.qpidity.nclient.ClosedListener; +import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate +public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ClosedListener { /** * This class logger. @@ -109,6 +111,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate } _qpidConnection.connect(brokerDetail.getHost(), brokerDetail.getPort(), _conn.getVirtualHost(), _conn.getUsername(), _conn.getPassword()); + _qpidConnection.setClosedListener(this); } catch (QpidException e) { @@ -138,4 +141,22 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate } } + + public void onClosed(ErrorCode errorCode, String reason, Throwable t) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Received a connection close from the broker: Error code : " + errorCode.getCode()); + } + if (_conn._exceptionListener != null) + { + JMSException ex = new JMSException(reason,String.valueOf(errorCode.getCode())); + if (t != null) + { + ex.initCause(t); + } + + _conn._exceptionListener.onException(ex); + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 5983440f47..9a8470bece 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,6 +20,43 @@ */ package org.apache.qpid.client; +import java.io.Serializable; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.jms.TransactionRolledBackException; + import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidArgumentException; @@ -44,7 +81,6 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -53,44 +89,6 @@ import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueReceiver; -import javax.jms.QueueSender; -import javax.jms.QueueSession; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.jms.TransactionRolledBackException; -import java.io.Serializable; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - /** * * <p/><table id="crc"><caption>CRC Card</caption> @@ -185,14 +183,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * keeps a record of subscriptions which have been created in the current instance. It does not remember * subscriptions between executions of the client. */ - private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = + protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions = new ConcurrentHashMap<String, TopicSubscriberAdaptor>(); /** * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked * up in the {@link #_subscriptions} map. */ - private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = + protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap = new ConcurrentHashMap<BasicMessageConsumer, String>(); /** @@ -200,7 +198,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue. */ - private final FlowControllingBlockingQueue _queue; + protected final FlowControllingBlockingQueue _queue; /** * Holds the highest received delivery tag. @@ -279,10 +277,10 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess protected final boolean _immediatePrefetch; /** Indicates that warnings should be generated on violations of the strict AMQP. */ - private final boolean _strictAMQP; + protected final boolean _strictAMQP; /** Indicates that runtime exceptions should be generated on vilations of the strict AMQP. */ - private final boolean _strictAMQPFATAL; + protected final boolean _strictAMQPFATAL; private final Object _messageDeliveryLock = new Object(); /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */ @@ -518,8 +516,8 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess if (_logger.isInfoEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - _logger.info("Closing session: " + this + ":" - + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + _logger.info("Closing session: " + this); // + ":" + // + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } synchronized (_connection.getFailoverMutex()) @@ -781,6 +779,14 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false, false); } + public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException + { + checkValidDestination(destination); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null, + false, false); + } + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); @@ -831,70 +837,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess false); } - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException - { - - checkNotClosed(); - AMQTopic origTopic = checkValidTopic(topic); - AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); - TopicSubscriberAdaptor subscriber = _subscriptions.get(name); - if (subscriber != null) - { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); - } - else - { - unsubscribe(name); - } - } - else - { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName = ((AMQTopic) topic).getRoutingKey(); - } - else - { - topicName = new AMQShortString(topic.getTopicName()); - } - - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } + public abstract TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException; /** Note, currently this does not handle reuse of the same name with different topics correctly. */ public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) @@ -1387,7 +1330,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess { // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad // in 0-9 we used the cleaner addition of a new sync recover method with its own ok - if(getProtocolVersion().equals(ProtocolVersion.v8_0)) + if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) { BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class); @@ -1985,7 +1928,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess /* * I could have combined the last 3 methods, but this way it improves readability */ - private AMQTopic checkValidTopic(Topic topic) throws JMSException + protected AMQTopic checkValidTopic(Topic topic) throws JMSException { if (topic == null) { @@ -2353,7 +2296,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess * * @todo Be aware of possible changes to parameter order as versions change. */ - private void deleteQueue(final AMQShortString queueName) throws JMSException + protected void deleteQueue(final AMQShortString queueName) throws JMSException { try { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index a63d94b4ca..dd2b9a2389 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,6 +27,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.ErrorCode; @@ -38,13 +39,12 @@ import org.apache.qpidity.transport.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.JMSException; -import javax.jms.Destination; -import javax.jms.TemporaryQueue; +import javax.jms.*; +import javax.jms.IllegalStateException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.UUID; import java.util.Map; -import java.util.HashMap; +import java.util.Iterator; /** * This is a 0.10 Session @@ -146,6 +146,25 @@ public class AMQSession_0_10 extends AMQSession //------- overwritten methods of class AMQSession + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + throws JMSException + { + checkNotClosed(); + checkValidTopic(topic); + if( _subscriptions.containsKey(name)) + { + _subscriptions.get(name).close(); + } + AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection); + BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); + TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer); + + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + + return subscriber; + } + /** * Acknowledge one or many messages. * @@ -223,6 +242,25 @@ public class AMQSession_0_10 extends AMQSession } /** + * We need to release message that may be pre-fetched in the local queue + * + * @throws JMSException + */ + public void close() throws JMSException + { + super.close(); + // We need to release pre-fetched messages + Iterator messages=_queue.iterator(); + while (messages.hasNext()) + { + UnprocessedMessage message=(UnprocessedMessage) messages.next(); + messages.remove(); + rejectMessage(message, true); + } + } + + + /** * Commit the receipt and the delivery of all messages exchanged by this session resources. */ public void sendCommit() throws AMQException, FailoverException @@ -359,9 +397,17 @@ public class AMQSession_0_10 extends AMQSession consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION); - getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); + getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW); getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); // We need to sync so that we get notify of an error. + if(consumer.isStrated()) + { + // set the flow + getQpidSession().messageFlow(consumer.getConsumerTag().toString(), + org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, + AMQSession_0_10.MAX_PREFETCH); + + } getQpidSession().sync(); getCurrentException(); } @@ -462,11 +508,11 @@ public class AMQSession_0_10 extends AMQSession //only set if msg list is null try { - if (consumer.getMessageListener() != null) - { + // if (consumer.getMessageListener() != null) + // { getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_MESSAGE, MAX_PREFETCH); - } + // } getQpidSession() .messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); } @@ -546,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession */ private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener { - public void onClosed(ErrorCode errorCode, String reason) + public void onClosed(ErrorCode errorCode, String reason, Throwable t) { synchronized (this) { @@ -579,8 +625,7 @@ public class AMQSession_0_10 extends AMQSession void start() throws AMQException { - - super.suspendChannel(false); + suspendChannel(false); for(BasicMessageConsumer c: _consumers.values()) { c.start(); @@ -592,16 +637,19 @@ public class AMQSession_0_10 extends AMQSession } } - void stop() throws AMQException + + + + void stop() throws AMQException { super.stop(); - for(BasicMessageConsumer c: _consumers.values()) + for(BasicMessageConsumer c: _consumers.values()) { c.stop(); } } - synchronized void startDistpatcherIfNecessary() + synchronized void startDistpatcherIfNecessary() { // If IMMEDIATE_PREFETCH is not set then we need to start fetching if (!_immediatePrefetch) @@ -622,4 +670,71 @@ public class AMQSession_0_10 extends AMQSession startDistpatcherIfNecessary(false); } + + + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + + checkNotClosed(); + AMQTopic origTopic=checkValidTopic(topic); + AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); + + TopicSubscriberAdaptor subscriber=_subscriptions.get(name); + if (subscriber != null) + { + if (subscriber.getTopic().equals(topic)) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); + } + else + { + unsubscribe(name); + } + } + else + { + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName=((AMQTopic) topic).getRoutingKey(); + } + else + { + topicName=new AMQShortString(topic.getTopicName()); + } + + if (_strictAMQP) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); + } + + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } + } + + subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); + + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + + return subscriber; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 87c9c95c55..28585b79be 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,9 +21,8 @@ package org.apache.qpid.client; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.TemporaryQueue; +import javax.jms.*; +import javax.jms.IllegalStateException; import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; @@ -132,7 +131,7 @@ public class AMQSession_0_8 extends AMQSession { // We can't use the BasicRecoverBody-OK method as it isn't part of the spec. - BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); + BasicRecoverBody body = getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false); _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId)); _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order."); } @@ -142,12 +141,12 @@ public class AMQSession_0_8 extends AMQSession // in 0-9 we used the cleaner addition of a new sync recover method with its own ok if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0)) { - BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false); + BasicRecoverBody body = getProtocolHandler().getMethodRegistry().createBasicRecoverBody(false); _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class); } else if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v0_9)) { - BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false); + BasicRecoverSyncBody body = ((MethodRegistry_0_9)getProtocolHandler().getMethodRegistry()).createBasicRecoverSyncBody(false); _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class); } else @@ -166,7 +165,7 @@ public class AMQSession_0_8 extends AMQSession _logger.debug("Rejecting delivery tag:" + deliveryTag); } - AMQFrame basicRejectBody = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue).generateFrame(_channelId); + AMQFrame basicRejectBody = getProtocolHandler().getMethodRegistry().createBasicRejectBody(deliveryTag, requeue).generateFrame(_channelId); _connection.getProtocolHandler().writeFrame(basicRejectBody); } @@ -182,7 +181,7 @@ public class AMQSession_0_8 extends AMQSession { public AMQMethodEvent execute() throws AMQException, FailoverException { - AMQFrame boundFrame = getMethodRegistry().createExchangeBoundBody + AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody (exchangeName, routingKey, queueName).generateFrame(_channelId); return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); @@ -225,7 +224,7 @@ public class AMQSession_0_8 extends AMQSession // we must register the consumer in the map before we actually start listening _consumers.put(tag, consumer); // TODO: Be aware of possible changes to parameter order as versions change. - AMQFrame jmsConsume = getMethodRegistry().createBasicConsumeBody(getTicket(), + AMQFrame jmsConsume = getProtocolHandler().getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, tag, consumer.isNoLocal(), @@ -247,7 +246,7 @@ public class AMQSession_0_8 extends AMQSession public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException, FailoverException { - AMQFrame exchangeDeclare = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null). + AMQFrame exchangeDeclare = getProtocolHandler().getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null). generateFrame(_channelId); protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); @@ -255,14 +254,14 @@ public class AMQSession_0_8 extends AMQSession public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException { - AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId); + AMQFrame queueDeclare = getProtocolHandler().getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId); protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class); } public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException { - QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(), + QueueDeleteBody body = getProtocolHandler().getMethodRegistry().createQueueDeleteBody(getTicket(), queueName, false, false, @@ -311,4 +310,70 @@ public class AMQSession_0_8 extends AMQSession return new AMQTemporaryQueue(this); } + + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + + checkNotClosed(); + AMQTopic origTopic = checkValidTopic(topic); + AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection); + TopicSubscriberAdaptor subscriber = _subscriptions.get(name); + if (subscriber != null) + { + if (subscriber.getTopic().equals(topic)) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); + } + else + { + unsubscribe(name); + } + } + else + { + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName = ((AMQTopic) topic).getRoutingKey(); + } + else + { + topicName = new AMQShortString(topic.getTopicName()); + } + + if (_strictAMQP) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); + } + + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } + } + + subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + + return subscriber; + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java index a036287f1f..88f7e550f4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java @@ -71,6 +71,13 @@ public class AMQTopic extends AMQDestination implements Topic queueName, isDurable); } + protected AMQTopic(AMQShortString exchangeName, AMQShortString exchangeClass, AMQShortString routingKey, boolean isExclusive, + boolean isAutoDelete, AMQShortString queueName, boolean isDurable) + { + super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable ); + } + + public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection) throws JMSException { @@ -79,6 +86,13 @@ public class AMQTopic extends AMQDestination implements Topic true); } + public static AMQTopic createDurable010Topic(AMQTopic topic, String subscriptionName, AMQConnection connection) + throws JMSException + { + return new AMQTopic(topic.getExchangeName(), ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false, + getDurableTopicQueueName(subscriptionName, connection), false); + } + public static AMQShortString getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException { return new AMQShortString(connection.getClientID() + ":" + subscriptionName); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 87eebdfce6..206e705ecf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -85,7 +85,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me protected MessageFactoryRegistry _messageFactory; - private final AMQSession _session; + protected final AMQSession _session; protected AMQProtocolHandler _protocolHandler; @@ -434,7 +434,23 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me } } - public abstract Object getMessageFromQueue(long l) throws InterruptedException; + public Object getMessageFromQueue(long l) throws InterruptedException + { + Object o; + if (l > 0) + { + o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); + } + else if (l < 0) + { + o = _synchronousQueue.poll(); + } + else + { + o = _synchronousQueue.take(); + } + return o; + } private boolean closeOnAutoClose() throws JMSException { @@ -1107,6 +1123,12 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me // do nothing as this is a 0_10 feature } + public boolean isStrated() + { + // do nothing as this is a 0_10 feature + return false; + } + public AMQShortString getQueuename() { return _queuename; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 80b63c75c8..3534bade61 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -19,10 +19,7 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.UnprocessedMessage_0_10; +import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; @@ -41,8 +38,8 @@ import javax.jms.JMSException; import javax.jms.MessageListener; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.Iterator; /** * This is a 0.10 message consumer. @@ -50,15 +47,6 @@ import java.util.concurrent.atomic.AtomicLong; public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer> implements org.apache.qpidity.nclient.util.MessageListener { - /** - * A counter for keeping the number of available messages for this consumer - */ - private final AtomicLong _messageCounter = new AtomicLong(0); - - /** - * Number of received message so far - */ - private final AtomicLong _messagesReceived = new AtomicLong(0); /** * This class logger @@ -117,6 +105,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By // ----- Interface org.apache.qpidity.client.util.MessageListener /** + * + * This is invoked by the session thread when emptying the session message queue. + * We first check if the message is valid (match the selector) and then deliver it to the + * message listener or to the sync consumer queue. + * * @param jmsMessage this message has already been processed so can't redo preDeliver * @param channelId */ @@ -136,12 +129,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } catch (Exception e1) { - // the receiver may be waiting for a message - if (_messageCounter.get() >= 0) - { - _messageCounter.decrementAndGet(); - _synchronousQueue.add(new NullTocken()); - } // we should silently log thie exception as it only hanppens when the connection is closed _logger.error("Exception when receiving message", e1); } @@ -152,20 +139,15 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } } + + + /** + * This method is invoked by the transport layer when a message is delivered for this + * consumer. The message is transformed and pass to the session. + * @param message an 0.10 message + */ public void onMessage(Message message) { - if (isMessageListenerSet()) - { - _messagesReceived.incrementAndGet(); - if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH) - { - // require more credit - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - AMQSession_0_10.MAX_PREFETCH); - _messagesReceived.set(0); - } - } int channelId = getSession().getChannelId(); long deliveryId = message.getMessageTransferId(); String consumerTag = getConsumerTag().toString(); @@ -207,8 +189,6 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By newMessage.setReplyToURL(replyToUrl); } newMessage.setContentHeader(headers); - // increase the counter of messages - _messageCounter.incrementAndGet(); getSession().messageReceived(newMessage); // else ignore this message } @@ -246,6 +226,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By //{ super.postDeliver(msg); //} + + } void notifyMessage(UnprocessedMessage messageFrame, int channelId) @@ -351,50 +333,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By } messageOk = acquireMessage(message); } - if (!messageOk) - { - requestCreditIfCreditMode(); - } return messageOk; } - private void requestCreditIfCreditMode() - { - try - { - // the current message received is not good, so we need to get a message. - if (getMessageListener() == null) - { - int oldval = _messageCounter.intValue(); - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, - 1); - _0_10session.getQpidSession() - .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); - _0_10session.getQpidSession().sync(); - _0_10session.getQpidSession() - .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - if (_messageCounter.intValue() <= oldval) - { - // we haven't received a message so tell the receiver to return null - _synchronousQueue.add(new NullTocken()); - } - else - { - _messageCounter.decrementAndGet(); - } - } - // we now need to check if we have received a message - - } - catch (Exception e) - { - _logger.error( - "Error getting message listener, couldn't request credit after releasing a message that failed the selector test", - e); - } - } /** * Acknowledge a message @@ -469,16 +410,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By super.setMessageListener(messageListener); if (messageListener == null) { - _0_10session.getQpidSession().messageStop(getConsumerTag().toString()); + /* _0_10session.getQpidSession().messageStop(getConsumerTag().toString()); _0_10session.getQpidSession() .messageFlowMode(getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT); _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); _0_10session.getQpidSession().sync(); + */ } else { + //TODO: empty the list of sync messages. if (_connection.started()) { _0_10session.getQpidSession() @@ -490,66 +433,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); _0_10session.getQpidSession().sync(); - _messagesReceived.set(0); - ; - } - } - } - - public Object getMessageFromQueue(long l) throws InterruptedException - { - if (!_isStarted) - { - return null; - } - Object o; - _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(), - org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); - - if (l == 0) - { - o = _synchronousQueue.take(); - } - else - { - if (l > 0) - { - o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); - } - else - { - o = _synchronousQueue.poll(); - } - if (o == null) - { - _logger.debug("Message Didn't arrive in time, checking if one is inflight"); - // checking if one is inflight - _0_10session.getQpidSession().messageFlush(getConsumerTag().toString()); - _0_10session.getQpidSession().sync(); - _0_10session.getQpidSession() - .messageFlow(getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); - if (_messageCounter.get() > 0) - { - o = _synchronousQueue.take(); - } } } - if (o instanceof NullTocken) - { - o = null; - } - return o; } - protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException + public boolean isStrated() { - _messageCounter.decrementAndGet(); - super.preApplicationProcessing(jmsMsg); - } - - private class NullTocken - { - + return _isStarted; } public void start() @@ -561,4 +451,17 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By { _isStarted = false; } + + public void close() throws JMSException + { + super.close(); + // release message that may be staged + Iterator messages=_synchronousQueue.iterator(); + while (messages.hasNext()) + { + AbstractJMSMessage message=(AbstractJMSMessage) messages.next(); + messages.remove(); + _session.rejectMessage(message, true); + } + } }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cb21f71c1a..1635c51573 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -86,22 +86,5 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); } - - public Object getMessageFromQueue(long l) throws InterruptedException - { - Object o; - if (l > 0) - { - o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS); - } - else if (l < 0) - { - o = _synchronousQueue.poll(); - } - else - { - o = _synchronousQueue.take(); - } - return o; - } + }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index eed9f21090..7eb56acb27 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -739,4 +739,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _consumer = basicMessageConsumer; } + public void receivedFromServer() + { + _changedData = false; + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 083912d8ac..4eeb702703 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -140,7 +140,9 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory props.setType(mprop.getType()); props.setUserId(mprop.getUserId()); props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders())); - return createMessage(messageNbr, data, exchange, routingKey, props); + AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props); + message.receivedFromServer(); + return message; } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index c3a879177c..51a052bed5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -49,11 +49,33 @@ public class Client implements org.apache.qpidity.nclient.Connection ConnectionDelegate connectionDelegate = new ConnectionDelegate() { + private boolean receivedClose = false; + public SessionDelegate getSessionDelegate() { return new ClientSessionDelegate(); } + public void exception(Throwable t) + { + if (_closedListner != null) + { + _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),t); + } + else + { + throw new RuntimeException("connection closed",t); + } + } + + public void closed() + { + if (_closedListner != null && !this.receivedClose) + { + _closedListner.onClosed(ErrorCode.CONNECTION_ERROR,ErrorCode.CONNECTION_ERROR.getDesc(),null); + } + } + @Override public void connectionClose(Channel context, ConnectionClose connectionClose) { ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode()); @@ -67,8 +89,10 @@ public class Client implements org.apache.qpidity.nclient.Connection } else { - _closedListner.onClosed(errorCode, connectionClose.getReplyText()); + _closedListner.onClosed(errorCode, connectionClose.getReplyText(),null); } + + this.receivedClose = true; } }; @@ -79,6 +103,7 @@ public class Client implements org.apache.qpidity.nclient.Connection if (System.getProperty("transport","mina").equalsIgnoreCase("nio")) { + System.out.println("Using NIO"); if( _logger.isDebugEnabled()) { _logger.debug("using NIO"); @@ -180,6 +205,7 @@ public class Client implements org.apache.qpidity.nclient.Connection public void setClosedListener(ClosedListener closedListner) { + _closedListner = closedListner; } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java index 5ca598d412..c0c6978a14 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java @@ -32,8 +32,8 @@ public interface ClosedListener * informs the connection's ExceptionListener * @param errorCode TODO * @param reason TODO - * + * @param t TODO * @see Connection */ - public void onClosed(ErrorCode errorCode, String reason); + public void onClosed(ErrorCode errorCode, String reason, Throwable t); }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java index 7eb482c26b..008b85e98a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java @@ -1,5 +1,7 @@ package org.apache.qpidity.nclient; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -16,17 +18,17 @@ public class JMSTestCase try { - javax.jms.Connection con = new AMQConnection("qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672"); + javax.jms.Connection con = new AMQConnection("qpid:password=pass;username=name@tcp:localhost:5672"); con.start(); javax.jms.Session ssn = con.createSession(false, 1); javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test"); javax.jms.MessageConsumer cons = ssn.createConsumer(dest); - javax.jms.MessageProducer prod = ssn.createProducer(dest); + //javax.jms.MessageProducer prod = ssn.createProducer(dest); - //javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receive(); - /* cons.setMessageListener(new MessageListener() + javax.jms.TextMessage m = null; // (javax.jms.TextMessage)cons.receive(); + cons.setMessageListener(new MessageListener() { public void onMessage(Message m) { @@ -41,9 +43,25 @@ public class JMSTestCase } } - });*/ + }); - javax.jms.TextMessage msg = ssn.createTextMessage(); + con.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + e.printStackTrace(); + } + }); + + System.out.println("Waiting"); + while (m == null) + { + + } + + System.out.println("Exiting"); + + /*javax.jms.TextMessage msg = ssn.createTextMessage(); msg.setText("This is a test message"); msg.setBooleanProperty("targetMessage", false); prod.send(msg); @@ -60,7 +78,7 @@ public class JMSTestCase else { System.out.println("message is not null" + m); - } + }*/ } catch(Exception e) diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index 0a25ea3961..5fe38889d9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -189,7 +189,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen void notifyException(QpidException ex) { - _exceptionListner.onClosed(null, null); + _exceptionListner.onClosed(null, null, null); } Map<String,MessagePartListener> getMessageListerners() diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java index 96ec98a45a..810082fdd3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java @@ -40,7 +40,7 @@ public class DemoClient Session ssn = conn.createSession(50000); ssn.setClosedListener(new ClosedListener() { - public void onClosed(ErrorCode errorCode, String reason) + public void onClosed(ErrorCode errorCode, String reason, Throwable t) { System.out.println("ErrorCode : " + errorCode + " reason : " + reason); } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java index 17081265aa..6a80575f4b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java @@ -43,7 +43,7 @@ public class LargeMsgDemoClient Session ssn = conn.createSession(50000); ssn.setClosedListener(new ClosedListener() { - public void onClosed(ErrorCode errorCode, String reason) + public void onClosed(ErrorCode errorCode, String reason, Throwable t) { System.out.println("ErrorCode : " + errorCode + " reason : " + reason); } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java index 54765ea72e..3ddded2051 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java @@ -124,7 +124,7 @@ public class BasicInteropTest implements ClosedListener session.sync(); } - public void onClosed(ErrorCode errorCode, String reason) + public void onClosed(ErrorCode errorCode, String reason, Throwable t) { System.out.println("------- Broker Notified an error --------"); System.out.println("------- " + errorCode + " --------"); diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java index 5dc3b34a61..5694075ef4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java @@ -1163,7 +1163,7 @@ public class SessionImpl implements Session */ private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener { - public void onClosed(ErrorCode errorCode, String reason) + public void onClosed(ErrorCode errorCode, String reason, Throwable t) { synchronized (this) { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java index e2d3832cc9..3cef57f90d 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionStartTest.java @@ -82,8 +82,9 @@ public class ConnectionStartTest extends QpidTestCase protected void tearDown() throws Exception { _connection.close(); + super.tearDown(); } - + public void testSimpleReceiveConnection() { try diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java new file mode 100644 index 0000000000..ccf16a0b6e --- /dev/null +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.client.connection; + +import org.apache.qpid.testutil.QpidTestCase; + +import org.apache.qpid.util.concurrent.Condition; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; + +/** + * ExceptionListenerTest + * + */ + +public class ExceptionListenerTest extends QpidTestCase +{ + + public void testBrokerDeath() throws Exception + { + Connection conn = getConnection("guest", "guest"); + + conn.start(); + + final Condition fired = new Condition(); + conn.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + fired.set(); + } + }); + + stopBroker(); + + if (!fired.get(3000)) + { + fail("exception listener was not fired"); + } + } + +} diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java index b2bf7fd7e4..f43ccaf0ff 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/forwardall/Client.java @@ -98,7 +98,7 @@ public class Client implements MessageListener if (_count < _expected) { - wait(1000000000); + wait(60000); } if (_count < _expected) diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java index 21890e1ae9..5ebde71d6c 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.test.unit.close; -import junit.framework.TestCase; - import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.testutil.QpidTestCase; import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; @@ -41,7 +40,7 @@ import javax.jms.Session; import java.util.concurrent.atomic.AtomicInteger; -public class MessageRequeueTest extends TestCase +public class MessageRequeueTest extends QpidTestCase { private static final Logger _logger = LoggerFactory.getLogger(MessageRequeueTest.class); @@ -64,7 +63,8 @@ public class MessageRequeueTest extends TestCase protected void setUp() throws Exception { super.setUp(); - conn = new QpidClientConnection(BROKER); + + conn = new QpidClientConnection(BROKER); conn.connect(); // clear queue @@ -78,7 +78,6 @@ public class MessageRequeueTest extends TestCase protected void tearDown() throws Exception { - super.tearDown(); if (!passed) // clean up { @@ -91,6 +90,7 @@ public class MessageRequeueTest extends TestCase conn.disconnect(); } + super.tearDown(); } /** @@ -125,7 +125,7 @@ public class MessageRequeueTest extends TestCase if (messageLog[msgindex] != 0) { _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() - + ") more than once."); + + ") more than once."); } if (_logger.isInfoEnabled()) @@ -144,16 +144,18 @@ public class MessageRequeueTest extends TestCase msg = consumer.receive(1000); } - _logger.info("consuming done."); + _logger.info("consuming done."); conn.getSession().commit(); consumer.close(); - assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived); int index = 0; StringBuilder list = new StringBuilder(); list.append("Failed to receive:"); int failed = 0; + _logger.info("consumed: " + messagesReceived); + + assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived); // wit 0_10 we can have a delivery tag of 0 if (conn.isBroker08()) { @@ -174,7 +176,7 @@ public class MessageRequeueTest extends TestCase assertEquals(list.toString(), 0, failed); } - _logger.info("consumed: " + messagesReceived); + conn.disconnect(); passed = true; } @@ -208,7 +210,7 @@ public class MessageRequeueTest extends TestCase } catch (InterruptedException e) { - fail("Uanble to join to Consumer theads"); + fail("Unable to join to Consumer theads"); } _logger.info("consumer 1 count is " + c1.getCount()); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index fab419e831..b905591f19 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -106,7 +106,7 @@ public class TopicSessionTest extends QpidTestCase AMQTopic topic = new AMQTopic(con, "MyTopic1" + String.valueOf(shutdown)); AMQTopic topic2 = new AMQTopic(con, "MyOtherTopic1" + String.valueOf(shutdown)); - TopicSession session1 = con.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session1 = con.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber sub = session1.createDurableSubscriber(topic, "subscription0"); TopicPublisher publisher = session1.createPublisher(null); @@ -144,11 +144,11 @@ public class TopicSessionTest extends QpidTestCase AMQConnection con1 = (AMQConnection) getConnection("guest", "guest"); AMQTopic topic = new AMQTopic(con1, "MyTopic3"); - TopicSession session1 = con1.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session1 = con1.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); TopicPublisher publisher = session1.createPublisher(topic); AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); - TopicSession session2 = con2.createTopicSession(false, AMQSession.NO_ACKNOWLEDGE); + TopicSession session2 = con2.createTopicSession(false, AMQSession.AUTO_ACKNOWLEDGE); TopicSubscriber sub = session2.createDurableSubscriber(topic, "subscription0"); con2.start(); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index 2dfca12f87..da8e189c84 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -467,6 +467,18 @@ public class CommitRollbackTest extends QpidTestCase } result = _consumer.receive(1000); + + if (isBroker08()) + { + assertNotNull("test message was consumed and rolled back, but is gone", result); + // assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); + } + else + { + assertNull("test message was consumed and not rolled back, but is redelivered", result); + } + + result = _consumer.receive(1000); assertNull("test message should be null:" + result, result); _session.commit(); diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 93f80645d5..1339cf9060 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -89,7 +89,7 @@ public class TransactedTest extends QpidTestCase prepCon = (AMQConnection) getConnection("guest", "guest"); _logger.info("Create prep session"); - prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + prepSession = prepCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); _logger.info("Create prep producer to Q1"); prepProducer1 = prepSession.createProducer(queue1); @@ -100,7 +100,7 @@ public class TransactedTest extends QpidTestCase _logger.info("Create test connection"); testCon = (AMQConnection) getConnection("guest", "guest"); _logger.info("Create test session"); - testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE); + testSession = testCon.createSession(false, AMQSession.AUTO_ACKNOWLEDGE); _logger.info("Create test consumer of q2"); testConsumer2 = testSession.createConsumer(queue2); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java index f9c830ddae..e99a51e1c7 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java @@ -81,11 +81,8 @@ public class QpidClientConnection extends QpidTestCase implements ExceptionListe String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; try { - AMQConnectionFactory factory = new AMQConnectionFactory(brokerUrl); _logger.info("connecting to Qpid :" + brokerUrl); - //connection = factory.createConnection(); - setUp(); - connection = getConnection("guest", "guest") ; + connection = getConnection("guest", "guest") ; // register exception listener connection.setExceptionListener(this); @@ -112,7 +109,6 @@ public class QpidClientConnection extends QpidTestCase implements ExceptionListe connection.close(); connected = false; _logger.info("disconnected"); - tearDown(); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java index 8ab381db32..e7c09fca65 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/testutil/QpidTestCase.java @@ -21,8 +21,8 @@ import junit.framework.TestCase; import javax.jms.Connection; import javax.naming.InitialContext; -import java.io.BufferedReader; -import java.io.InputStreamReader; +import java.io.InputStream; +import java.io.IOException; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.AMQConnection; @@ -39,117 +39,101 @@ import org.slf4j.LoggerFactory; public class QpidTestCase extends TestCase { - /* this clas logger */ private static final Logger _logger = LoggerFactory.getLogger(QpidTestCase.class); - /* Test properties */ - private static final String SHEL = "broker_shel"; - private static final String BROKER_PATH = "broker_path"; - private static final String BROKER_PARAM = "broker_param"; - private static final String BROKER_VERSION = "broker_version"; - public static final String BROKER_08 = "08"; - private static final String BROKER_VM = "vm"; - private static final String EXT_BROKER = "ext" ; - /** - * The process where the remote broker is running. - */ - private Process _brokerProcess; + // system properties + private static final String BROKER = "broker"; + private static final String BROKER_VERSION = "broker.version"; + + // values + private static final String VM = "vm"; + private static final String EXTERNAL = "external"; + private static final String VERSION_08 = "0-8"; + private static final String VERSION_010 = "0-10"; - /* The test property values */ - // The default broker is an in-VM one - private String _shel = BROKER_VM; - private String _brokerPath = ""; - private String _brokerParams = ""; - private String _brokerVersion = "08" ; + private String _broker = System.getProperty(BROKER, VM); + private String _brokerVersion = System.getProperty(BROKER_VERSION, VERSION_08); + + private Process _brokerProcess; - /* The broker communication objects */ private InitialContext _initialContext; private AMQConnectionFactory _connectionFactory; - //--------- JUnit support - protected void setUp() throws Exception { super.setUp(); - // get the propeties if they are set - if (System.getProperties().containsKey(BROKER_VERSION )) - { - _brokerVersion = System.getProperties().getProperty(BROKER_VERSION ); - } - if (System.getProperties().containsKey(SHEL)) - { - _shel = System.getProperties().getProperty(SHEL); - } - if (System.getProperties().containsKey(BROKER_PATH)) - { - _brokerPath = System.getProperties().getProperty(BROKER_PATH); - } - if (System.getProperties().containsKey(BROKER_PARAM)) - { - _brokerParams = System.getProperties().getProperty(BROKER_PARAM); - } - if (!_shel.equals(BROKER_VM) && ! _shel.equals(EXT_BROKER) ) - { - // start a new broker - startBroker(); - } - else if ( ! _shel.equals(EXT_BROKER) ) - { - // create an in_VM broker - TransportConnection.createVMBroker(1); - } - _logger.info("========================================="); - _logger.info("broker version " + _brokerVersion + " ==== " + _shel + " " + _brokerPath + " " + _brokerParams); + startBroker(); } - /** - * This method _is invoked after each test case. - * - * @throws Exception - */ protected void tearDown() throws Exception { - killBroker(); - super.tearDown(); + stopBroker(); + super.tearDown(); } - public void killBroker() + public void startBroker() throws Exception { - _logger.info("Kill broker"); - if (_brokerProcess != null) + if (_broker.equals(VM)) { - // destroy the currently running broker - _brokerProcess.destroy(); - _brokerProcess = null; + // create an in_VM broker + TransportConnection.createVMBroker(1); } - else if ( ! _shel.equals(EXT_BROKER)) - { - TransportConnection.killAllVMBrokers(); + else if (!_broker.equals(EXTERNAL)) + { + _logger.info("starting broker: " + _broker); + ProcessBuilder pb = new ProcessBuilder(_broker.split("\\s+")); + pb.redirectErrorStream(true); + _brokerProcess = pb.start(); + + new Thread() + { + private InputStream in = _brokerProcess.getInputStream(); + + public void run() + { + try + { + byte[] buf = new byte[4*1024]; + int n; + while ((n = in.read(buf)) != -1) + { + System.out.write(buf, 0, n); + } + } + catch (IOException e) + { + _logger.info("redirector", e); + } + } + }.start(); + + Thread.sleep(1000); + + try + { + int exit = _brokerProcess.exitValue(); + throw new RuntimeException("broker aborted: " + exit); + } + catch (IllegalThreadStateException e) + { + // this is expect if the broker started succesfully + } } } - //--------- Util method - - /** - * This method starts a remote server by spawning an external process. - * - * @throws Exception If the broker cannot be started - */ - public void startBroker() throws Exception + public void stopBroker() throws Exception { - _logger.info("Starting broker: " + _shel + " " + _brokerPath + " " + _brokerParams + ""); - Runtime rt = Runtime.getRuntime(); - _brokerProcess = rt.exec(_shel + " " + _brokerPath + " " + _brokerParams + ""); - BufferedReader reader = new BufferedReader(new InputStreamReader(_brokerProcess.getInputStream())); - if (reader.ready()) + _logger.info("stopping broker: " + _broker); + if (_brokerProcess != null) { - //bad, we had an error starting the broker - throw new Exception("Problem when starting the broker: " + reader.readLine()); + _brokerProcess.destroy(); + _brokerProcess.waitFor(); + _logger.info("broker exited: " + _brokerProcess.exitValue()); + _brokerProcess = null; } - // We need to wait for th ebroker to start ideally we would need to ping it - synchronized(this) + else if (_broker.equals(VM)) { - this.wait(1000); + TransportConnection.killAllVMBrokers(); } } @@ -159,28 +143,18 @@ public class QpidTestCase extends TestCase */ public boolean isBroker08() { - return _brokerVersion.equals(BROKER_08); + return _brokerVersion.equals(VERSION_08); } - /** - * Stop the currently running broker. - */ - public void stopBroker() + public boolean isBroker010() { - _logger.info("Stopping broker"); - // stooping the broker - if (_brokerProcess != null) - { - _brokerProcess.destroy(); - } - _initialContext = null; - _connectionFactory = null; + return _brokerVersion.equals(VERSION_010); } - public void shutdownServer() throws Exception + public void shutdownServer() throws Exception { - killBroker(); - setUp(); + stopBroker(); + startBroker(); } /** * we assume that the environment is correctly set @@ -228,7 +202,7 @@ public class QpidTestCase extends TestCase { _logger.info("get Connection"); Connection con; - if (_shel.equals(BROKER_VM)) + if (_broker.equals(VM)) { con = new AMQConnection("vm://:1", username, password, "Test", "test"); } @@ -243,7 +217,7 @@ public class QpidTestCase extends TestCase { _logger.info("get Connection"); Connection con; - if (_shel.equals(BROKER_VM)) + if (_broker.equals(VM)) { con = new AMQConnection("vm://:1", username, password, id, "test"); } @@ -254,8 +228,4 @@ public class QpidTestCase extends TestCase return con; } - public void testfoo() - { - //do nothing, just to avoid maven to report an error - } } |