summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-04-21 13:55:26 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-04-21 13:55:26 +0000
commit1d69ea16b30dd67ac32683e6dc512f4c58ef93f1 (patch)
treeb7d326570570b53936c6512d58c465c76244e925 /java/client/src
parenta53cbaad17df415e98f22cc42f2512467936bbc6 (diff)
parentc0c3c38f032200e786cf5a4404cfa40a0c95f5e8 (diff)
downloadqpid-python-1d69ea16b30dd67ac32683e6dc512f4c58ef93f1.tar.gz
create branch for broker refactoring
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@650146 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java478
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java300
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java50
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java392
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java268
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java54
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java64
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java235
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java186
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java74
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java34
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java59
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java12
-rw-r--r--java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java4
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java2
-rw-r--r--java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java9
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java187
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java106
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java19
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java31
34 files changed, 2081 insertions, 593 deletions
diff --git a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
new file mode 100644
index 0000000000..ab3bc28d83
--- /dev/null
+++ b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class ExistingSocketConnector extends BaseIoConnector
+{
+ /** @noinspection StaticNonFinalField */
+ private static volatile int nextId = 0;
+
+ private final Object lock = new Object();
+ private final int id = nextId++;
+ private final String threadName = "SocketConnector-" + id;
+ private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
+ private final Queue connectQueue = new Queue();
+ private final SocketIoProcessor[] ioProcessors;
+ private final int processorCount;
+ private final Executor executor;
+
+ /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
+ private Selector selector;
+ private Worker worker;
+ private int processorDistributor = 0;
+ private int workerTimeout = 60; // 1 min.
+ private Socket _openSocket = null;
+
+ /** Create a connector with a single processing thread using a NewThreadExecutor */
+ public ExistingSocketConnector()
+ {
+ this(1, new NewThreadExecutor());
+ }
+
+ /**
+ * Create a connector with the desired number of processing threads
+ *
+ * @param processorCount Number of processing threads
+ * @param executor Executor to use for launching threads
+ */
+ public ExistingSocketConnector(int processorCount, Executor executor)
+ {
+ if (processorCount < 1)
+ {
+ throw new IllegalArgumentException("Must have at least one processor");
+ }
+
+ this.executor = executor;
+ this.processorCount = processorCount;
+ ioProcessors = new SocketIoProcessor[processorCount];
+
+ for (int i = 0; i < processorCount; i++)
+ {
+ ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+ }
+ }
+
+ /**
+ * How many seconds to keep the connection thread alive between connection requests
+ *
+ * @return Number of seconds to keep connection thread alive
+ */
+ public int getWorkerTimeout()
+ {
+ return workerTimeout;
+ }
+
+ /**
+ * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
+ *
+ * @param workerTimeout Number of seconds to keep thread alive. Must be >=0
+ */
+ public void setWorkerTimeout(int workerTimeout)
+ {
+ if (workerTimeout < 0)
+ {
+ throw new IllegalArgumentException("Must be >= 0");
+ }
+ this.workerTimeout = workerTimeout;
+ }
+
+ public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+ {
+ return connect(address, null, handler, config);
+ }
+
+ public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+ IoHandler handler, IoServiceConfig config)
+ {
+ /** Changes here from the Mina OpenSocketConnector.
+ * Ignoreing all address as they are not needed */
+
+ if (handler == null)
+ {
+ throw new NullPointerException("handler");
+ }
+
+
+ if (config == null)
+ {
+ config = getDefaultConfig();
+ }
+
+ if (_openSocket == null)
+ {
+ throw new IllegalArgumentException("Specifed Socket not active");
+ }
+
+ boolean success = false;
+
+ try
+ {
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ newSession(_openSocket, handler, config, future);
+ success = true;
+ return future;
+ }
+ catch (IOException e)
+ {
+ return DefaultConnectFuture.newFailedFuture(e);
+ }
+ finally
+ {
+ if (!success && _openSocket != null)
+ {
+ try
+ {
+ _openSocket.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ public IoServiceConfig getDefaultConfig()
+ {
+ return defaultConfig;
+ }
+
+ /**
+ * Sets the config this connector will use by default.
+ *
+ * @param defaultConfig the default config.
+ *
+ * @throws NullPointerException if the specified value is <code>null</code>.
+ */
+ public void setDefaultConfig(SocketConnectorConfig defaultConfig)
+ {
+ if (defaultConfig == null)
+ {
+ throw new NullPointerException("defaultConfig");
+ }
+ this.defaultConfig = defaultConfig;
+ }
+
+ private synchronized void startupWorker() throws IOException
+ {
+ if (worker == null)
+ {
+ selector = Selector.open();
+ worker = new Worker();
+ executor.execute(new NamePreservingRunnable(worker));
+ }
+ }
+
+ private void registerNew()
+ {
+ if (connectQueue.isEmpty())
+ {
+ return;
+ }
+
+ for (; ;)
+ {
+ ConnectionRequest req;
+ synchronized (connectQueue)
+ {
+ req = (ConnectionRequest) connectQueue.pop();
+ }
+
+ if (req == null)
+ {
+ break;
+ }
+
+ SocketChannel ch = req.channel;
+ try
+ {
+ ch.register(selector, SelectionKey.OP_CONNECT, req);
+ }
+ catch (IOException e)
+ {
+ req.setException(e);
+ }
+ }
+ }
+
+ private void processSessions(Set keys)
+ {
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isConnectable())
+ {
+ continue;
+ }
+
+ SocketChannel ch = (SocketChannel) key.channel();
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ boolean success = false;
+ try
+ {
+ ch.finishConnect();
+ newSession(ch, entry.handler, entry.config, entry);
+ success = true;
+ }
+ catch (Throwable e)
+ {
+ entry.setException(e);
+ }
+ finally
+ {
+ key.cancel();
+ if (!success)
+ {
+ try
+ {
+ ch.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ }
+ }
+ }
+
+ keys.clear();
+ }
+
+ private void processTimedOutSessions(Set keys)
+ {
+ long currentTime = System.currentTimeMillis();
+ Iterator it = keys.iterator();
+
+ while (it.hasNext())
+ {
+ SelectionKey key = (SelectionKey) it.next();
+
+ if (!key.isValid())
+ {
+ continue;
+ }
+
+ ConnectionRequest entry = (ConnectionRequest) key.attachment();
+
+ if (currentTime >= entry.deadline)
+ {
+ entry.setException(new ConnectException());
+ try
+ {
+ key.channel().close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ key.cancel();
+ }
+ }
+ }
+ }
+
+ private void newSession(Socket socket, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+ {
+ SocketSessionImpl session = new SocketSessionImpl(this,
+ nextProcessor(),
+ getListeners(),
+ config,
+ socket.getChannel(),
+ handler,
+ socket.getRemoteSocketAddress());
+
+ newSession(session, config, connectFuture);
+ }
+
+ private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+
+ {
+ SocketSessionImpl session = new SocketSessionImpl(this,
+ nextProcessor(),
+ getListeners(),
+ config,
+ ch,
+ handler,
+ ch.socket().getRemoteSocketAddress());
+
+ newSession(session, config, connectFuture);
+ }
+
+ private void newSession(SocketSessionImpl session, IoServiceConfig config, ConnectFuture connectFuture)
+ throws IOException
+ {
+ try
+ {
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getThreadModel().buildFilterChain(session.getFilterChain());
+ }
+ catch (Throwable e)
+ {
+ throw (IOException) new IOException("Failed to create a session.").initCause(e);
+ }
+ session.getIoProcessor().addNew(session);
+ connectFuture.setSession(session);
+ }
+
+ private SocketIoProcessor nextProcessor()
+ {
+ return ioProcessors[processorDistributor++ % processorCount];
+ }
+
+ public void setOpenSocket(Socket openSocket)
+ {
+ _openSocket = openSocket;
+ }
+
+ private class Worker implements Runnable
+ {
+ private long lastActive = System.currentTimeMillis();
+
+ public void run()
+ {
+ Thread.currentThread().setName(ExistingSocketConnector.this.threadName);
+
+ for (; ;)
+ {
+ try
+ {
+ int nKeys = selector.select(1000);
+
+ registerNew();
+
+ if (nKeys > 0)
+ {
+ processSessions(selector.selectedKeys());
+ }
+
+ processTimedOutSessions(selector.keys());
+
+ if (selector.keys().isEmpty())
+ {
+ if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L)
+ {
+ synchronized (lock)
+ {
+ if (selector.keys().isEmpty() &&
+ connectQueue.isEmpty())
+ {
+ worker = null;
+ try
+ {
+ selector.close();
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+ }
+ finally
+ {
+ selector = null;
+ }
+ break;
+ }
+ }
+ }
+ }
+ else
+ {
+ lastActive = System.currentTimeMillis();
+ }
+ }
+ catch (IOException e)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e);
+
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e1)
+ {
+ ExceptionMonitor.getInstance().exceptionCaught(e1);
+ }
+ }
+ }
+ }
+ }
+
+ private class ConnectionRequest extends DefaultConnectFuture
+ {
+ private final SocketChannel channel;
+ private final long deadline;
+ private final IoHandler handler;
+ private final IoServiceConfig config;
+
+ private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config)
+ {
+ this.channel = channel;
+ long timeout;
+ if (config instanceof IoConnectorConfig)
+ {
+ timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis();
+ }
+ else
+ {
+ timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis();
+ }
+ this.deadline = System.currentTimeMillis() + timeout;
+ this.handler = handler;
+ this.config = config;
+ }
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
index b6fbb6c6bf..69ff7a2c19 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
@@ -39,4 +39,9 @@ public class AMQAuthenticationException extends AMQException
{
super(error, msg);
}
+ public boolean isHardError()
+ {
+ return true;
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index c04380ba8c..f3e71d2035 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -57,8 +57,9 @@ public class AMQBrokerDetails implements BrokerDetails
if (transport != null)
{
//todo this list of valid transports should be enumerated somewhere
- if ((!(transport.equalsIgnoreCase("vm") ||
- transport.equalsIgnoreCase("tcp"))))
+ if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) ||
+ transport.equalsIgnoreCase(BrokerDetails.TCP) ||
+ transport.equalsIgnoreCase(BrokerDetails.SOCKET))))
{
if (transport.equalsIgnoreCase("localhost"))
{
@@ -156,7 +157,10 @@ public class AMQBrokerDetails implements BrokerDetails
}
else
{
- setPort(port);
+ if (!_transport.equalsIgnoreCase(SOCKET))
+ {
+ setPort(port);
+ }
}
String queryString = connection.getQuery();
@@ -263,13 +267,16 @@ public class AMQBrokerDetails implements BrokerDetails
sb.append(_transport);
sb.append("://");
- if (!(_transport.equalsIgnoreCase("vm")))
+ if (!(_transport.equalsIgnoreCase(VM)))
{
sb.append(_host);
}
- sb.append(':');
- sb.append(_port);
+ if (!(_transport.equalsIgnoreCase(SOCKET)))
+ {
+ sb.append(':');
+ sb.append(_port);
+ }
sb.append(printOptionsURL());
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 39b3b80e74..4b8143cfb5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -39,6 +39,7 @@ import org.apache.qpid.jms.Connection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,105 @@ import java.util.concurrent.atomic.AtomicInteger;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
+ private static final class ChannelToSessionMap
+ {
+ private final AMQSession[] _fastAccessSessions = new AMQSession[16];
+ private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
+ private int _size = 0;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+
+ public AMQSession get(int channelId)
+ {
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ return _fastAccessSessions[channelId];
+ }
+ else
+ {
+ return _slowAccessSessions.get(channelId);
+ }
+ }
+
+ public AMQSession put(int channelId, AMQSession session)
+ {
+ AMQSession oldVal;
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ oldVal = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = session;
+ }
+ else
+ {
+ oldVal = _slowAccessSessions.put(channelId, session);
+ }
+ if((oldVal != null) && (session == null))
+ {
+ _size--;
+ }
+ else if((oldVal == null) && (session != null))
+ {
+ _size++;
+ }
+
+ return session;
+
+ }
+
+
+ public AMQSession remove(int channelId)
+ {
+ AMQSession session;
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ session = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = null;
+ }
+ else
+ {
+ session = _slowAccessSessions.remove(channelId);
+ }
+
+ if(session != null)
+ {
+ _size--;
+ }
+ return session;
+
+ }
+
+ public Collection<AMQSession> values()
+ {
+ ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
+
+ for(int i = 0; i < 16; i++)
+ {
+ if(_fastAccessSessions[i] != null)
+ {
+ values.add(_fastAccessSessions[i]);
+ }
+ }
+ values.addAll(_slowAccessSessions.values());
+
+ return values;
+ }
+
+ public int size()
+ {
+ return _size;
+ }
+
+ public void clear()
+ {
+ _size = 0;
+ _slowAccessSessions.clear();
+ for(int i = 0; i<16; i++)
+ {
+ _fastAccessSessions[i] = null;
+ }
+ }
+ }
+
+
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
private AtomicInteger _idFactory = new AtomicInteger(0);
@@ -101,7 +201,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+ private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
private String _clientName;
@@ -157,6 +257,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private static final long DEFAULT_TIMEOUT = 1000 * 30;
private ProtocolVersion _protocolVersion;
+
/**
* @param broker brokerdetails
* @param username username
@@ -232,6 +333,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
+ final ArrayList<JMSException> exceptions = new ArrayList<JMSException>();
+
+ class Listener implements ExceptionListener
+ {
+ public void onException(JMSException e)
+ {
+ exceptions.add(e);
+ }
+ }
+
+ try
+ {
+ setExceptionListener(new Listener());
+ }
+ catch (JMSException e)
+ {
+ // Shouldn't happen
+ throw new AMQException(null, null, e);
+ }
+
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
@@ -288,8 +409,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
try
{
makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
- lastException = null;
- _connected = true;
}
catch (Exception e)
{
@@ -316,8 +435,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if (!_connected)
{
String message = null;
+ try
+ {
+ Thread.sleep(150);
+ }
+ catch (InterruptedException e)
+ {
+ // Eat it, we've hopefully got all the exceptions if this happened
+ }
+ if (exceptions.size() > 0)
+ {
+ JMSException e = exceptions.get(0);
+ int code = -1;
+ try
+ {
+ code = new Integer(e.getErrorCode()).intValue();
+ }
+ catch (NumberFormatException nfe)
+ {
+ // Ignore this, we have some error codes and messages swapped around
+ }
- if (lastException != null)
+ throw new AMQConnectionFailureException(AMQConstant.getConstant(code),
+ e.getMessage(), e);
+ }
+ else if (lastException != null)
{
if (lastException.getCause() != null)
{
@@ -395,20 +537,27 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
try
{
+
TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
//_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
AMQState state = _protocolHandler.attainState(openOrClosedStates);
- if(state == AMQState.CONNECTION_OPEN)
+ if (state == AMQState.CONNECTION_OPEN)
{
-
_failoverPolicy.attainedConnection();
// Again this should be changed to a suitable notify
_connected = true;
}
+ else if (state == AMQState.CONNECTION_CLOSED)
+ {
+ //We need to change protocol handler here as an error during the connect will not
+ // cause the StateManager to be replaced. So the state is out of sync on reconnect
+ // This occurs here when we need to re-negotiate protocol versions
+ _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_NOT_STARTED);
+ }
}
catch (AMQException e)
{
@@ -513,71 +662,71 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException
{
- synchronized(_sessionCreationLock)
+ synchronized (_sessionCreationLock)
{
- checkNotClosed();
+ checkNotClosed();
- if (channelLimitReached())
- {
- throw new ChannelLimitReachedException(_maximumChannelCount);
- }
+ if (channelLimitReached())
+ {
+ throw new ChannelLimitReachedException(_maximumChannelCount);
+ }
- return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
- new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
- {
- public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
+ return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
+ new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
{
- int channelId = _idFactory.incrementAndGet();
-
- if (_logger.isDebugEnabled())
+ public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
{
- _logger.debug("Write channel open frame for channel id " + channelId);
- }
-
- // We must create the session and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AMQSession session =
- new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow);
- // _protocolHandler.addSessionByChannel(channelId, session);
- registerSession(channelId, session);
+ int channelId = _idFactory.incrementAndGet();
- boolean success = false;
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- success = true;
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error creating session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- if (!success)
+ if (_logger.isDebugEnabled())
{
- deregisterSession(channelId);
+ _logger.debug("Write channel open frame for channel id " + channelId);
}
- }
- if (_started)
- {
+ // We must create the session and register it before actually sending the frame to the server to
+ // open it, so that there is no window where we could receive data on the channel and not be set
+ // up to handle it appropriately.
+ AMQSession session =
+ new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+ prefetchLow);
+ // _protocolHandler.addSessionByChannel(channelId, session);
+ registerSession(channelId, session);
+
+ boolean success = false;
try
{
- session.start();
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ success = true;
}
catch (AMQException e)
{
- throw new JMSAMQException(e);
+ JMSException jmse = new JMSException("Error creating session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ finally
+ {
+ if (!success)
+ {
+ deregisterSession(channelId);
+ }
}
- }
- return session;
- }
- }, this).execute();
+ if (_started)
+ {
+ try
+ {
+ session.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
+ }
+
+ return session;
+ }
+ }, this).execute();
}
}
@@ -589,13 +738,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
+ _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
- BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false);
// todo send low water mark when protocol allows.
// todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
+ _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class);
if (transacted)
{
@@ -720,10 +869,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
checkNotClosed();
if (!_started)
{
- final Iterator it = _sessions.entrySet().iterator();
+ final Iterator it = _sessions.values().iterator();
while (it.hasNext())
{
- final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue();
+ final AMQSession s = (AMQSession) (it.next());
try
{
s.start();
@@ -783,17 +932,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else
{
- synchronized (getFailoverMutex())
- {
if (!_closed.getAndSet(true))
{
- try
+
+ synchronized (getFailoverMutex())
{
- long startCloseTime = System.currentTimeMillis();
+ try
+ {
+ long startCloseTime = System.currentTimeMillis();
- _taskPool.shutdown();
closeAllSessions(null, timeout, startCloseTime);
+ //This MUST occur after we have successfully closed all Channels/Sessions
+ _taskPool.shutdown();
+
if (!_taskPool.isTerminated())
{
try
@@ -977,11 +1129,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return _maximumFrameSize;
}
- public Map getSessions()
- {
- return _sessions;
- }
-
public String getUsername()
{
return _username;
@@ -1154,7 +1301,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.error("Throwable Received but no listener set: " + cause.getMessage());
}
- if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException))
+ if (hardError(cause))
{
try
{
@@ -1178,6 +1325,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
+ private boolean hardError(Throwable cause)
+ {
+ if (cause instanceof AMQException)
+ {
+ return ((AMQException)cause).isHardError();
+ }
+
+ return true;
+ }
+
void registerSession(int channelId, AMQSession session)
{
_sessions.put(channelId, session);
@@ -1202,6 +1359,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// _protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
+ s.setFlowControl(true);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 28e5992b26..a3cf39003d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.AMQException;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
@@ -50,7 +51,9 @@ public class AMQQueueBrowser implements QueueBrowser
_messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector;
// Create Consumer to verify message selector.
BasicMessageConsumer consumer =
- (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ // Close this consumer as we are not looking to consume only to establish that, at least for now,
+ // the QB can be created
consumer.close();
}
@@ -88,39 +91,40 @@ public class AMQQueueBrowser implements QueueBrowser
checkState();
final BasicMessageConsumer consumer =
(BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+
_consumers.add(consumer);
return new Enumeration()
+ {
+
+ Message _nextMessage = consumer == null ? null : consumer.receive();
+
+ public boolean hasMoreElements()
{
+ _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
- Message _nextMessage = consumer.receive();
+ return (_nextMessage != null);
+ }
- public boolean hasMoreElements()
+ public Object nextElement()
+ {
+ Message msg = _nextMessage;
+ try
{
- _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ _logger.info("QB:nextElement about to receive");
- return (_nextMessage != null);
+ _nextMessage = consumer.receive();
+ _logger.info("QB:nextElement received:" + _nextMessage);
}
-
- public Object nextElement()
+ catch (JMSException e)
{
- Message msg = _nextMessage;
- try
- {
- _logger.info("QB:nextElement about to receive");
-
- _nextMessage = consumer.receive();
- _logger.info("QB:nextElement received:" + _nextMessage);
- }
- catch (JMSException e)
- {
- _logger.warn("Exception caught while queue browsing", e);
- _nextMessage = null;
- }
-
- return msg;
+ _logger.warn("Exception caught while queue browsing", e);
+ _nextMessage = null;
}
- };
+
+ return msg;
+ }
+ };
}
public void close() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 42f07f97f9..c3219e6564 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -39,10 +39,10 @@ import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
-import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -78,10 +78,7 @@ import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;
import java.io.Serializable;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@@ -107,6 +104,89 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
+ private static final class IdToConsumerMap
+ {
+ private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
+ private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+
+
+ public BasicMessageConsumer get(int id)
+ {
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ return _fastAccessConsumers[id];
+ }
+ else
+ {
+ return _slowAccessConsumers.get(id);
+ }
+ }
+
+ public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+ {
+ BasicMessageConsumer oldVal;
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ oldVal = _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = consumer;
+ }
+ else
+ {
+ oldVal = _slowAccessConsumers.put(id, consumer);
+ }
+
+ return consumer;
+
+ }
+
+
+ public BasicMessageConsumer remove(int id)
+ {
+ BasicMessageConsumer consumer;
+ if((id & 0xFFFFFFF0) == 0)
+ {
+ consumer = _fastAccessConsumers[id];
+ _fastAccessConsumers[id] = null;
+ }
+ else
+ {
+ consumer = _slowAccessConsumers.remove(id);
+ }
+
+ return consumer;
+
+ }
+
+ public Collection<BasicMessageConsumer> values()
+ {
+ ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+
+ for(int i = 0; i < 16; i++)
+ {
+ if(_fastAccessConsumers[i] != null)
+ {
+ values.add(_fastAccessConsumers[i]);
+ }
+ }
+ values.addAll(_slowAccessConsumers.values());
+
+ return values;
+ }
+
+
+ public void clear()
+ {
+ _slowAccessConsumers.clear();
+ for(int i = 0; i<16; i++)
+ {
+ _fastAccessConsumers[i] = null;
+ }
+ }
+ }
+
+
+
+
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -156,7 +236,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _transacted;
/** Holds the sessions acknowledgement mode. */
- private int _acknowledgeMode;
+ private final int _acknowledgeMode;
/** Holds this session unique identifier, used to distinguish it from other sessions. */
private int _channelId;
@@ -217,8 +297,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
* consumer.
*/
- private Map<AMQShortString, BasicMessageConsumer> _consumers =
- new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
+ private final IdToConsumerMap _consumers = new IdToConsumerMap();
+
+ //Map<AMQShortString, BasicMessageConsumer> _consumers =
+ //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
/**
* Contains a list of consumers which have been removed but which might still have
@@ -281,6 +363,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
/** Has failover occured on this session */
private boolean _failedOver;
+
+
+ private static final class FlowControlIndicator
+ {
+ private volatile boolean _flowControl = true;
+
+ public synchronized void setFlowControl(boolean flowControl)
+ {
+ _flowControl= flowControl;
+ notify();
+ }
+
+ public boolean getFlowControl()
+ {
+ return _flowControl;
+ }
+ }
+
+ /** Flow control */
+ private FlowControlIndicator _flowControl = new FlowControlIndicator();
+
/**
* Creates a new session on a connection.
*
@@ -327,24 +430,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
public void aboveThreshold(int currentValue)
{
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
_logger.debug(
"Above threshold(" + _defaultPrefetchHighMark
+ ") so suspending channel. Current value is " + currentValue);
new Thread(new SuspenderRunner(true)).start();
- }
+
}
public void underThreshold(int currentValue)
{
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
_logger.debug(
"Below threshold(" + _defaultPrefetchLowMark
+ ") so unsuspending channel. Current value is " + currentValue);
new Thread(new SuspenderRunner(false)).start();
- }
+
}
});
}
@@ -495,14 +594,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
}
- synchronized (_connection.getFailoverMutex())
+ // Ensure we only try and close an open session.
+ if (!_closed.getAndSet(true))
{
- // We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session.
- synchronized (_messageDeliveryLock)
+ synchronized (_connection.getFailoverMutex())
{
- // Ensure we only try and close an open session.
- if (!_closed.getAndSet(true))
+ // We must close down all producers and consumers in an orderly fashion. This is the only method
+ // that can be called from a different thread of control from the one controlling the session.
+ synchronized (_messageDeliveryLock)
{
// we pass null since this is not an error case
closeProducersAndConsumers(null);
@@ -516,7 +615,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
final AMQFrame frame = body.generateFrame(getChannelId());
getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
-
// When control resumes at this point, a reply will have been received that
// indicates the broker has closed the channel successfully.
@@ -550,33 +648,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
+ // calls through connection.closeAllSessions which is also called by the public connection.close()
+ // with a null cause
+ // When we are closing the Session due to a protocol session error we simply create a new AMQException
+ // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
+ // We need to determin here if the connection should be
+
+ if (e instanceof AMQDisconnectedException)
{
- if (e instanceof AMQDisconnectedException)
+ if (_dispatcher != null)
{
- if (_dispatcher != null)
- {
- // Failover failed and ain't coming back. Knife the dispatcher.
- _dispatcher.interrupt();
- }
+ // Failover failed and ain't coming back. Knife the dispatcher.
+ _dispatcher.interrupt();
}
- synchronized (_messageDeliveryLock)
+ }
+
+ if (!_closed.getAndSet(true))
+ {
+ synchronized (_connection.getFailoverMutex())
{
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- _closed.set(true);
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
+ synchronized (_messageDeliveryLock)
{
- amqe = new AMQException("Closing session forcibly", e);
- }
+ // An AMQException has an error code and message already and will be passed in when closure occurs as a
+ // result of a channel close request
+ AMQException amqe;
+ if (e instanceof AMQException)
+ {
+ amqe = (AMQException) e;
+ }
+ else
+ {
+ amqe = new AMQException("Closing session forcibly", e);
+ }
+
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ _connection.deregisterSession(_channelId);
+ closeProducersAndConsumers(amqe);
+ }
}
}
}
@@ -662,16 +771,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
// Remove the consumer from the map
- BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
if (consumer != null)
{
- // fixme this isn't right.. needs to check if _queue contains data for this consumer
- if (consumer.isAutoClose()) // && _queue.isEmpty())
- {
- consumer.closeWhenNoMessages(true);
- }
-
- if (!consumer.isNoConsume())
+ if (!consumer.isNoConsume()) // Normal Consumer
{
// Clean the Maps up first
// Flush any pending messages for this consumerTag
@@ -687,7 +790,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_dispatcher.rejectPending(consumer);
}
- else
+ else // Queue Browser
{
// Just close the consumer
// fixme the CancelOK is being processed before the arriving messages..
@@ -695,13 +798,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// has yet to receive before the close comes in.
// consumer.markClosed();
+
+
+
+ if (consumer.isAutoClose())
+ { // There is a small window where the message is between the two queues in the dispatcher.
+ if (consumer.isClosed())
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing consumer:" + consumer.debugIdentity());
+ }
+
+ deregisterConsumer(consumer);
+
+ }
+ else
+ {
+ _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+ }
+ }
}
}
- else
- {
- _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
- }
-
}
public QueueBrowser createBrowser(Queue queue) throws JMSException
@@ -744,6 +862,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, false);
}
+
+ public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null,
+ false, false);
+ }
+
+
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -761,6 +889,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
messageSelector, null, false, false);
}
+
+ public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true,
+ messageSelector, null, false, false);
+ }
+
+
public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive,
String selector) throws JMSException
{
@@ -925,7 +1064,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
- return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic);
+ return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic);
}
public Queue createQueue(String queueName) throws JMSException
@@ -1089,9 +1228,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
}
+
/**
* Creates a non-durable subscriber with a message selector
*
@@ -1109,7 +1249,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
AMQTopic dest = checkValidTopic(topic);
// AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
+ return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1276,15 +1416,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+ "] received in session with channel id " + _channelId);
}
- if (message.getDeliverBody() == null)
+ if (message.isDeliverMessage())
{
- // Return of the bounced message.
- returnBouncedMessage(message);
+ _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag());
+ _queue.add(message);
}
else
{
- _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag());
- _queue.add(message);
+ // Return of the bounced message.
+ returnBouncedMessage(message);
}
}
@@ -1393,9 +1533,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().getDeliveryTag());
+ _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().getDeliveryTag());
}
rejectMessage(message.getDeliverBody().getDeliveryTag(), requeue);
@@ -1403,9 +1543,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rejectMessage(AbstractJMSMessage message, boolean requeue)
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag());
+ _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
}
rejectMessage(message.getDeliveryTag(), requeue);
@@ -1638,11 +1778,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
JMSException ex = new JMSException("Error registering consumer: " + e);
- if (_logger.isDebugEnabled())
- {
- e.printStackTrace();
- }
-
ex.setLinkedException(e);
throw ex;
}
@@ -1666,7 +1801,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
- if (_consumers.remove(consumer.getConsumerTag()) != null)
+ if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
{
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
@@ -2063,8 +2198,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
{
+ int tagId = _nextTag++;
// need to generate a consumer tag on the client so we can exploit the nowait flag
- AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
+ AMQShortString tag = new AMQShortString(Integer.toString(tagId));
FieldTable arguments = FieldTableFactory.newFieldTable();
if ((messageSelector != null) && !messageSelector.equals(""))
@@ -2084,7 +2220,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
- _consumers.put(tag, consumer);
+ _consumers.put(tagId, consumer);
try
{
@@ -2112,7 +2248,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
catch (AMQException e)
{
// clean-up the map in the event of an error
- _consumers.remove(tag);
+ _consumers.remove(tagId);
throw e;
}
}
@@ -2148,6 +2284,73 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
}
+
+ /**
+ * Returns the number of messages currently queued for the given destination.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param amqd The destination to be checked
+ *
+ * @return the number of queued messages.
+ *
+ * @throws AMQException If the queue cannot be declared for any reason.
+ */
+ public long getQueueDepth(final AMQDestination amqd)
+ throws AMQException
+ {
+
+ class QueueDeclareOkHandler extends SpecificMethodFrameListener
+ {
+
+ private long _messageCount;
+ private long _consumerCount;
+
+ public QueueDeclareOkHandler()
+ {
+ super(getChannelId(), QueueDeclareOkBody.class);
+ }
+
+ public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
+ {
+ boolean matches = super.processMethod(channelId, frame);
+ if (matches)
+ {
+ QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
+ _messageCount = declareOk.getMessageCount();
+ _consumerCount = declareOk.getConsumerCount();
+ }
+ return matches;
+ }
+
+ }
+
+ return new FailoverNoopSupport<Long, AMQException>(
+ new FailoverProtectedOperation<Long, AMQException>()
+ {
+ public Long execute() throws AMQException, FailoverException
+ {
+
+ AMQFrame queueDeclare =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ true,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null).generateFrame(_channelId);
+ QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+
+ return okHandler._messageCount;
+ }
+ }, _connection).execute();
+
+ }
+
+
+
/**
* Declares the named exchange and type of exchange.
*
@@ -2595,6 +2798,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_ticket = ticket;
}
+ public void setFlowControl(final boolean active)
+ {
+ _flowControl.setFlowControl(active);
+ }
+
+
+ public void checkFlowControl() throws InterruptedException
+ {
+ synchronized(_flowControl)
+ {
+ while(!_flowControl.getFlowControl())
+ {
+ _flowControl.wait();
+ }
+ }
+
+ }
+
+
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
private class Dispatcher extends Thread
{
@@ -2732,7 +2954,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_lock.wait(2000);
}
- if (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get())
+ if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+ && (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get()))
{
rejectMessage(message, true);
}
@@ -2786,10 +3009,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void dispatchMessage(UnprocessedMessage message)
{
- if (message.getDeliverBody() != null)
+ final BasicDeliverBody deliverBody = message.getDeliverBody();
+ if (deliverBody != null)
{
final BasicMessageConsumer consumer =
- (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag());
+ _consumers.get(deliverBody.getConsumerTag().toIntValue());
if ((consumer == null) || consumer.isClosed())
{
@@ -2798,13 +3022,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (consumer == null)
{
_dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().getDeliveryTag() + "] from queue "
- + message.getDeliverBody().getConsumerTag() + " )without a handler - rejecting(requeue)...");
+ + deliverBody.getDeliveryTag() + "] from queue "
+ + deliverBody.getConsumerTag() + " )without a handler - rejecting(requeue)...");
}
else
{
_dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
- + message.getDeliverBody().getDeliveryTag() + "] from queue " + " consumer("
+ + deliverBody.getDeliveryTag() + "] from queue " + " consumer("
+ consumer.debugIdentity() + ") is closed rejecting(requeue)...");
}
}
@@ -2816,7 +3040,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
else
{
- consumer.notifyMessage(message, _channelId);
+ consumer.notifyMessage(message);
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 610e0109b1..efbce6033b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -26,11 +26,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.slf4j.Logger;
@@ -42,6 +38,7 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -53,13 +50,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
/** The connection being used by this consumer */
- private AMQConnection _connection;
+ private final AMQConnection _connection;
- private String _messageSelector;
+ private final String _messageSelector;
- private boolean _noLocal;
+ private final boolean _noLocal;
- private AMQDestination _destination;
+ private final AMQDestination _destination;
/** When true indicates that a blocking receive call is in progress */
private final AtomicBoolean _receiving = new AtomicBoolean(false);
@@ -70,7 +67,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private AMQShortString _consumerTag;
/** We need to know the channel id when constructing frames */
- private int _channelId;
+ private final int _channelId;
/**
* Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
@@ -78,45 +75,36 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private final ArrayBlockingQueue _synchronousQueue;
- private MessageFactoryRegistry _messageFactory;
+ private final MessageFactoryRegistry _messageFactory;
private final AMQSession _session;
- private AMQProtocolHandler _protocolHandler;
+ private final AMQProtocolHandler _protocolHandler;
/** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
- private FieldTable _rawSelectorFieldTable;
+ private final FieldTable _rawSelectorFieldTable;
/**
* We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchHigh;
+ private final int _prefetchHigh;
/**
* We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchLow;
+ private final int _prefetchLow;
/** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */
- private boolean _exclusive;
+ private final boolean _exclusive;
/**
* The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
* consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
* implementation.
*/
- private int _acknowledgeMode;
-
- /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
- private int _outstanding;
-
- /**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
- * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
- */
- private boolean _dups_ok_acknowledge_send;
+ private final int _acknowledgeMode;
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
@@ -133,10 +121,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
* on the queue. This is used for queue browsing.
*/
- private boolean _autoClose;
- private boolean _closeWhenNoMessages;
+ private final boolean _autoClose;
- private boolean _noConsume;
+ private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
@@ -156,7 +143,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
- _acknowledgeMode = acknowledgeMode;
+
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
_autoClose = autoClose;
_noConsume = noConsume;
@@ -166,6 +153,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
}
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
}
public AMQDestination getDestination()
@@ -253,10 +244,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
switch (_acknowledgeMode)
{
- case Session.DUPS_OK_ACKNOWLEDGE:
- _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag());
- _receivedDeliveryTags.add(msg.getDeliveryTag());
- break;
case Session.CLIENT_ACKNOWLEDGE:
_unacknowledgedDeliveryTags.add(msg.getDeliveryTag());
@@ -269,7 +256,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
else
{
- _logger.info("Recording tag for commit:" + msg.getDeliveryTag());
_receivedDeliveryTags.add(msg.getDeliveryTag());
}
@@ -284,8 +270,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*
* @return boolean if the acquisition was successful
*
- * @throws JMSException
- * @throws InterruptedException
+ * @throws JMSException if a listener has already been set or another thread is receiving
+ * @throws InterruptedException if interrupted
*/
private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
{
@@ -372,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted acquire: " + e);
if (isClosed())
{
return null;
@@ -383,11 +369,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
- if (closeOnAutoClose())
- {
- return null;
- }
-
Object o = null;
if (l > 0)
{
@@ -400,7 +381,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted poll: " + e);
if (isClosed())
{
return null;
@@ -418,7 +399,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted take: " + e);
if (isClosed())
{
return null;
@@ -440,20 +421,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- private boolean closeOnAutoClose() throws JMSException
- {
- if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
- {
- close(false);
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
public Message receiveNoWait() throws JMSException
{
checkPreConditions();
@@ -482,11 +449,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
- if (closeOnAutoClose())
- {
- return null;
- }
-
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -507,7 +469,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* We can get back either a Message or an exception from the queue. This method examines the argument and deals with
* it by throwing it (if an exception) or returning it (in any other case).
*
- * @param o
+ * @param o the object to return or throw
*
* @return a message only if o is a Message
*
@@ -527,6 +489,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw e;
}
+ else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ _closed.set(true);
+ deregisterConsumer();
+ return null;
+ }
else
{
return (AbstractJMSMessage) o;
@@ -540,31 +508,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
- // synchronized (_closed)
-
if (_logger.isInfoEnabled())
{
_logger.info("Closing consumer:" + debugIdentity());
}
- synchronized (_connection.getFailoverMutex())
+ if (!_closed.getAndSet(true))
{
- if (!_closed.getAndSet(true))
+ if (_logger.isDebugEnabled())
{
- if (_logger.isTraceEnabled())
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ if (_closedStack != null)
{
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- if (_closedStack != null)
- {
- _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
- }
- else
- {
- _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
- }
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
}
+ }
- if (sendClose)
+ if (sendClose)
+ {
+ // The Synchronized block only needs to protect network traffic.
+ synchronized (_connection.getFailoverMutex())
{
BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
@@ -578,7 +545,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
-
}
catch (AMQException e)
{
@@ -589,24 +555,26 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
}
- else
- {
- // //fixme this probably is not right
- // if (!isNoConsume())
- { // done in BasicCancelOK Handler but not sending one so just deregister.
- deregisterConsumer();
- }
+ }
+ else
+ {
+ // //fixme this probably is not right
+ // if (!isNoConsume())
+ { // done in BasicCancelOK Handler but not sending one so just deregister.
+ deregisterConsumer();
}
+ }
- if ((_messageListener != null) && _receiving.get())
+ // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive
+ // so we need to let it know it is time to close.
+ if ((_messageListener != null) && _receiving.get())
+ {
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Interrupting thread: " + _receivingThread);
- }
-
- _receivingThread.interrupt();
+ _logger.info("Interrupting thread: " + _receivingThread);
}
+
+ _receivingThread.interrupt();
}
}
}
@@ -621,14 +589,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_closed.set(true);
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " markClosed():"
+ _logger.debug(_consumerTag + " markClosed():"
+ Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
- _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
}
else
{
@@ -645,10 +613,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* message listener or a synchronous receive() caller.
*
* @param messageFrame the raw unprocessed mesage
- * @param channelId channel on which this message was sent
*/
- void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+ void notifyMessage(UnprocessedMessage messageFrame)
{
+ if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+ return;
+ }
+
final boolean debug = _logger.isDebugEnabled();
if (debug)
@@ -658,10 +631,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
+ final BasicDeliverBody deliverBody = messageFrame.getDeliverBody();
+
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(),
- messageFrame.getDeliverBody().getRedelivered(), messageFrame.getDeliverBody().getExchange(),
- messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ _messageFactory.createMessage(deliverBody.getDeliveryTag(),
+ deliverBody.getRedelivered(),
+ deliverBody.getExchange(),
+ deliverBody.getRoutingKey(),
+ messageFrame.getContentHeader(),
+ messageFrame.getBodies());
if (debug)
{
@@ -673,11 +651,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
// if (!_closed.get())
{
- jmsMessage.setConsumer(this);
-
preDeliver(jmsMessage);
- notifyMessage(jmsMessage, channelId);
+ notifyMessage(jmsMessage);
}
// else
// {
@@ -700,11 +676,33 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- /**
- * @param jmsMessage this message has already been processed so can't redo preDeliver
- * @param channelId
- */
- public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
+ /** @param closeMessage this message signals that we should close the browser */
+ public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+ {
+ if (isMessageListenerSet())
+ {
+ // Currently only possible to get this msg type with a browser.
+ // If we get the message here then we should probably just close this consumer.
+ // Though an AutoClose consumer with message listener is quite odd...
+ // Just log out the fact so we know where we are
+ _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+ }
+ else
+ {
+ try
+ {
+ _synchronousQueue.put(closeMessage);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing," +
+ "but we shouldn't have close yet");
+ }
+ }
+ }
+
+ /** @param jmsMessage this message has already been processed so can't redo preDeliver */
+ public void notifyMessage(AbstractJMSMessage jmsMessage)
{
try
{
@@ -773,28 +771,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
break;
case Session.DUPS_OK_ACKNOWLEDGE:
- /*( if (++_outstanding >= _prefetchHigh)
- {
- _dups_ok_acknowledge_send = true;
- }
-
- //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur.
- if (_outstanding < _prefetchLow)
- {
- _dups_ok_acknowledge_send = false;
- }
-
- if (_dups_ok_acknowledge_send)
- {
- if (!_session.isInRecovery())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
- _outstanding = 0;
- }
- }
-
- break;
- */
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
if (!_session.isInRecovery())
@@ -845,14 +821,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
// synchronized (_closed)
{
_closed.set(true);
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " notifyError():"
+ _logger.debug(_consumerTag + " notifyError():"
+ Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
- _logger.trace(_consumerTag + " previously" + _closedStack.toString());
+ _logger.debug(_consumerTag + " previously" + _closedStack.toString());
}
else
{
@@ -948,18 +924,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _noConsume;
}
- public void closeWhenNoMessages(boolean b)
- {
- _closeWhenNoMessages = b;
-
- if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
- {
- _closed.set(true);
- _receivingThread.interrupt();
- }
-
- }
-
public void rollback()
{
clearUnackedMessages();
@@ -982,9 +946,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (tag != null)
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting tag from _receivedDTs:" + tag);
+ _logger.debug("Rejecting tag from _receivedDTs:" + tag);
}
_session.rejectMessage(tag, true);
@@ -1025,9 +989,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_session.rejectMessage(((AbstractJMSMessage) o), true);
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
+ _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
}
iterator.remove();
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 7e96fb537c..ae71846870 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -538,6 +538,18 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+ try
+ {
+ _session.checkFlowControl();
+ }
+ catch (InterruptedException e)
+ {
+ JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+
_protocolHandler.writeFrame(compositeFrame, wait);
if (message != origMessage)
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 60f95bfe33..a944ff6bec 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -120,13 +120,17 @@ public class FailoverHandler implements Runnable
// We wake up listeners. If they can handle failover, they will extend the
// FailoverRetrySupport class and will in turn block on the latch until failover
// has completed before retrying the operation.
- _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start"));
+ _amqProtocolHandler.notifyFailoverStarting();
// Since failover impacts several structures we protect them all with a single mutex. These structures
// are also in child objects of the connection. This allows us to manipulate them without affecting
// client code which runs in a separate thread.
synchronized (_amqProtocolHandler.getConnection().getFailoverMutex())
{
+ //Clear the exception now that we have the failover mutex there can be no one else waiting for a frame so
+ // we can clear the exception.
+ _amqProtocolHandler.failoverInProgress();
+
// We switch in a new state manager temporarily so that the interaction to get to the "connection open"
// state works, without us having to terminate any existing "state waiters". We could theoretically
// have a state waiter waiting until the connection is closed for some reason. Or in future we may have
@@ -138,6 +142,9 @@ public class FailoverHandler implements Runnable
_logger.info("Failover process veto-ed by client");
_amqProtocolHandler.setStateManager(existingStateManager);
+
+ //todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that
+ // prompted the failover event.
if (_host != null)
{
_amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException(
diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
index 120a07f0fc..e756d7baf9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
@@ -122,6 +122,13 @@ public class FailoverRetrySupport<T, E extends Exception> implements FailoverSup
{
_log.debug("Failover exception caught during operation: " + e, e);
}
+ catch (IllegalStateException e)
+ {
+ if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
+ {
+ throw e;
+ }
+ }
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
index 49c8a83833..d05e99d210 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +45,8 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic
throws AMQException
{
final AMQProtocolSession session = stateManager.getProtocolSession();
- final UnprocessedMessage msg = new UnprocessedMessage(channelId, body);
+ final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedDeliverMessage(body);
_logger.debug("New JmsDeliver method received");
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
index 428d366f07..2ebc9288c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
@@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,9 +46,9 @@ public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, i
{
_logger.debug("New JmsBounce method received");
final AMQProtocolSession session = stateManager.getProtocolSession();
- final UnprocessedMessage msg = new UnprocessedMessage(channelId, body);
+ final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedBouncedMessage(body);
- session.unprocessedMessageReceived(msg);
+ session.unprocessedMessageReceived(channelId, msg);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 8c8814e9b7..a580a6466d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -104,6 +104,11 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann
}
// fixme why is this only done when the close is expected...
// should the above forced closes not also cause a close?
+ // ----------
+ // Closing the session only when it is expected allows the errors to be processed
+ // Calling this here will prevent failover. So we should do this for all exceptions
+ // that should never cause failover. Such as authentication errors.
+
session.channelClosed(channelId, errorCode, String.valueOf(reason));
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
new file mode 100644
index 0000000000..b47fe751d6
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
@@ -0,0 +1,54 @@
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.AMQException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+public class ChannelFlowMethodHandler implements StateAwareMethodListener<ChannelFlowBody>
+{
+ private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowMethodHandler.class);
+ private static final ChannelFlowMethodHandler _instance = new ChannelFlowMethodHandler();
+
+ public static ChannelFlowMethodHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ChannelFlowMethodHandler()
+ { }
+
+ public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId)
+ throws AMQException
+ {
+
+ final AMQProtocolSession session = stateManager.getProtocolSession();
+ session.setFlowControl(channelId, body.getActive());
+ }
+
+
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
index 4d805cf123..fdcb493f38 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
@@ -73,10 +73,11 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- if (errorCode == AMQConstant.NOT_ALLOWED)
+ if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED))
{
- _logger.info("Authentication Error:" + Thread.currentThread().getName());
+ _logger.info("Error :" + errorCode +":"+ Thread.currentThread().getName());
+ // todo ritchiem : Why do this here when it is going to be done in the finally block?
session.closeProtocolSession();
// todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
@@ -98,6 +99,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co
session.closeProtocolSession();
+ // ritchiem: Doing this though will cause any waiting connection start to be released without being able to
+ // see what the cause was.
stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index b029770946..6d4c61fb29 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -55,8 +55,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
protected boolean _changedData;
private Destination _destination;
private JMSHeaderAdapter _headerAdapter;
- private BasicMessageConsumer _consumer;
- private boolean _strictAMQP;
+
+ private static final boolean STRICT_AMQP_COMPLIANCE =
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
protected AbstractJMSMessage(ByteBuffer data)
{
@@ -72,8 +73,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
_changedData = (data == null);
_headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
- _strictAMQP =
- Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
@@ -121,7 +120,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
if (getContentHeaderProperties().getMessageIdAsString() == null)
{
- getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID());
+ StringBuilder b = new StringBuilder(39);
+ b.append("ID:");
+ b.append(UUID.randomUUID());
+ getContentHeaderProperties().setMessageId(b.toString());
}
return getContentHeaderProperties().getMessageIdAsString();
@@ -301,7 +303,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -311,7 +313,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public boolean getBooleanProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -321,7 +323,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte getByteProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -331,7 +333,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -341,7 +343,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public short getShortProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -351,7 +353,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public int getIntProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -361,7 +363,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public long getLongProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -371,7 +373,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public float getFloatProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -381,7 +383,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public double getDoubleProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -392,19 +394,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public String getStringProperty(String propertyName) throws JMSException
{
- if (propertyName.startsWith("JMSX"))
+ //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
+ if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
{
- //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
- if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
- {
- return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
- }
-
- return null;
+ return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
}
else
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -425,7 +422,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -436,7 +433,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -447,7 +444,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setByteProperty(String propertyName, byte b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -458,7 +455,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -469,7 +466,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setShortProperty(String propertyName, short i) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -487,7 +484,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setLongProperty(String propertyName, long l) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -498,7 +495,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setFloatProperty(String propertyName, float f) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -509,7 +506,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
public void setDoubleProperty(String propertyName, double v) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -691,9 +688,4 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
}
}
- public void setConsumer(BasicMessageConsumer basicMessageConsumer)
- {
- _consumer = basicMessageConsumer;
- }
-
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
index a70acbabbe..d8fe964b85 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
@@ -55,7 +55,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
JMSMapMessage(ByteBuffer data) throws JMSException
{
super(data); // this instantiates a content header
- populateMapFromData();
+ if(data != null)
+ {
+ populateMapFromData();
+ }
+
}
JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
@@ -76,7 +80,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm
public String toBodyString() throws JMSException
{
- return _map.toString();
+ return _map == null ? "" : _map.toString();
}
public AMQShortString getMimeTypeAsShortString()
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index 5b199f2478..18157adc34 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,10 +24,20 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -36,33 +46,15 @@ import org.apache.qpid.framing.ContentHeaderBody;
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage
+public abstract class UnprocessedMessage
{
- private long _bytesReceived = 0;
+ private long _bytesReceived = 0L;
- private final BasicDeliverBody _deliverBody;
- private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
- private final int _channelId;
private ContentHeaderBody _contentHeader;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
- public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
- {
- _deliverBody = deliverBody;
- _channelId = channelId;
- _bounceBody = null;
- }
-
-
- public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
- {
- _deliverBody = null;
- _channelId = channelId;
- _bounceBody = bounceBody;
- }
-
public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException
{
@@ -96,22 +88,11 @@ public class UnprocessedMessage
return _bytesReceived == getContentHeader().bodySize;
}
- public BasicDeliverBody getDeliverBody()
- {
- return _deliverBody;
- }
-
- public BasicReturnBody getBounceBody()
- {
- return _bounceBody;
- }
- public int getChannelId()
- {
- return _channelId;
- }
+ abstract public BasicDeliverBody getDeliverBody();
+ abstract public BasicReturnBody getBounceBody();
public ContentHeaderBody getContentHeader()
{
@@ -128,4 +109,188 @@ public class UnprocessedMessage
return _bodies;
}
+ abstract public boolean isDeliverMessage();
+
+ public static final class UnprocessedDeliverMessage extends UnprocessedMessage
+ {
+ private final BasicDeliverBody _body;
+
+ public UnprocessedDeliverMessage(final BasicDeliverBody body)
+ {
+ _body = body;
+ }
+
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return _body;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return null;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return true;
+ }
+ }
+
+ public static final class UnprocessedBouncedMessage extends UnprocessedMessage
+ {
+ private final BasicReturnBody _body;
+
+ public UnprocessedBouncedMessage(final BasicReturnBody body)
+ {
+ _body = body;
+ }
+
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return null;
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return _body;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
+
+ public static final class CloseConsumerMessage extends UnprocessedMessage
+ {
+ BasicMessageConsumer _consumer;
+
+ public CloseConsumerMessage(BasicMessageConsumer consumer)
+ {
+ _consumer = consumer;
+ }
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return new BasicDeliverBody()
+ {
+ // This is the only thing we need to preserve so the correct consumer can be found later.
+ public AMQShortString getConsumerTag()
+ {
+ return _consumer.getConsumerTag();
+ }
+
+ // The Rest of these methods are not used
+ public long getDeliveryTag()
+ {
+ return 0;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean getRedelivered()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ public byte getMinor()
+ {
+ return 0;
+ }
+
+ public int getClazz()
+ {
+ return 0;
+ }
+
+ public int getMethod()
+ {
+ return 0;
+ }
+
+ public void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ public byte getFrameType()
+ {
+ return 0;
+ }
+
+ public int getSize()
+ {
+ return 0;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException
+ {
+ }
+
+ public AMQFrame generateFrame(int channelId)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelNotFoundException(int channelId)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
+ {
+ return false;
+ }
+ };
+ }
+
+ public BasicReturnBody getBounceBody()
+ {
+ return null;
+ }
+
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 8a1e78d2e0..3932b098cd 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -30,7 +30,6 @@ import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -55,6 +54,7 @@ import org.apache.qpid.ssl.SSLContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -153,9 +153,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
+
+ /** The last failover exception that occured */
+ private FailoverException _lastFailoverException;
+
/** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+ /** Default buffer size for pending messages reads */
+ private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
+
+ /** Default buffer size for pending messages writes */
+ private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144";
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -209,29 +219,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
catch (RuntimeException e)
{
- e.printStackTrace();
+ _logger.error(e.getMessage(), e);
}
- if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio"))
+ if (Boolean.getBoolean("protectio"))
{
try
{
//Add IO Protection Filters
IoFilterChain chain = session.getFilterChain();
- int buf_size = 32768;
- if (session.getConfig() instanceof SocketSessionConfig)
- {
- buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
- }
session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
readfilter.attach(chain);
WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT)));
writefilter.attach(chain);
session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
@@ -355,7 +360,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
if (_failoverState == FailoverState.NOT_STARTED)
{
// if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
- if (cause instanceof AMQConnectionClosedException)
+ if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
// this will attemp failover
@@ -372,8 +377,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
propagateExceptionToWaiters(amqe);
- _connection.exceptionReceived(cause);
}
+ _connection.exceptionReceived(cause);
}
@@ -406,7 +411,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void propagateExceptionToWaiters(Exception e)
{
- getStateManager().error(e);
+
if (!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -418,6 +423,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
+ public void notifyFailoverStarting()
+ {
+ // Set the last exception in the sync block to ensure the ordering with add.
+ // either this gets done and the add does the ml.error
+ // or the add completes first and the iterator below will do ml.error
+ synchronized (_frameListeners)
+ {
+ _lastFailoverException = new FailoverException("Failing over about to start");
+ }
+
+ propagateExceptionToWaiters(_lastFailoverException);
+ }
+
+ public void failoverInProgress()
+ {
+ _lastFailoverException = null;
+ }
+
private static int _messageReceivedCount;
public void messageReceived(IoSession session, Object message) throws Exception
@@ -438,78 +461,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- switch (bodyFrame.getFrameType())
- {
- case AMQMethodBody.TYPE:
-
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
-
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
-
- try
- {
-
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
-
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners);
- }
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- }
-
- exceptionCaught(session, e);
- }
-
- break;
-
- case ContentHeaderBody.TYPE:
-
- _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
- break;
-
- case ContentBody.TYPE:
-
- _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
- break;
-
- case HeartbeatBody.TYPE:
-
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
-
- break;
-
- default:
+ bodyFrame.handle(frame.getChannel(),_protocolSession);
- }
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -527,6 +480,57 @@ public class AMQProtocolHandler extends IoHandlerAdapter
}
}
+ public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+ throws AMQException
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
+ }
+
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
+
+ try
+ {
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
+ {
+ //This iterator is safe from the error state as the frame listeners always add before they send so their
+ // will be ready and waiting for this response.
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners);
+ }
+ }
+ catch (AMQException e)
+ {
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
+ }
+
+ exceptionCaught(session, e);
+ }
+
+ }
+
private static int _messagesOut;
public void messageSent(IoSession session, Object message) throws Exception
@@ -612,7 +616,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
try
{
- _frameListeners.add(listener);
+ synchronized (_frameListeners)
+ {
+ if (_lastFailoverException != null)
+ {
+ throw _lastFailoverException;
+ }
+
+ _frameListeners.add(listener);
+ }
_protocolSession.writeFrame(frame);
AMQMethodEvent e = listener.blockForFrame(timeout);
@@ -621,10 +633,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
}
- catch (AMQException e)
- {
- throw e;
- }
finally
{
// If we don't removeKey the listener then no-one will
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index b48adbdb08..6a5cc62bfc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -74,8 +74,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
protected final AMQProtocolHandler _protocolHandler;
- /** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
@@ -83,7 +81,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
* first) with the subsequent content header and content bodies.
*/
- protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+ private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+ private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
/** Counter to ensure unique queue names */
protected int _queueId = 1;
@@ -101,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
private MethodDispatcher _methodDispatcher;
- private final AMQConnection _connection;
+ private final AMQConnection _connection;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
@@ -230,14 +230,24 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
{
- _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = message;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.put(channelId, message);
+ }
}
- public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
+ public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+ : _channelId2UnprocessedMsgMap.get(channelId);
+
+
if (msg == null)
{
throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
@@ -256,9 +266,19 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
- public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+ public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
{
- UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ UnprocessedMessage msg;
+ final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
+ if(fastAccess)
+ {
+ msg = _channelId2UnprocessedMsgArray[channelId];
+ }
+ else
+ {
+ msg = _channelId2UnprocessedMsgMap.get(channelId);
+ }
+
if (msg == null)
{
throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
@@ -266,7 +286,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
if (msg.getContentHeader() == null)
{
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if(fastAccess)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
throw new AMQException("Error: received content body without having received a ContentHeader frame first");
}
@@ -286,6 +313,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
}
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
+ {
+
+ }
+
/**
* Deliver a message to the appropriate session, removing the unprocessed message from our map
*
@@ -296,7 +328,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
AMQSession session = getSession(channelId);
session.messageReceived(msg);
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
}
protected AMQSession getSession(int channelId)
@@ -486,4 +525,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_methodDispatcher = methodDispatcher;
}
+
+ public void setFlowControl(final int channelId, final boolean active)
+ {
+ final AMQSession session = getSession(channelId);
+ session.setFlowControl(active);
+ }
+
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index b6baefe1b0..2e6a4beb83 100644
--- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -37,7 +37,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
* The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
* there is a separate state manager.
*/
-public class AMQStateManager implements AMQMethodListener
+public class AMQStateManager
{
private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
@@ -52,9 +52,9 @@ public class AMQStateManager implements AMQMethodListener
* AMQFrame.
*/
- private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+
private final Object _stateLock = new Object();
- private static final long MAXIMUM_STATE_WAIT_TIME = 30000L;
+ private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
public AMQStateManager()
{
@@ -91,19 +91,6 @@ public class AMQStateManager implements AMQMethodListener
}
}
- public void error(Exception e)
- {
- _logger.debug("State manager receive error notification: " + e);
- synchronized (_stateListeners)
- {
- final Iterator it = _stateListeners.iterator();
- while (it.hasNext())
- {
- final StateListener l = (StateListener) it.next();
- l.error(e);
- }
- }
- }
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index 5482e48699..b2f7ae8395 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
@@ -36,6 +37,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class SocketTransportConnection implements ITransportConnection
{
@@ -83,8 +87,34 @@ public class SocketTransportConnection implements ITransportConnection
_logger.info("send-buffer-size = " + scfg.getSendBufferSize());
scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
_logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
- final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
- _logger.info("Attempting connection to " + address);
+
+ final InetSocketAddress address;
+
+ if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
+ {
+ address = null;
+
+ Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
+
+ if (socket != null)
+ {
+ _logger.info("Using existing Socket:" + socket);
+
+ ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Active Socket must be provided for broker " +
+ "with 'socket://<SocketID>' transport:" + brokerDetail);
+ }
+ }
+ else
+ {
+ address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
+ _logger.info("Attempting connection to " + address);
+ }
+
+
ConnectFuture future = ioConnector.connect(address, protocolHandler);
// wait for connection to complete
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index e8a220f5e9..7ae2ddf66c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -23,6 +23,7 @@ package org.apache.qpid.client.transport;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
@@ -36,6 +37,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.net.Socket;
+
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
@@ -54,12 +58,25 @@ public class TransportConnection
private static final int TCP = 0;
private static final int VM = 1;
+ private static final int SOCKET = 2;
private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
- public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
+ private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
+
+ public static void registerOpenSocket(String socketID, Socket openSocket)
+ {
+ _openSocketRegister.put(socketID, openSocket);
+ }
+
+ public static Socket removeOpenSocket(String socketID)
+ {
+ return _openSocketRegister.remove(socketID);
+ }
+
+ public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -68,7 +85,7 @@ public class TransportConnection
throw new AMQNoTransportForProtocolException(details);
}
- if (transport == _currentInstance)
+ /* if (transport == _currentInstance)
{
if (transport == VM)
{
@@ -83,19 +100,29 @@ public class TransportConnection
}
}
- _currentInstance = transport;
+ _currentInstance = transport;*/
+ ITransportConnection instance;
switch (transport)
{
-
+ case SOCKET:
+ instance =
+ new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new ExistingSocketConnector();
+ }
+ });
+ break;
case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
public IoConnector newSocketConnector()
{
SocketConnector result;
// FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
+ if (Boolean.getBoolean("qpidnio"))
{
_logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
? "Qpid NIO is new default"
@@ -117,16 +144,23 @@ public class TransportConnection
break;
case VM:
{
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
break;
}
+ default:
+ throw new AMQNoTransportForProtocolException(details);
}
- return _instance;
+ return instance;
}
private static int getTransport(String transport)
{
+ if (transport.equals(BrokerDetails.SOCKET))
+ {
+ return SOCKET;
+ }
+
if (transport.equals(BrokerDetails.TCP))
{
return TCP;
@@ -283,11 +317,14 @@ public class TransportConnection
public static void killAllVMBrokers()
{
_logger.info("Killing all VM Brokers");
- _acceptor.unbindAll();
+ if (_acceptor != null)
+ {
+ _acceptor.unbindAll();
+ }
synchronized (_inVmPipeAddress)
{
_inVmPipeAddress.clear();
- }
+ }
_acceptor = null;
_currentInstance = -1;
_currentVMPort = -1;
@@ -302,6 +339,8 @@ public class TransportConnection
{
_logger.info("Killing VM Broker:" + port);
_inVmPipeAddress.remove(port);
+ // This does need to be sychronized as otherwise mina can hang
+ // if a new connection is made
_acceptor.unbind(pipe);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 603b0834a3..8b353a7264 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -34,6 +34,7 @@ public interface BrokerDetails
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
public static final int DEFAULT_PORT = 5672;
+ public static final String SOCKET = "socket";
public static final String TCP = "tcp";
public static final String VM = "vm";
diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
index 6ec883ff0b..8e3ccc3b02 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
@@ -34,7 +34,6 @@ public class FailoverPolicy
private static final long MINUTE = 60000L;
private static final long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE;
- private static final long DEFAULT_FAILOVER_TIMEOUT = 4 * MINUTE;
private FailoverMethod[] _methods = new FailoverMethod[1];
@@ -161,16 +160,7 @@ public class FailoverPolicy
}
else
{
- if ((now - _lastFailTime) >= DEFAULT_FAILOVER_TIMEOUT)
- {
- _logger.info("Failover timeout");
-
- return false;
- }
- else
- {
- _lastMethodTime = now;
- }
+ _lastMethodTime = now;
}
}
else
diff --git a/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java b/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
index b91fc2d960..b830c377b8 100644
--- a/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
@@ -50,4 +50,8 @@ public interface MessageProducer extends javax.jms.MessageProducer
void send(Destination destination, Message message, int deliveryMode,
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException;
+
+ void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException;
+
}
diff --git a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
index a246352d8b..2fe01fc126 100644
--- a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
+++ b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java
@@ -183,7 +183,7 @@ public class TestLargePublisher
}
catch (UnknownHostException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
catch (AMQException e)
{
diff --git a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java
index 33891142b5..37b4ff1498 100644
--- a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java
+++ b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java
@@ -133,7 +133,7 @@ public class TestPublisher
}
catch (JMSException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
}
@@ -163,7 +163,7 @@ public class TestPublisher
}
catch (UnknownHostException e)
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
catch (AMQException e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index d2a7ba301b..09886f4736 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -173,7 +173,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
m.setJMSReplyTo(q);
m.setStringProperty("TempQueue", q.toString());
- _logger.trace("Message:" + m);
+ _logger.debug("Message:" + m);
Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(),
m.getStringProperty("TempQueue"));
@@ -292,8 +292,11 @@ public class PropertyValueTest extends TestCase implements MessageListener
((AMQMessage) m).getPropertyHeaders().containsKey("void"));
//JMSXUserID
- Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME,
- m.getStringProperty("JMSXUserID"));
+ if (m.getStringProperty("JMSXUserID") != null)
+ {
+ Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME,
+ m.getStringProperty("JMSXUserID"));
+ }
}
received.clear();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
index 40c712c1c9..d05ed7ca73 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
@@ -21,18 +21,20 @@
package org.apache.qpid.test.unit.basic;
import junit.framework.TestCase;
-
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.client.transport.TransportConnection;
-
+import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -46,12 +48,12 @@ public class SelectorTest extends TestCase implements MessageListener
private AMQSession _session;
private int count;
public String _connectionString = "vm://:1";
+ private static final String INVALID_SELECTOR = "Cost LIKE 5";
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
}
protected void tearDown() throws Exception
@@ -60,19 +62,19 @@ public class SelectorTest extends TestCase implements MessageListener
TransportConnection.killAllVMBrokers();
}
- private void init(AMQConnection connection) throws Exception
+ private void init(AMQConnection connection) throws JMSException
{
init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true));
}
- private void init(AMQConnection connection, AMQDestination destination) throws Exception
+ private void init(AMQConnection connection, AMQDestination destination) throws JMSException
{
_connection = connection;
_destination = destination;
connection.start();
String selector = null;
- // selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+ selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
_session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -80,13 +82,17 @@ public class SelectorTest extends TestCase implements MessageListener
_session.createConsumer(destination, selector).setMessageListener(this);
}
- public synchronized void test() throws JMSException, InterruptedException
+ public synchronized void test() throws JMSException, InterruptedException, URLSyntaxException, AMQException
{
try
{
+
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+
Message msg = _session.createTextMessage("Message");
msg.setJMSPriority(1);
msg.setIntProperty("Cost", 2);
+ msg.setStringProperty("property-with-hyphen", "wibble");
msg.setJMSType("Special");
_logger.info("Sending Message:" + msg);
@@ -106,10 +112,147 @@ public class SelectorTest extends TestCase implements MessageListener
// throw new RuntimeException("Did not get message!");
}
}
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ System.out.println("SUCCESS!!");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ }
+ catch (URLSyntaxException e)
+ {
+ _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ fail("Wrong exception");
+ }
+ catch (AMQException e)
+ {
+ _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ fail("Wrong exception");
+ }
+
+ finally
+ {
+ if (_session != null)
+ {
+ _session.close();
+ }
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ }
+
+
+ public void testInvalidSelectors()
+ {
+ Connection connection = null;
+
+ try
+ {
+ connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test");
+ _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ }
+ catch (JMSException e)
+ {
+ fail(e.getMessage());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ catch (URLSyntaxException e)
+ {
+ fail("Error:" + e.getMessage());
+ }
+
+ //Try Creating a Browser
+ try
+ {
+ _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ _logger.debug("SUCCESS!!");
+ }
+ }
+
+ //Try Creating a Consumer
+ try
+ {
+ _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ _logger.debug("SUCCESS!!");
+ }
+ }
+
+ //Try Creating a Receiever
+ try
+ {
+ _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR);
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage());
+ if (!(e instanceof InvalidSelectorException))
+ {
+ fail("Wrong exception:" + e.getMessage());
+ }
+ else
+ {
+ _logger.debug("SUCCESS!!");
+ }
+ }
+
finally
{
- _session.close();
- _connection.close();
+ if (_session != null)
+ {
+ try
+ {
+ _session.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Error cleaning up:" + e.getMessage());
+ }
+ }
+ if (_connection != null)
+ {
+ try
+ {
+ _connection.close();
+ }
+ catch (JMSException e)
+ {
+ fail("Error cleaning up:" + e.getMessage());
+ }
+ }
}
}
@@ -128,9 +271,29 @@ public class SelectorTest extends TestCase implements MessageListener
public static void main(String[] argv) throws Exception
{
SelectorTest test = new SelectorTest();
- test._connectionString = (argv.length == 0) ? "localhost:5672" : argv[0];
- test.setUp();
- test.test();
+ test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0];
+
+ try
+ {
+ while (true)
+ {
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.setUp();
+ }
+ test.test();
+
+ if (test._connectionString.contains("vm://:1"))
+ {
+ test.tearDown();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ }
}
public static junit.framework.Test suite()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index 56394fee27..4b4df7e5c8 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -33,6 +33,7 @@ import org.apache.qpid.jms.Session;
import junit.framework.TestCase;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.QueueSession;
import javax.jms.TopicSession;
@@ -55,25 +56,30 @@ public class ConnectionTest extends TestCase
TransportConnection.killVMBroker(1);
}
- public void testSimpleConnection()
+ public void testSimpleConnection() throws Exception
{
+ AMQConnection conn = null;
try
{
- AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test");
- conn.close();
+ conn = new AMQConnection(_broker, "guest", "guest", "fred", "test");
}
catch (Exception e)
{
fail("Connection to " + _broker + " should succeed. Reason: " + e);
}
+ finally
+ {
+ conn.close();
+ }
}
- public void testDefaultExchanges()
+ public void testDefaultExchanges() throws Exception
{
+ AMQConnection conn = null;
try
{
- AMQConnection conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='"
+ conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='"
+ _broker
+ "?retries='1''&defaultQueueExchange='test.direct'"
+ "&defaultTopicExchange='test.topic'"
@@ -106,37 +112,53 @@ public class ConnectionTest extends TestCase
topicSession.close();
-
- conn.close();
}
catch (Exception e)
{
fail("Connection to " + _broker + " should succeed. Reason: " + e);
}
+ finally
+ {
+ conn.close();
+ }
}
- //fixme AMQAuthenticationException is not propogaged
- public void PasswordFailureConnection() throws Exception
+ //See QPID-771
+ public void testPasswordFailureConnection() throws Exception
{
+ AMQConnection conn = null;
try
{
- new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''");
+ conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''");
fail("Connection should not be established password is wrong.");
}
catch (AMQException amqe)
{
- if (!(amqe instanceof AMQAuthenticationException))
+ if (amqe.getCause().getClass() == Exception.class)
{
- fail("Correct exception not thrown. Excpected 'AMQAuthenticationException' got: " + amqe);
+ System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure.");
+ return;
+ }
+
+ assertEquals("Exception was wrong type", JMSException.class, amqe.getCause().getClass());
+ Exception linked = ((JMSException) amqe.getCause()).getLinkedException();
+ assertEquals("Exception was wrong type", AMQAuthenticationException.class, linked.getClass());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
}
}
}
public void testConnectionFailure() throws Exception
{
+ AMQConnection conn = null;
try
{
- new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''");
+ conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''");
fail("Connection should not be established");
}
catch (AMQException amqe)
@@ -146,14 +168,22 @@ public class ConnectionTest extends TestCase
fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe);
}
}
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
public void testUnresolvedHostFailure() throws Exception
{
+ AMQConnection conn = null;
try
{
- new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''");
+ conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''");
fail("Connection should not be established");
}
catch (AMQException amqe)
@@ -163,6 +193,38 @@ public class ConnectionTest extends TestCase
fail("Correct exception not thrown. Excpected 'AMQUnresolvedAddressException' got: " + amqe);
}
}
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+
+ public void testUnresolvedVirtualHostFailure() throws Exception
+ {
+ AMQConnection conn = null;
+ try
+ {
+ conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + _broker + "?retries='0''");
+ fail("Connection should not be established");
+ }
+ catch (AMQException amqe)
+ {
+ if (!(amqe instanceof AMQConnectionFailureException))
+ {
+ fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
public void testClientIdCannotBeChanged() throws Exception
@@ -178,13 +240,27 @@ public class ConnectionTest extends TestCase
{
// PASS
}
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
}
public void testClientIdIsPopulatedAutomatically() throws Exception
{
Connection connection = new AMQConnection(_broker, "guest", "guest",
null, "test");
- assertNotNull(connection.getClientID());
+ try
+ {
+ assertNotNull(connection.getClientID());
+ }
+ finally
+ {
+ connection.close();
+ }
}
public static junit.framework.Test suite()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
index 6c872a0e10..d90873a6a7 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
@@ -510,6 +510,25 @@ public class ConnectionURLTest extends TestCase
}
+ public void testSocketProtocol() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'";
+
+ try
+ {
+ AMQConnectionURL curl = new AMQConnectionURL(url);
+ assertNotNull(curl);
+ assertEquals(1, curl.getBrokerCount());
+ assertNotNull(curl.getBrokerDetails(0));
+ assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport());
+ assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost());
+ assertEquals("URL does not toString as expected", url, curl.toString());
+ }
+ catch (URLSyntaxException e)
+ {
+ fail(e.getMessage());
+ }
+ }
public static junit.framework.Test suite()
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
index 5a61480f6a..ee110e7932 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
@@ -29,8 +29,8 @@ import org.apache.qpid.client.transport.TransportConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import uk.co.thebadgerset.junit.concurrency.TestRunnable;
-import uk.co.thebadgerset.junit.concurrency.ThreadTestCoordinator;
+import org.apache.qpid.junit.concurrency.TestRunnable;
+import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
import javax.jms.Connection;
import javax.jms.Message;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
index 3012909daa..f2655adc98 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
@@ -38,6 +38,7 @@ import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
+import java.util.Enumeration;
/**
* @author Apache Software Foundation
@@ -85,6 +86,12 @@ public class JMSPropertiesTest extends TestCase
sentMsg.setJMSType(JMS_TYPE);
sentMsg.setJMSReplyTo(JMS_REPLY_TO);
+ String JMSXGroupID_VALUE = "group";
+ sentMsg.setStringProperty("JMSXGroupID", JMSXGroupID_VALUE);
+
+ int JMSXGroupSeq_VALUE = 1;
+ sentMsg.setIntProperty("JMSXGroupSeq", JMSXGroupSeq_VALUE);
+
// send it
producer.send(sentMsg);
@@ -101,6 +108,30 @@ public class JMSPropertiesTest extends TestCase
// assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode());
assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType());
assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo());
+ assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:"));
+
+ //Validate that the JMSX values are correct
+ assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID"));
+ assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq"));
+
+ boolean JMSXGroupID_Available = false;
+ boolean JMSXGroupSeq_Available = false;
+ Enumeration props = con.getMetaData().getJMSXPropertyNames();
+ while (props.hasMoreElements())
+ {
+ String name = (String) props.nextElement();
+ if (name.equals("JMSXGroupID"))
+ {
+ JMSXGroupID_Available = true;
+ }
+ if (name.equals("JMSXGroupSeq"))
+ {
+ JMSXGroupSeq_Available = true;
+ }
+ }
+
+ assertTrue("JMSXGroupID not available.",JMSXGroupID_Available);
+ assertTrue("JMSXGroupSeq not available.",JMSXGroupSeq_Available);
con.close();
}