diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-21 13:55:26 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-21 13:55:26 +0000 |
commit | 1d69ea16b30dd67ac32683e6dc512f4c58ef93f1 (patch) | |
tree | b7d326570570b53936c6512d58c465c76244e925 /java/client/src | |
parent | a53cbaad17df415e98f22cc42f2512467936bbc6 (diff) | |
parent | c0c3c38f032200e786cf5a4404cfa40a0c95f5e8 (diff) | |
download | qpid-python-1d69ea16b30dd67ac32683e6dc512f4c58ef93f1.tar.gz |
create branch for broker refactoring
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@650146 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
34 files changed, 2081 insertions, 593 deletions
diff --git a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java new file mode 100644 index 0000000000..ab3bc28d83 --- /dev/null +++ b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.transport.socket.nio; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExceptionMonitor; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoConnectorConfig; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.common.support.BaseIoConnector; +import org.apache.mina.common.support.DefaultConnectFuture; +import org.apache.mina.util.NamePreservingRunnable; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.Queue; + +import java.io.IOException; +import java.net.ConnectException; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; + +/** + * {@link IoConnector} for socket transport (TCP/IP). + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev$, $Date$ + */ +public class ExistingSocketConnector extends BaseIoConnector +{ + /** @noinspection StaticNonFinalField */ + private static volatile int nextId = 0; + + private final Object lock = new Object(); + private final int id = nextId++; + private final String threadName = "SocketConnector-" + id; + private SocketConnectorConfig defaultConfig = new SocketConnectorConfig(); + private final Queue connectQueue = new Queue(); + private final SocketIoProcessor[] ioProcessors; + private final int processorCount; + private final Executor executor; + + /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ + private Selector selector; + private Worker worker; + private int processorDistributor = 0; + private int workerTimeout = 60; // 1 min. + private Socket _openSocket = null; + + /** Create a connector with a single processing thread using a NewThreadExecutor */ + public ExistingSocketConnector() + { + this(1, new NewThreadExecutor()); + } + + /** + * Create a connector with the desired number of processing threads + * + * @param processorCount Number of processing threads + * @param executor Executor to use for launching threads + */ + public ExistingSocketConnector(int processorCount, Executor executor) + { + if (processorCount < 1) + { + throw new IllegalArgumentException("Must have at least one processor"); + } + + this.executor = executor; + this.processorCount = processorCount; + ioProcessors = new SocketIoProcessor[processorCount]; + + for (int i = 0; i < processorCount; i++) + { + ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor); + } + } + + /** + * How many seconds to keep the connection thread alive between connection requests + * + * @return Number of seconds to keep connection thread alive + */ + public int getWorkerTimeout() + { + return workerTimeout; + } + + /** + * Set how many seconds the connection worker thread should remain alive once idle before terminating itself. + * + * @param workerTimeout Number of seconds to keep thread alive. Must be >=0 + */ + public void setWorkerTimeout(int workerTimeout) + { + if (workerTimeout < 0) + { + throw new IllegalArgumentException("Must be >= 0"); + } + this.workerTimeout = workerTimeout; + } + + public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) + { + return connect(address, null, handler, config); + } + + public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, + IoHandler handler, IoServiceConfig config) + { + /** Changes here from the Mina OpenSocketConnector. + * Ignoreing all address as they are not needed */ + + if (handler == null) + { + throw new NullPointerException("handler"); + } + + + if (config == null) + { + config = getDefaultConfig(); + } + + if (_openSocket == null) + { + throw new IllegalArgumentException("Specifed Socket not active"); + } + + boolean success = false; + + try + { + DefaultConnectFuture future = new DefaultConnectFuture(); + newSession(_openSocket, handler, config, future); + success = true; + return future; + } + catch (IOException e) + { + return DefaultConnectFuture.newFailedFuture(e); + } + finally + { + if (!success && _openSocket != null) + { + try + { + _openSocket.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + public IoServiceConfig getDefaultConfig() + { + return defaultConfig; + } + + /** + * Sets the config this connector will use by default. + * + * @param defaultConfig the default config. + * + * @throws NullPointerException if the specified value is <code>null</code>. + */ + public void setDefaultConfig(SocketConnectorConfig defaultConfig) + { + if (defaultConfig == null) + { + throw new NullPointerException("defaultConfig"); + } + this.defaultConfig = defaultConfig; + } + + private synchronized void startupWorker() throws IOException + { + if (worker == null) + { + selector = Selector.open(); + worker = new Worker(); + executor.execute(new NamePreservingRunnable(worker)); + } + } + + private void registerNew() + { + if (connectQueue.isEmpty()) + { + return; + } + + for (; ;) + { + ConnectionRequest req; + synchronized (connectQueue) + { + req = (ConnectionRequest) connectQueue.pop(); + } + + if (req == null) + { + break; + } + + SocketChannel ch = req.channel; + try + { + ch.register(selector, SelectionKey.OP_CONNECT, req); + } + catch (IOException e) + { + req.setException(e); + } + } + } + + private void processSessions(Set keys) + { + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isConnectable()) + { + continue; + } + + SocketChannel ch = (SocketChannel) key.channel(); + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + boolean success = false; + try + { + ch.finishConnect(); + newSession(ch, entry.handler, entry.config, entry); + success = true; + } + catch (Throwable e) + { + entry.setException(e); + } + finally + { + key.cancel(); + if (!success) + { + try + { + ch.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + } + } + } + + keys.clear(); + } + + private void processTimedOutSessions(Set keys) + { + long currentTime = System.currentTimeMillis(); + Iterator it = keys.iterator(); + + while (it.hasNext()) + { + SelectionKey key = (SelectionKey) it.next(); + + if (!key.isValid()) + { + continue; + } + + ConnectionRequest entry = (ConnectionRequest) key.attachment(); + + if (currentTime >= entry.deadline) + { + entry.setException(new ConnectException()); + try + { + key.channel().close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + key.cancel(); + } + } + } + } + + private void newSession(Socket socket, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) + throws IOException + { + SocketSessionImpl session = new SocketSessionImpl(this, + nextProcessor(), + getListeners(), + config, + socket.getChannel(), + handler, + socket.getRemoteSocketAddress()); + + newSession(session, config, connectFuture); + } + + private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) + throws IOException + + { + SocketSessionImpl session = new SocketSessionImpl(this, + nextProcessor(), + getListeners(), + config, + ch, + handler, + ch.socket().getRemoteSocketAddress()); + + newSession(session, config, connectFuture); + } + + private void newSession(SocketSessionImpl session, IoServiceConfig config, ConnectFuture connectFuture) + throws IOException + { + try + { + getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); + config.getThreadModel().buildFilterChain(session.getFilterChain()); + } + catch (Throwable e) + { + throw (IOException) new IOException("Failed to create a session.").initCause(e); + } + session.getIoProcessor().addNew(session); + connectFuture.setSession(session); + } + + private SocketIoProcessor nextProcessor() + { + return ioProcessors[processorDistributor++ % processorCount]; + } + + public void setOpenSocket(Socket openSocket) + { + _openSocket = openSocket; + } + + private class Worker implements Runnable + { + private long lastActive = System.currentTimeMillis(); + + public void run() + { + Thread.currentThread().setName(ExistingSocketConnector.this.threadName); + + for (; ;) + { + try + { + int nKeys = selector.select(1000); + + registerNew(); + + if (nKeys > 0) + { + processSessions(selector.selectedKeys()); + } + + processTimedOutSessions(selector.keys()); + + if (selector.keys().isEmpty()) + { + if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) + { + synchronized (lock) + { + if (selector.keys().isEmpty() && + connectQueue.isEmpty()) + { + worker = null; + try + { + selector.close(); + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + } + finally + { + selector = null; + } + break; + } + } + } + } + else + { + lastActive = System.currentTimeMillis(); + } + } + catch (IOException e) + { + ExceptionMonitor.getInstance().exceptionCaught(e); + + try + { + Thread.sleep(1000); + } + catch (InterruptedException e1) + { + ExceptionMonitor.getInstance().exceptionCaught(e1); + } + } + } + } + } + + private class ConnectionRequest extends DefaultConnectFuture + { + private final SocketChannel channel; + private final long deadline; + private final IoHandler handler; + private final IoServiceConfig config; + + private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) + { + this.channel = channel; + long timeout; + if (config instanceof IoConnectorConfig) + { + timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); + } + else + { + timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); + } + this.deadline = System.currentTimeMillis() + timeout; + this.handler = handler; + this.config = config; + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java index b6fbb6c6bf..69ff7a2c19 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java @@ -39,4 +39,9 @@ public class AMQAuthenticationException extends AMQException { super(error, msg); } + public boolean isHardError() + { + return true; + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index c04380ba8c..f3e71d2035 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -57,8 +57,9 @@ public class AMQBrokerDetails implements BrokerDetails if (transport != null) { //todo this list of valid transports should be enumerated somewhere - if ((!(transport.equalsIgnoreCase("vm") || - transport.equalsIgnoreCase("tcp")))) + if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) || + transport.equalsIgnoreCase(BrokerDetails.TCP) || + transport.equalsIgnoreCase(BrokerDetails.SOCKET)))) { if (transport.equalsIgnoreCase("localhost")) { @@ -156,7 +157,10 @@ public class AMQBrokerDetails implements BrokerDetails } else { - setPort(port); + if (!_transport.equalsIgnoreCase(SOCKET)) + { + setPort(port); + } } String queryString = connection.getQuery(); @@ -263,13 +267,16 @@ public class AMQBrokerDetails implements BrokerDetails sb.append(_transport); sb.append("://"); - if (!(_transport.equalsIgnoreCase("vm"))) + if (!(_transport.equalsIgnoreCase(VM))) { sb.append(_host); } - sb.append(':'); - sb.append(_port); + if (!(_transport.equalsIgnoreCase(SOCKET))) + { + sb.append(':'); + sb.append(_port); + } sb.append(printOptionsURL()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 39b3b80e74..4b8143cfb5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -39,6 +39,7 @@ import org.apache.qpid.jms.Connection; import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.jms.ConnectionURL; import org.apache.qpid.jms.FailoverPolicy; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +73,105 @@ import java.util.concurrent.atomic.AtomicInteger; public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable { + private static final class ChannelToSessionMap + { + private final AMQSession[] _fastAccessSessions = new AMQSession[16]; + private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>(); + private int _size = 0; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + + public AMQSession get(int channelId) + { + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + return _fastAccessSessions[channelId]; + } + else + { + return _slowAccessSessions.get(channelId); + } + } + + public AMQSession put(int channelId, AMQSession session) + { + AMQSession oldVal; + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + oldVal = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = session; + } + else + { + oldVal = _slowAccessSessions.put(channelId, session); + } + if((oldVal != null) && (session == null)) + { + _size--; + } + else if((oldVal == null) && (session != null)) + { + _size++; + } + + return session; + + } + + + public AMQSession remove(int channelId) + { + AMQSession session; + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + session = _fastAccessSessions[channelId]; + _fastAccessSessions[channelId] = null; + } + else + { + session = _slowAccessSessions.remove(channelId); + } + + if(session != null) + { + _size--; + } + return session; + + } + + public Collection<AMQSession> values() + { + ArrayList<AMQSession> values = new ArrayList<AMQSession>(size()); + + for(int i = 0; i < 16; i++) + { + if(_fastAccessSessions[i] != null) + { + values.add(_fastAccessSessions[i]); + } + } + values.addAll(_slowAccessSessions.values()); + + return values; + } + + public int size() + { + return _size; + } + + public void clear() + { + _size = 0; + _slowAccessSessions.clear(); + for(int i = 0; i<16; i++) + { + _fastAccessSessions[i] = null; + } + } + } + + private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private AtomicInteger _idFactory = new AtomicInteger(0); @@ -101,7 +201,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private AMQProtocolHandler _protocolHandler; /** Maps from session id (Integer) to AMQSession instance */ - private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>(); + private final ChannelToSessionMap _sessions = new ChannelToSessionMap(); private String _clientName; @@ -157,6 +257,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final long DEFAULT_TIMEOUT = 1000 * 30; private ProtocolVersion _protocolVersion; + /** * @param broker brokerdetails * @param username username @@ -232,6 +333,26 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException { + final ArrayList<JMSException> exceptions = new ArrayList<JMSException>(); + + class Listener implements ExceptionListener + { + public void onException(JMSException e) + { + exceptions.add(e); + } + } + + try + { + setExceptionListener(new Listener()); + } + catch (JMSException e) + { + // Shouldn't happen + throw new AMQException(null, null, e); + } + if (_logger.isInfoEnabled()) { _logger.info("Connection:" + connectionURL); @@ -288,8 +409,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect try { makeBrokerConnection(_failoverPolicy.getNextBrokerDetails()); - lastException = null; - _connected = true; } catch (Exception e) { @@ -316,8 +435,31 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if (!_connected) { String message = null; + try + { + Thread.sleep(150); + } + catch (InterruptedException e) + { + // Eat it, we've hopefully got all the exceptions if this happened + } + if (exceptions.size() > 0) + { + JMSException e = exceptions.get(0); + int code = -1; + try + { + code = new Integer(e.getErrorCode()).intValue(); + } + catch (NumberFormatException nfe) + { + // Ignore this, we have some error codes and messages swapped around + } - if (lastException != null) + throw new AMQConnectionFailureException(AMQConstant.getConstant(code), + e.getMessage(), e); + } + else if (lastException != null) { if (lastException.getCause() != null) { @@ -395,20 +537,27 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); try { + TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail); // this blocks until the connection has been set up or when an error // has prevented the connection being set up //_protocolHandler.attainState(AMQState.CONNECTION_OPEN); AMQState state = _protocolHandler.attainState(openOrClosedStates); - if(state == AMQState.CONNECTION_OPEN) + if (state == AMQState.CONNECTION_OPEN) { - _failoverPolicy.attainedConnection(); // Again this should be changed to a suitable notify _connected = true; } + else if (state == AMQState.CONNECTION_CLOSED) + { + //We need to change protocol handler here as an error during the connect will not + // cause the StateManager to be replaced. So the state is out of sync on reconnect + // This occurs here when we need to re-negotiate protocol versions + _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_NOT_STARTED); + } } catch (AMQException e) { @@ -513,71 +662,71 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { - synchronized(_sessionCreationLock) + synchronized (_sessionCreationLock) { - checkNotClosed(); + checkNotClosed(); - if (channelLimitReached()) - { - throw new ChannelLimitReachedException(_maximumChannelCount); - } + if (channelLimitReached()) + { + throw new ChannelLimitReachedException(_maximumChannelCount); + } - return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( - new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>() - { - public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException + return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>( + new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>() { - int channelId = _idFactory.incrementAndGet(); - - if (_logger.isDebugEnabled()) + public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException { - _logger.debug("Write channel open frame for channel id " + channelId); - } - - // We must create the session and register it before actually sending the frame to the server to - // open it, so that there is no window where we could receive data on the channel and not be set - // up to handle it appropriately. - AMQSession session = - new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, - prefetchLow); - // _protocolHandler.addSessionByChannel(channelId, session); - registerSession(channelId, session); + int channelId = _idFactory.incrementAndGet(); - boolean success = false; - try - { - createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); - success = true; - } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error creating session: " + e); - jmse.setLinkedException(e); - throw jmse; - } - finally - { - if (!success) + if (_logger.isDebugEnabled()) { - deregisterSession(channelId); + _logger.debug("Write channel open frame for channel id " + channelId); } - } - if (_started) - { + // We must create the session and register it before actually sending the frame to the server to + // open it, so that there is no window where we could receive data on the channel and not be set + // up to handle it appropriately. + AMQSession session = + new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh, + prefetchLow); + // _protocolHandler.addSessionByChannel(channelId, session); + registerSession(channelId, session); + + boolean success = false; try { - session.start(); + createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted); + success = true; } catch (AMQException e) { - throw new JMSAMQException(e); + JMSException jmse = new JMSException("Error creating session: " + e); + jmse.setLinkedException(e); + throw jmse; + } + finally + { + if (!success) + { + deregisterSession(channelId); + } } - } - return session; - } - }, this).execute(); + if (_started) + { + try + { + session.start(); + } + catch (AMQException e) + { + throw new JMSAMQException(e); + } + } + + return session; + } + }, this).execute(); } } @@ -589,13 +738,13 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO: Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); + _protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); - BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false); + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(0, prefetchHigh, false); // todo send low water mark when protocol allows. // todo Be aware of possible changes to parameter order as versions change. - _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class); + _protocolHandler.syncWrite(basicQosBody.generateFrame(channelId), BasicQosOkBody.class); if (transacted) { @@ -720,10 +869,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect checkNotClosed(); if (!_started) { - final Iterator it = _sessions.entrySet().iterator(); + final Iterator it = _sessions.values().iterator(); while (it.hasNext()) { - final AMQSession s = (AMQSession) ((Map.Entry) it.next()).getValue(); + final AMQSession s = (AMQSession) (it.next()); try { s.start(); @@ -783,17 +932,20 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else { - synchronized (getFailoverMutex()) - { if (!_closed.getAndSet(true)) { - try + + synchronized (getFailoverMutex()) { - long startCloseTime = System.currentTimeMillis(); + try + { + long startCloseTime = System.currentTimeMillis(); - _taskPool.shutdown(); closeAllSessions(null, timeout, startCloseTime); + //This MUST occur after we have successfully closed all Channels/Sessions + _taskPool.shutdown(); + if (!_taskPool.isTerminated()) { try @@ -977,11 +1129,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _maximumFrameSize; } - public Map getSessions() - { - return _sessions; - } - public String getUsername() { return _username; @@ -1154,7 +1301,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.error("Throwable Received but no listener set: " + cause.getMessage()); } - if (!(cause instanceof AMQUndeliveredException) && !(cause instanceof AMQAuthenticationException)) + if (hardError(cause)) { try { @@ -1178,6 +1325,16 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + private boolean hardError(Throwable cause) + { + if (cause instanceof AMQException) + { + return ((AMQException)cause).isHardError(); + } + + return true; + } + void registerSession(int channelId, AMQSession session) { _sessions.put(channelId, session); @@ -1202,6 +1359,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // _protocolHandler.addSessionByChannel(s.getChannelId(), s); reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted()); s.resubscribe(); + s.setFlowControl(true); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 28e5992b26..a3cf39003d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.AMQException; import javax.jms.IllegalStateException; import javax.jms.JMSException; @@ -50,7 +51,9 @@ public class AMQQueueBrowser implements QueueBrowser _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector; // Create Consumer to verify message selector. BasicMessageConsumer consumer = - (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + // Close this consumer as we are not looking to consume only to establish that, at least for now, + // the QB can be created consumer.close(); } @@ -88,39 +91,40 @@ public class AMQQueueBrowser implements QueueBrowser checkState(); final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); + _consumers.add(consumer); return new Enumeration() + { + + Message _nextMessage = consumer == null ? null : consumer.receive(); + + public boolean hasMoreElements() { + _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); - Message _nextMessage = consumer.receive(); + return (_nextMessage != null); + } - public boolean hasMoreElements() + public Object nextElement() + { + Message msg = _nextMessage; + try { - _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + _logger.info("QB:nextElement about to receive"); - return (_nextMessage != null); + _nextMessage = consumer.receive(); + _logger.info("QB:nextElement received:" + _nextMessage); } - - public Object nextElement() + catch (JMSException e) { - Message msg = _nextMessage; - try - { - _logger.info("QB:nextElement about to receive"); - - _nextMessage = consumer.receive(); - _logger.info("QB:nextElement received:" + _nextMessage); - } - catch (JMSException e) - { - _logger.warn("Exception caught while queue browsing", e); - _nextMessage = null; - } - - return msg; + _logger.warn("Exception caught while queue browsing", e); + _nextMessage = null; } - }; + + return msg; + } + }; } public void close() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 42f07f97f9..c3219e6564 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -39,10 +39,10 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; -import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -78,10 +78,7 @@ import javax.jms.TopicSubscriber; import javax.jms.TransactionRolledBackException; import java.io.Serializable; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -107,6 +104,89 @@ import java.util.concurrent.atomic.AtomicLong; */ public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { + private static final class IdToConsumerMap + { + private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; + private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>(); + + + public BasicMessageConsumer get(int id) + { + if((id & 0xFFFFFFF0) == 0) + { + return _fastAccessConsumers[id]; + } + else + { + return _slowAccessConsumers.get(id); + } + } + + public BasicMessageConsumer put(int id, BasicMessageConsumer consumer) + { + BasicMessageConsumer oldVal; + if((id & 0xFFFFFFF0) == 0) + { + oldVal = _fastAccessConsumers[id]; + _fastAccessConsumers[id] = consumer; + } + else + { + oldVal = _slowAccessConsumers.put(id, consumer); + } + + return consumer; + + } + + + public BasicMessageConsumer remove(int id) + { + BasicMessageConsumer consumer; + if((id & 0xFFFFFFF0) == 0) + { + consumer = _fastAccessConsumers[id]; + _fastAccessConsumers[id] = null; + } + else + { + consumer = _slowAccessConsumers.remove(id); + } + + return consumer; + + } + + public Collection<BasicMessageConsumer> values() + { + ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>(); + + for(int i = 0; i < 16; i++) + { + if(_fastAccessConsumers[i] != null) + { + values.add(_fastAccessConsumers[i]); + } + } + values.addAll(_slowAccessConsumers.values()); + + return values; + } + + + public void clear() + { + _slowAccessConsumers.clear(); + for(int i = 0; i<16; i++) + { + _fastAccessConsumers[i] = null; + } + } + } + + + + /** Used for debugging. */ private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); @@ -156,7 +236,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _transacted; /** Holds the sessions acknowledgement mode. */ - private int _acknowledgeMode; + private final int _acknowledgeMode; /** Holds this session unique identifier, used to distinguish it from other sessions. */ private int _channelId; @@ -217,8 +297,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right * consumer. */ - private Map<AMQShortString, BasicMessageConsumer> _consumers = - new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); + private final IdToConsumerMap _consumers = new IdToConsumerMap(); + + //Map<AMQShortString, BasicMessageConsumer> _consumers = + //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>(); /** * Contains a list of consumers which have been removed but which might still have @@ -281,6 +363,27 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi /** Has failover occured on this session */ private boolean _failedOver; + + + private static final class FlowControlIndicator + { + private volatile boolean _flowControl = true; + + public synchronized void setFlowControl(boolean flowControl) + { + _flowControl= flowControl; + notify(); + } + + public boolean getFlowControl() + { + return _flowControl; + } + } + + /** Flow control */ + private FlowControlIndicator _flowControl = new FlowControlIndicator(); + /** * Creates a new session on a connection. * @@ -327,24 +430,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { public void aboveThreshold(int currentValue) { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { _logger.debug( "Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); new Thread(new SuspenderRunner(true)).start(); - } + } public void underThreshold(int currentValue) { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { _logger.debug( "Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); new Thread(new SuspenderRunner(false)).start(); - } + } }); } @@ -495,14 +594,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); } - synchronized (_connection.getFailoverMutex()) + // Ensure we only try and close an open session. + if (!_closed.getAndSet(true)) { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session. - synchronized (_messageDeliveryLock) + synchronized (_connection.getFailoverMutex()) { - // Ensure we only try and close an open session. - if (!_closed.getAndSet(true)) + // We must close down all producers and consumers in an orderly fashion. This is the only method + // that can be called from a different thread of control from the one controlling the session. + synchronized (_messageDeliveryLock) { // we pass null since this is not an error case closeProducersAndConsumers(null); @@ -516,7 +615,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi final AMQFrame frame = body.generateFrame(getChannelId()); getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout); - // When control resumes at this point, a reply will have been received that // indicates the broker has closed the channel successfully. @@ -550,33 +648,44 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ public void closed(Throwable e) throws JMSException { - synchronized (_connection.getFailoverMutex()) + // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived + // calls through connection.closeAllSessions which is also called by the public connection.close() + // with a null cause + // When we are closing the Session due to a protocol session error we simply create a new AMQException + // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. + // We need to determin here if the connection should be + + if (e instanceof AMQDisconnectedException) { - if (e instanceof AMQDisconnectedException) + if (_dispatcher != null) { - if (_dispatcher != null) - { - // Failover failed and ain't coming back. Knife the dispatcher. - _dispatcher.interrupt(); - } + // Failover failed and ain't coming back. Knife the dispatcher. + _dispatcher.interrupt(); } - synchronized (_messageDeliveryLock) + } + + if (!_closed.getAndSet(true)) + { + synchronized (_connection.getFailoverMutex()) { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else + synchronized (_messageDeliveryLock) { - amqe = new AMQException("Closing session forcibly", e); - } + // An AMQException has an error code and message already and will be passed in when closure occurs as a + // result of a channel close request + AMQException amqe; + if (e instanceof AMQException) + { + amqe = (AMQException) e; + } + else + { + amqe = new AMQException("Closing session forcibly", e); + } + - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); + _connection.deregisterSession(_channelId); + closeProducersAndConsumers(amqe); + } } } } @@ -662,16 +771,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { // Remove the consumer from the map - BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag); + BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue()); if (consumer != null) { - // fixme this isn't right.. needs to check if _queue contains data for this consumer - if (consumer.isAutoClose()) // && _queue.isEmpty()) - { - consumer.closeWhenNoMessages(true); - } - - if (!consumer.isNoConsume()) + if (!consumer.isNoConsume()) // Normal Consumer { // Clean the Maps up first // Flush any pending messages for this consumerTag @@ -687,7 +790,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _dispatcher.rejectPending(consumer); } - else + else // Queue Browser { // Just close the consumer // fixme the CancelOK is being processed before the arriving messages.. @@ -695,13 +798,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // has yet to receive before the close comes in. // consumer.markClosed(); + + + + if (consumer.isAutoClose()) + { // There is a small window where the message is between the two queues in the dispatcher. + if (consumer.isClosed()) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing consumer:" + consumer.debugIdentity()); + } + + deregisterConsumer(consumer); + + } + else + { + _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer)); + } + } } } - else - { - _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map."); - } - } public QueueBrowser createBrowser(Queue queue) throws JMSException @@ -744,6 +862,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, false); } + + public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException + { + checkValidDestination(destination); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, true, null, null, + false, false); + } + + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); @@ -761,6 +889,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi messageSelector, null, false, false); } + + public MessageConsumer createExclusiveConsumer(Destination destination, String messageSelector, boolean noLocal) + throws JMSException + { + checkValidDestination(destination); + + return createConsumerImpl(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, true, + messageSelector, null, false, false); + } + + public MessageConsumer createConsumer(Destination destination, int prefetch, boolean noLocal, boolean exclusive, String selector) throws JMSException { @@ -925,7 +1064,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); - return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic), topic); + return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic,false,false), topic); } public Queue createQueue(String queueName) throws JMSException @@ -1089,9 +1228,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest)); } + /** * Creates a non-durable subscriber with a message selector * @@ -1109,7 +1249,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi AMQTopic dest = checkValidTopic(topic); // AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); + return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal)); } public TemporaryQueue createTemporaryQueue() throws JMSException @@ -1276,15 +1416,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + "] received in session with channel id " + _channelId); } - if (message.getDeliverBody() == null) + if (message.isDeliverMessage()) { - // Return of the bounced message. - returnBouncedMessage(message); + _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag()); + _queue.add(message); } else { - _highestDeliveryTag.set(message.getDeliverBody().getDeliveryTag()); - _queue.add(message); + // Return of the bounced message. + returnBouncedMessage(message); } } @@ -1393,9 +1533,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(UnprocessedMessage message, boolean requeue) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().getDeliveryTag()); + _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().getDeliveryTag()); } rejectMessage(message.getDeliverBody().getDeliveryTag(), requeue); @@ -1403,9 +1543,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void rejectMessage(AbstractJMSMessage message, boolean requeue) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); + _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); } rejectMessage(message.getDeliveryTag(), requeue); @@ -1638,11 +1778,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { JMSException ex = new JMSException("Error registering consumer: " + e); - if (_logger.isDebugEnabled()) - { - e.printStackTrace(); - } - ex.setLinkedException(e); throw ex; } @@ -1666,7 +1801,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ void deregisterConsumer(BasicMessageConsumer consumer) { - if (_consumers.remove(consumer.getConsumerTag()) != null) + if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null) { String subscriptionName = _reverseSubscriptionMap.remove(consumer); if (subscriptionName != null) @@ -2063,8 +2198,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException { + int tagId = _nextTag++; // need to generate a consumer tag on the client so we can exploit the nowait flag - AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++)); + AMQShortString tag = new AMQShortString(Integer.toString(tagId)); FieldTable arguments = FieldTableFactory.newFieldTable(); if ((messageSelector != null) && !messageSelector.equals("")) @@ -2084,7 +2220,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.setConsumerTag(tag); // we must register the consumer in the map before we actually start listening - _consumers.put(tag, consumer); + _consumers.put(tagId, consumer); try { @@ -2112,7 +2248,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi catch (AMQException e) { // clean-up the map in the event of an error - _consumers.remove(tag); + _consumers.remove(tagId); throw e; } } @@ -2148,6 +2284,73 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } + + /** + * Returns the number of messages currently queued for the given destination. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param amqd The destination to be checked + * + * @return the number of queued messages. + * + * @throws AMQException If the queue cannot be declared for any reason. + */ + public long getQueueDepth(final AMQDestination amqd) + throws AMQException + { + + class QueueDeclareOkHandler extends SpecificMethodFrameListener + { + + private long _messageCount; + private long _consumerCount; + + public QueueDeclareOkHandler() + { + super(getChannelId(), QueueDeclareOkBody.class); + } + + public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException + { + boolean matches = super.processMethod(channelId, frame); + if (matches) + { + QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame; + _messageCount = declareOk.getMessageCount(); + _consumerCount = declareOk.getConsumerCount(); + } + return matches; + } + + } + + return new FailoverNoopSupport<Long, AMQException>( + new FailoverProtectedOperation<Long, AMQException>() + { + public Long execute() throws AMQException, FailoverException + { + + AMQFrame queueDeclare = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + true, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null).generateFrame(_channelId); + QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + + return okHandler._messageCount; + } + }, _connection).execute(); + + } + + + /** * Declares the named exchange and type of exchange. * @@ -2595,6 +2798,25 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _ticket = ticket; } + public void setFlowControl(final boolean active) + { + _flowControl.setFlowControl(active); + } + + + public void checkFlowControl() throws InterruptedException + { + synchronized(_flowControl) + { + while(!_flowControl.getFlowControl()) + { + _flowControl.wait(); + } + } + + } + + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { @@ -2732,7 +2954,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _lock.wait(2000); } - if (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get()) + if (!(message instanceof UnprocessedMessage.CloseConsumerMessage) + && (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get())) { rejectMessage(message, true); } @@ -2786,10 +3009,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private void dispatchMessage(UnprocessedMessage message) { - if (message.getDeliverBody() != null) + final BasicDeliverBody deliverBody = message.getDeliverBody(); + if (deliverBody != null) { final BasicMessageConsumer consumer = - (BasicMessageConsumer) _consumers.get(message.getDeliverBody().getConsumerTag()); + _consumers.get(deliverBody.getConsumerTag().toIntValue()); if ((consumer == null) || consumer.isClosed()) { @@ -2798,13 +3022,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (consumer == null) { _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().getDeliveryTag() + "] from queue " - + message.getDeliverBody().getConsumerTag() + " )without a handler - rejecting(requeue)..."); + + deliverBody.getDeliveryTag() + "] from queue " + + deliverBody.getConsumerTag() + " )without a handler - rejecting(requeue)..."); } else { _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "[" - + message.getDeliverBody().getDeliveryTag() + "] from queue " + " consumer(" + + deliverBody.getDeliveryTag() + "] from queue " + " consumer(" + consumer.debugIdentity() + ") is closed rejecting(requeue)..."); } } @@ -2816,7 +3040,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - consumer.notifyMessage(message, _channelId); + consumer.notifyMessage(message); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 610e0109b1..efbce6033b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -26,11 +26,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicCancelBody; -import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.slf4j.Logger; @@ -42,6 +38,7 @@ import javax.jms.MessageListener; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; @@ -53,13 +50,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); /** The connection being used by this consumer */ - private AMQConnection _connection; + private final AMQConnection _connection; - private String _messageSelector; + private final String _messageSelector; - private boolean _noLocal; + private final boolean _noLocal; - private AMQDestination _destination; + private final AMQDestination _destination; /** When true indicates that a blocking receive call is in progress */ private final AtomicBoolean _receiving = new AtomicBoolean(false); @@ -70,7 +67,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private AMQShortString _consumerTag; /** We need to know the channel id when constructing frames */ - private int _channelId; + private final int _channelId; /** * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors @@ -78,45 +75,36 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private final ArrayBlockingQueue _synchronousQueue; - private MessageFactoryRegistry _messageFactory; + private final MessageFactoryRegistry _messageFactory; private final AMQSession _session; - private AMQProtocolHandler _protocolHandler; + private final AMQProtocolHandler _protocolHandler; /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ - private FieldTable _rawSelectorFieldTable; + private final FieldTable _rawSelectorFieldTable; /** * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of * failover */ - private int _prefetchHigh; + private final int _prefetchHigh; /** * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of * failover */ - private int _prefetchLow; + private final int _prefetchLow; /** We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover */ - private boolean _exclusive; + private final boolean _exclusive; /** * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our * implementation. */ - private int _acknowledgeMode; - - /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */ - private int _outstanding; - - /** - * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding - * number of msgs >= _prefetchHigh and disabled at < _prefetchLow - */ - private boolean _dups_ok_acknowledge_send; + private final int _acknowledgeMode; private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); @@ -133,10 +121,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive * on the queue. This is used for queue browsing. */ - private boolean _autoClose; - private boolean _closeWhenNoMessages; + private final boolean _autoClose; - private boolean _noConsume; + private final boolean _noConsume; private List<StackTraceElement> _closedStack = null; protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, @@ -156,7 +143,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; - _acknowledgeMode = acknowledgeMode; + _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); _autoClose = autoClose; _noConsume = noConsume; @@ -166,6 +153,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _acknowledgeMode = Session.NO_ACKNOWLEDGE; } + else + { + _acknowledgeMode = acknowledgeMode; + } } public AMQDestination getDestination() @@ -253,10 +244,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer switch (_acknowledgeMode) { - case Session.DUPS_OK_ACKNOWLEDGE: - _logger.info("Recording tag for acking on close:" + msg.getDeliveryTag()); - _receivedDeliveryTags.add(msg.getDeliveryTag()); - break; case Session.CLIENT_ACKNOWLEDGE: _unacknowledgedDeliveryTags.add(msg.getDeliveryTag()); @@ -269,7 +256,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } else { - _logger.info("Recording tag for commit:" + msg.getDeliveryTag()); _receivedDeliveryTags.add(msg.getDeliveryTag()); } @@ -284,8 +270,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * * @return boolean if the acquisition was successful * - * @throws JMSException - * @throws InterruptedException + * @throws JMSException if a listener has already been set or another thread is receiving + * @throws InterruptedException if interrupted */ private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException { @@ -372,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e); + _logger.warn("Interrupted acquire: " + e); if (isClosed()) { return null; @@ -383,11 +369,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { - if (closeOnAutoClose()) - { - return null; - } - Object o = null; if (l > 0) { @@ -400,7 +381,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e); + _logger.warn("Interrupted poll: " + e); if (isClosed()) { return null; @@ -418,7 +399,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e); + _logger.warn("Interrupted take: " + e); if (isClosed()) { return null; @@ -440,20 +421,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - private boolean closeOnAutoClose() throws JMSException - { - if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty()) - { - close(false); - - return true; - } - else - { - return false; - } - } - public Message receiveNoWait() throws JMSException { checkPreConditions(); @@ -482,11 +449,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { - if (closeOnAutoClose()) - { - return null; - } - Object o = _synchronousQueue.poll(); final AbstractJMSMessage m = returnMessageOrThrow(o); if (m != null) @@ -507,7 +469,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * We can get back either a Message or an exception from the queue. This method examines the argument and deals with * it by throwing it (if an exception) or returning it (in any other case). * - * @param o + * @param o the object to return or throw * * @return a message only if o is a Message * @@ -527,6 +489,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw e; } + else if (o instanceof UnprocessedMessage.CloseConsumerMessage) + { + _closed.set(true); + deregisterConsumer(); + return null; + } else { return (AbstractJMSMessage) o; @@ -540,31 +508,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public void close(boolean sendClose) throws JMSException { - // synchronized (_closed) - if (_logger.isInfoEnabled()) { _logger.info("Closing consumer:" + debugIdentity()); } - synchronized (_connection.getFailoverMutex()) + if (!_closed.getAndSet(true)) { - if (!_closed.getAndSet(true)) + if (_logger.isDebugEnabled()) { - if (_logger.isTraceEnabled()) + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + if (_closedStack != null) { - StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); - if (_closedStack != null) - { - _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); - } - else - { - _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1); - } + _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); + } + else + { + _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1); } + } - if (sendClose) + if (sendClose) + { + // The Synchronized block only needs to protect network traffic. + synchronized (_connection.getFailoverMutex()) { BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false); @@ -578,7 +545,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _logger.debug("CancelOk'd for consumer:" + debugIdentity()); } - } catch (AMQException e) { @@ -589,24 +555,26 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer throw new JMSAMQException("FailoverException interrupted basic cancel.", e); } } - else - { - // //fixme this probably is not right - // if (!isNoConsume()) - { // done in BasicCancelOK Handler but not sending one so just deregister. - deregisterConsumer(); - } + } + else + { + // //fixme this probably is not right + // if (!isNoConsume()) + { // done in BasicCancelOK Handler but not sending one so just deregister. + deregisterConsumer(); } + } - if ((_messageListener != null) && _receiving.get()) + // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive + // so we need to let it know it is time to close. + if ((_messageListener != null) && _receiving.get()) + { + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Interrupting thread: " + _receivingThread); - } - - _receivingThread.interrupt(); + _logger.info("Interrupting thread: " + _receivingThread); } + + _receivingThread.interrupt(); } } } @@ -621,14 +589,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _closed.set(true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (_closedStack != null) { - _logger.trace(_consumerTag + " markClosed():" + _logger.debug(_consumerTag + " markClosed():" + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); - _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); } else { @@ -645,10 +613,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer * message listener or a synchronous receive() caller. * * @param messageFrame the raw unprocessed mesage - * @param channelId channel on which this message was sent */ - void notifyMessage(UnprocessedMessage messageFrame, int channelId) + void notifyMessage(UnprocessedMessage messageFrame) { + if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage) + { + notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame); + return; + } + final boolean debug = _logger.isDebugEnabled(); if (debug) @@ -658,10 +631,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer try { + final BasicDeliverBody deliverBody = messageFrame.getDeliverBody(); + AbstractJMSMessage jmsMessage = - _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(), - messageFrame.getDeliverBody().getRedelivered(), messageFrame.getDeliverBody().getExchange(), - messageFrame.getDeliverBody().getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + _messageFactory.createMessage(deliverBody.getDeliveryTag(), + deliverBody.getRedelivered(), + deliverBody.getExchange(), + deliverBody.getRoutingKey(), + messageFrame.getContentHeader(), + messageFrame.getBodies()); if (debug) { @@ -673,11 +651,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // if (!_closed.get()) { - jmsMessage.setConsumer(this); - preDeliver(jmsMessage); - notifyMessage(jmsMessage, channelId); + notifyMessage(jmsMessage); } // else // { @@ -700,11 +676,33 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - /** - * @param jmsMessage this message has already been processed so can't redo preDeliver - * @param channelId - */ - public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId) + /** @param closeMessage this message signals that we should close the browser */ + public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage) + { + if (isMessageListenerSet()) + { + // Currently only possible to get this msg type with a browser. + // If we get the message here then we should probably just close this consumer. + // Though an AutoClose consumer with message listener is quite odd... + // Just log out the fact so we know where we are + _logger.warn("Using an AutoCloseconsumer with message listener is not supported."); + } + else + { + try + { + _synchronousQueue.put(closeMessage); + } + catch (InterruptedException e) + { + _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing," + + "but we shouldn't have close yet"); + } + } + } + + /** @param jmsMessage this message has already been processed so can't redo preDeliver */ + public void notifyMessage(AbstractJMSMessage jmsMessage) { try { @@ -773,28 +771,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer break; case Session.DUPS_OK_ACKNOWLEDGE: - /*( if (++_outstanding >= _prefetchHigh) - { - _dups_ok_acknowledge_send = true; - } - - //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur. - if (_outstanding < _prefetchLow) - { - _dups_ok_acknowledge_send = false; - } - - if (_dups_ok_acknowledge_send) - { - if (!_session.isInRecovery()) - { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); - _outstanding = 0; - } - } - - break; - */ case Session.AUTO_ACKNOWLEDGE: // we do not auto ack a message if the application code called recover() if (!_session.isInRecovery()) @@ -845,14 +821,14 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // synchronized (_closed) { _closed.set(true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (_closedStack != null) { - _logger.trace(_consumerTag + " notifyError():" + _logger.debug(_consumerTag + " notifyError():" + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); - _logger.trace(_consumerTag + " previously" + _closedStack.toString()); + _logger.debug(_consumerTag + " previously" + _closedStack.toString()); } else { @@ -948,18 +924,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _noConsume; } - public void closeWhenNoMessages(boolean b) - { - _closeWhenNoMessages = b; - - if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null)) - { - _closed.set(true); - _receivingThread.interrupt(); - } - - } - public void rollback() { clearUnackedMessages(); @@ -982,9 +946,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (tag != null) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting tag from _receivedDTs:" + tag); + _logger.debug("Rejecting tag from _receivedDTs:" + tag); } _session.rejectMessage(tag, true); @@ -1025,9 +989,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _session.rejectMessage(((AbstractJMSMessage) o), true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); + _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } iterator.remove(); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 7e96fb537c..ae71846870 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -538,6 +538,18 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j frames[0] = publishFrame; frames[1] = contentHeaderFrame; CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); + + try + { + _session.checkFlowControl(); + } + catch (InterruptedException e) + { + JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed"); + jmsEx.setLinkedException(e); + throw jmsEx; + } + _protocolHandler.writeFrame(compositeFrame, wait); if (message != origMessage) diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 60f95bfe33..a944ff6bec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -120,13 +120,17 @@ public class FailoverHandler implements Runnable // We wake up listeners. If they can handle failover, they will extend the // FailoverRetrySupport class and will in turn block on the latch until failover // has completed before retrying the operation. - _amqProtocolHandler.propagateExceptionToWaiters(new FailoverException("Failing over about to start")); + _amqProtocolHandler.notifyFailoverStarting(); // Since failover impacts several structures we protect them all with a single mutex. These structures // are also in child objects of the connection. This allows us to manipulate them without affecting // client code which runs in a separate thread. synchronized (_amqProtocolHandler.getConnection().getFailoverMutex()) { + //Clear the exception now that we have the failover mutex there can be no one else waiting for a frame so + // we can clear the exception. + _amqProtocolHandler.failoverInProgress(); + // We switch in a new state manager temporarily so that the interaction to get to the "connection open" // state works, without us having to terminate any existing "state waiters". We could theoretically // have a state waiter waiting until the connection is closed for some reason. Or in future we may have @@ -138,6 +142,9 @@ public class FailoverHandler implements Runnable _logger.info("Failover process veto-ed by client"); _amqProtocolHandler.setStateManager(existingStateManager); + + //todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that + // prompted the failover event. if (_host != null) { _amqProtocolHandler.getConnection().exceptionReceived(new AMQDisconnectedException( diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index 120a07f0fc..e756d7baf9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -122,6 +122,13 @@ public class FailoverRetrySupport<T, E extends Exception> implements FailoverSup {
_log.debug("Failover exception caught during operation: " + e, e);
}
+ catch (IllegalStateException e)
+ {
+ if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
+ {
+ throw e;
+ }
+ }
}
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java index 49c8a83833..d05e99d210 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicDeliverBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +45,8 @@ public class BasicDeliverMethodHandler implements StateAwareMethodListener<Basic throws AMQException { final AMQProtocolSession session = stateManager.getProtocolSession(); - final UnprocessedMessage msg = new UnprocessedMessage(channelId, body); + final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedDeliverMessage(body); _logger.debug("New JmsDeliver method received"); - session.unprocessedMessageReceived(msg); + session.unprocessedMessageReceived(channelId, msg); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java index 428d366f07..2ebc9288c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java @@ -26,7 +26,6 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.BasicReturnBody; -import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,9 +46,9 @@ public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, i { _logger.debug("New JmsBounce method received"); final AMQProtocolSession session = stateManager.getProtocolSession(); - final UnprocessedMessage msg = new UnprocessedMessage(channelId, body); + final UnprocessedMessage msg = new UnprocessedMessage.UnprocessedBouncedMessage(body); - session.unprocessedMessageReceived(msg); + session.unprocessedMessageReceived(channelId, msg); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java index 8c8814e9b7..a580a6466d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java @@ -104,6 +104,11 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann } // fixme why is this only done when the close is expected... // should the above forced closes not also cause a close? + // ---------- + // Closing the session only when it is expected allows the errors to be processed + // Calling this here will prevent failover. So we should do this for all exceptions + // that should never cause failover. Such as authentication errors. + session.channelClosed(channelId, errorCode, String.valueOf(reason)); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java new file mode 100644 index 0000000000..b47fe751d6 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java @@ -0,0 +1,54 @@ +package org.apache.qpid.client.handler; + +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.AMQException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +public class ChannelFlowMethodHandler implements StateAwareMethodListener<ChannelFlowBody> +{ + private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowMethodHandler.class); + private static final ChannelFlowMethodHandler _instance = new ChannelFlowMethodHandler(); + + public static ChannelFlowMethodHandler getInstance() + { + return _instance; + } + + private ChannelFlowMethodHandler() + { } + + public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) + throws AMQException + { + + final AMQProtocolSession session = stateManager.getProtocolSession(); + session.setFlowControl(channelId, body.getActive()); + } + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java index 4d805cf123..fdcb493f38 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java @@ -73,10 +73,11 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co if (errorCode != AMQConstant.REPLY_SUCCESS) { - if (errorCode == AMQConstant.NOT_ALLOWED) + if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED)) { - _logger.info("Authentication Error:" + Thread.currentThread().getName()); + _logger.info("Error :" + errorCode +":"+ Thread.currentThread().getName()); + // todo ritchiem : Why do this here when it is going to be done in the finally block? session.closeProtocolSession(); // todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state. @@ -98,6 +99,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<Co session.closeProtocolSession(); + // ritchiem: Doing this though will cause any waiting connection start to be released without being able to + // see what the cause was. stateManager.changeState(AMQState.CONNECTION_CLOSED); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index b029770946..6d4c61fb29 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -55,8 +55,9 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach protected boolean _changedData; private Destination _destination; private JMSHeaderAdapter _headerAdapter; - private BasicMessageConsumer _consumer; - private boolean _strictAMQP; + + private static final boolean STRICT_AMQP_COMPLIANCE = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); protected AbstractJMSMessage(ByteBuffer data) { @@ -72,8 +73,6 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach _changedData = (data == null); _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); - _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); } protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, @@ -121,7 +120,10 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { if (getContentHeaderProperties().getMessageIdAsString() == null) { - getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID()); + StringBuilder b = new StringBuilder(39); + b.append("ID:"); + b.append(UUID.randomUUID()); + getContentHeaderProperties().setMessageId(b.toString()); } return getContentHeaderProperties().getMessageIdAsString(); @@ -301,7 +303,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -311,7 +313,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public boolean getBooleanProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -321,7 +323,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte getByteProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -331,7 +333,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -341,7 +343,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public short getShortProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -351,7 +353,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public int getIntProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -361,7 +363,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public long getLongProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -371,7 +373,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public float getFloatProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -381,7 +383,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public double getDoubleProperty(String propertyName) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -392,19 +394,14 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public String getStringProperty(String propertyName) throws JMSException { - if (propertyName.startsWith("JMSX")) + //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below. + if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) { - //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below. - if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) - { - return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString(); - } - - return null; + return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString(); } else { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -425,7 +422,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -436,7 +433,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBooleanProperty(String propertyName, boolean b) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -447,7 +444,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setByteProperty(String propertyName, byte b) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -458,7 +455,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -469,7 +466,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setShortProperty(String propertyName, short i) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -487,7 +484,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setLongProperty(String propertyName, long l) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -498,7 +495,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setFloatProperty(String propertyName, float f) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -509,7 +506,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach public void setDoubleProperty(String propertyName, double v) throws JMSException { - if (_strictAMQP) + if (STRICT_AMQP_COMPLIANCE) { throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP"); } @@ -691,9 +688,4 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach } } - public void setConsumer(BasicMessageConsumer basicMessageConsumer) - { - _consumer = basicMessageConsumer; - } - } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java index a70acbabbe..d8fe964b85 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java @@ -55,7 +55,11 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm JMSMapMessage(ByteBuffer data) throws JMSException { super(data); // this instantiates a content header - populateMapFromData(); + if(data != null) + { + populateMapFromData(); + } + } JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey, @@ -76,7 +80,7 @@ public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jm public String toBodyString() throws JMSException { - return _map.toString(); + return _map == null ? "" : _map.toString(); } public AMQShortString getMimeTypeAsShortString() diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index 5b199f2478..18157adc34 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,10 +24,20 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQChannelException; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicDeliverBody; import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and @@ -36,33 +46,15 @@ import org.apache.qpid.framing.ContentHeaderBody; * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher * thread in order to minimise the amount of work done in the MINA dispatcher thread. */ -public class UnprocessedMessage +public abstract class UnprocessedMessage { - private long _bytesReceived = 0; + private long _bytesReceived = 0L; - private final BasicDeliverBody _deliverBody; - private final BasicReturnBody _bounceBody; // TODO: check change (gustavo) - private final int _channelId; private ContentHeaderBody _contentHeader; /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */ private List<ContentBody> _bodies; - public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody) - { - _deliverBody = deliverBody; - _channelId = channelId; - _bounceBody = null; - } - - - public UnprocessedMessage(int channelId, BasicReturnBody bounceBody) - { - _deliverBody = null; - _channelId = channelId; - _bounceBody = bounceBody; - } - public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException { @@ -96,22 +88,11 @@ public class UnprocessedMessage return _bytesReceived == getContentHeader().bodySize; } - public BasicDeliverBody getDeliverBody() - { - return _deliverBody; - } - - public BasicReturnBody getBounceBody() - { - return _bounceBody; - } - public int getChannelId() - { - return _channelId; - } + abstract public BasicDeliverBody getDeliverBody(); + abstract public BasicReturnBody getBounceBody(); public ContentHeaderBody getContentHeader() { @@ -128,4 +109,188 @@ public class UnprocessedMessage return _bodies; } + abstract public boolean isDeliverMessage(); + + public static final class UnprocessedDeliverMessage extends UnprocessedMessage + { + private final BasicDeliverBody _body; + + public UnprocessedDeliverMessage(final BasicDeliverBody body) + { + _body = body; + } + + + public BasicDeliverBody getDeliverBody() + { + return _body; + } + + public BasicReturnBody getBounceBody() + { + return null; + } + + public boolean isDeliverMessage() + { + return true; + } + } + + public static final class UnprocessedBouncedMessage extends UnprocessedMessage + { + private final BasicReturnBody _body; + + public UnprocessedBouncedMessage(final BasicReturnBody body) + { + _body = body; + } + + + public BasicDeliverBody getDeliverBody() + { + return null; + } + + public BasicReturnBody getBounceBody() + { + return _body; + } + + public boolean isDeliverMessage() + { + return false; + } + } + + public static final class CloseConsumerMessage extends UnprocessedMessage + { + BasicMessageConsumer _consumer; + + public CloseConsumerMessage(BasicMessageConsumer consumer) + { + _consumer = consumer; + } + + public BasicDeliverBody getDeliverBody() + { + return new BasicDeliverBody() + { + // This is the only thing we need to preserve so the correct consumer can be found later. + public AMQShortString getConsumerTag() + { + return _consumer.getConsumerTag(); + } + + // The Rest of these methods are not used + public long getDeliveryTag() + { + return 0; + } + + public AMQShortString getExchange() + { + return null; + } + + public boolean getRedelivered() + { + return false; + } + + public AMQShortString getRoutingKey() + { + return null; + } + + public byte getMajor() + { + return 0; + } + + public byte getMinor() + { + return 0; + } + + public int getClazz() + { + return 0; + } + + public int getMethod() + { + return 0; + } + + public void writeMethodPayload(ByteBuffer buffer) + { + } + + public byte getFrameType() + { + return 0; + } + + public int getSize() + { + return 0; + } + + public void writePayload(ByteBuffer buffer) + { + } + + public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException + { + } + + public AMQFrame generateFrame(int channelId) + { + return null; + } + + public AMQChannelException getChannelNotFoundException(int channelId) + { + return null; + } + + public AMQChannelException getChannelException(AMQConstant code, String message) + { + return null; + } + + public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause) + { + return null; + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message) + { + return null; + } + + public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause) + { + return null; + } + + public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException + { + return false; + } + }; + } + + public BasicReturnBody getBounceBody() + { + return null; + } + + public boolean isDeliverMessage() + { + return false; + } + } + } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 8a1e78d2e0..3932b098cd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -30,7 +30,6 @@ import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; @@ -55,6 +54,7 @@ import org.apache.qpid.ssl.SSLContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -153,9 +153,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter /** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */ private CountDownLatch _failoverLatch; + + /** The last failover exception that occured */ + private FailoverException _lastFailoverException; + /** Defines the default timeout to use for synchronous protocol commands. */ private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; + /** Default buffer size for pending messages reads */ + private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; + + /** Default buffer size for pending messages writes */ + private static final String DEFAULT_WRITE_BUFFER_LIMIT = "262144"; + /** * Creates a new protocol handler, associated with the specified client connection instance. * @@ -209,29 +219,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter } catch (RuntimeException e) { - e.printStackTrace(); + _logger.error(e.getMessage(), e); } - if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio")) + if (Boolean.getBoolean("protectio")) { try { //Add IO Protection Filters IoFilterChain chain = session.getFilterChain(); - int buf_size = 32768; - if (session.getConfig() instanceof SocketSessionConfig) - { - buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize(); - } session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER_LIMIT))); writefilter.attach(chain); session.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); @@ -355,7 +360,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter if (_failoverState == FailoverState.NOT_STARTED) { // if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException))) - if (cause instanceof AMQConnectionClosedException) + if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException) { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); // this will attemp failover @@ -372,8 +377,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter AMQException amqe = new AMQException("Protocol handler error: " + cause, cause); propagateExceptionToWaiters(amqe); - _connection.exceptionReceived(cause); } + _connection.exceptionReceived(cause); } @@ -406,7 +411,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void propagateExceptionToWaiters(Exception e) { - getStateManager().error(e); + if (!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); @@ -418,6 +423,24 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + public void notifyFailoverStarting() + { + // Set the last exception in the sync block to ensure the ordering with add. + // either this gets done and the add does the ml.error + // or the add completes first and the iterator below will do ml.error + synchronized (_frameListeners) + { + _lastFailoverException = new FailoverException("Failing over about to start"); + } + + propagateExceptionToWaiters(_lastFailoverException); + } + + public void failoverInProgress() + { + _lastFailoverException = null; + } + private static int _messageReceivedCount; public void messageReceived(IoSession session, Object message) throws Exception @@ -438,78 +461,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody); - switch (bodyFrame.getFrameType()) - { - case AMQMethodBody.TYPE: - - if (debug) - { - _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame); - } - - final AMQMethodEvent<AMQMethodBody> evt = - new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame); - - try - { - - boolean wasAnyoneInterested = getStateManager().methodReceived(evt); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; - } - } - - if (!wasAnyoneInterested) - { - throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" - + _frameListeners); - } - } - catch (AMQException e) - { - getStateManager().error(e); - if (!_frameListeners.isEmpty()) - { - Iterator it = _frameListeners.iterator(); - while (it.hasNext()) - { - final AMQMethodListener listener = (AMQMethodListener) it.next(); - listener.error(e); - } - } - - exceptionCaught(session, e); - } - - break; - - case ContentHeaderBody.TYPE: - - _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame); - break; - - case ContentBody.TYPE: - - _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame); - break; - - case HeartbeatBody.TYPE: - - if (debug) - { - _logger.debug("Received heartbeat"); - } - - break; - - default: + bodyFrame.handle(frame.getChannel(),_protocolSession); - } _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); } @@ -527,6 +480,57 @@ public class AMQProtocolHandler extends IoHandlerAdapter } } + public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session) + throws AMQException + { + + if (_logger.isDebugEnabled()) + { + _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame); + } + + final AMQMethodEvent<AMQMethodBody> evt = + new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame); + + try + { + + boolean wasAnyoneInterested = getStateManager().methodReceived(evt); + if (!_frameListeners.isEmpty()) + { + //This iterator is safe from the error state as the frame listeners always add before they send so their + // will be ready and waiting for this response. + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; + } + } + + if (!wasAnyoneInterested) + { + throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener. Listeners:" + + _frameListeners); + } + } + catch (AMQException e) + { + if (!_frameListeners.isEmpty()) + { + Iterator it = _frameListeners.iterator(); + while (it.hasNext()) + { + final AMQMethodListener listener = (AMQMethodListener) it.next(); + listener.error(e); + } + } + + exceptionCaught(session, e); + } + + } + private static int _messagesOut; public void messageSent(IoSession session, Object message) throws Exception @@ -612,7 +616,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter { try { - _frameListeners.add(listener); + synchronized (_frameListeners) + { + if (_lastFailoverException != null) + { + throw _lastFailoverException; + } + + _frameListeners.add(listener); + } _protocolSession.writeFrame(frame); AMQMethodEvent e = listener.blockForFrame(timeout); @@ -621,10 +633,6 @@ public class AMQProtocolHandler extends IoHandlerAdapter // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } - catch (AMQException e) - { - throw e; - } finally { // If we don't removeKey the listener then no-one will diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index b48adbdb08..6a5cc62bfc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -74,8 +74,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ protected final AMQProtocolHandler _protocolHandler; - /** Maps from the channel id to the AMQSession that it represents. */ - protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new ConcurrentHashMap<Integer, AMQSession>(); protected ConcurrentMap _closingChannels = new ConcurrentHashMap(); @@ -83,7 +81,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives * first) with the subsequent content header and content bodies. */ - protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap(); + private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>(); + private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16]; /** Counter to ensure unique queue names */ protected int _queueId = 1; @@ -101,7 +100,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession private MethodDispatcher _methodDispatcher; - private final AMQConnection _connection; + private final AMQConnection _connection; + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) { @@ -230,14 +230,24 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession * * @throws AMQException if this was not expected */ - public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException + public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException { - _channelId2UnprocessedMsgMap.put(message.getChannelId(), message); + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = message; + } + else + { + _channelId2UnprocessedMsgMap.put(channelId, message); + } } - public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException + public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId] + : _channelId2UnprocessedMsgMap.get(channelId); + + if (msg == null) { throw new AMQException("Error: received content header without having received a BasicDeliver frame first"); @@ -256,9 +266,19 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } - public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException + public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException { - UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId); + UnprocessedMessage msg; + final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0; + if(fastAccess) + { + msg = _channelId2UnprocessedMsgArray[channelId]; + } + else + { + msg = _channelId2UnprocessedMsgMap.get(channelId); + } + if (msg == null) { throw new AMQException("Error: received content body without having received a JMSDeliver frame first"); @@ -266,7 +286,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession if (msg.getContentHeader() == null) { - _channelId2UnprocessedMsgMap.remove(channelId); + if(fastAccess) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } throw new AMQException("Error: received content body without having received a ContentHeader frame first"); } @@ -286,6 +313,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession } } + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException + { + + } + /** * Deliver a message to the appropriate session, removing the unprocessed message from our map * @@ -296,7 +328,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { AMQSession session = getSession(channelId); session.messageReceived(msg); - _channelId2UnprocessedMsgMap.remove(channelId); + if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0) + { + _channelId2UnprocessedMsgArray[channelId] = null; + } + else + { + _channelId2UnprocessedMsgMap.remove(channelId); + } } protected AMQSession getSession(int channelId) @@ -486,4 +525,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _methodDispatcher = methodDispatcher; } + + public void setFlowControl(final int channelId, final boolean active) + { + final AMQSession session = getSession(channelId); + session.setFlowControl(active); + } + + public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException + { + _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index b6baefe1b0..2e6a4beb83 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -37,7 +37,7 @@ import java.util.concurrent.CopyOnWriteArraySet; * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler * there is a separate state manager. */ -public class AMQStateManager implements AMQMethodListener +public class AMQStateManager { private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class); @@ -52,9 +52,9 @@ public class AMQStateManager implements AMQMethodListener * AMQFrame. */ - private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); + private final Object _stateLock = new Object(); - private static final long MAXIMUM_STATE_WAIT_TIME = 30000L; + private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000")); public AMQStateManager() { @@ -91,19 +91,6 @@ public class AMQStateManager implements AMQMethodListener } } - public void error(Exception e) - { - _logger.debug("State manager receive error notification: " + e); - synchronized (_stateListeners) - { - final Iterator it = _stateListeners.iterator(); - while (it.hasNext()) - { - final StateListener l = (StateListener) it.next(); - l.error(e); - } - } - } public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 5482e48699..b2f7ae8395 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -24,6 +24,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; @@ -36,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class SocketTransportConnection implements ITransportConnection { @@ -83,8 +87,34 @@ public class SocketTransportConnection implements ITransportConnection _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE)); _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize()); - final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); - _logger.info("Attempting connection to " + address); + + final InetSocketAddress address; + + if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) + { + address = null; + + Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); + + if (socket != null) + { + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); + } + else + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket://<SocketID>' transport:" + brokerDetail); + } + } + else + { + address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); + _logger.info("Attempting connection to " + address); + } + + ConnectFuture future = ioConnector.connect(address, protocolHandler); // wait for connection to complete diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index e8a220f5e9..7ae2ddf66c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.transport; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.socket.nio.ExistingSocketConnector; import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; @@ -36,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.net.Socket; + /** * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying @@ -54,12 +58,25 @@ public class TransportConnection private static final int TCP = 0; private static final int VM = 1; + private static final int SOCKET = 2; private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; - public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException + private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>(); + + public static void registerOpenSocket(String socketID, Socket openSocket) + { + _openSocketRegister.put(socketID, openSocket); + } + + public static Socket removeOpenSocket(String socketID) + { + return _openSocketRegister.remove(socketID); + } + + public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -68,7 +85,7 @@ public class TransportConnection throw new AMQNoTransportForProtocolException(details); } - if (transport == _currentInstance) + /* if (transport == _currentInstance) { if (transport == VM) { @@ -83,19 +100,29 @@ public class TransportConnection } } - _currentInstance = transport; + _currentInstance = transport;*/ + ITransportConnection instance; switch (transport) { - + case SOCKET: + instance = + new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() + { + return new ExistingSocketConnector(); + } + }); + break; case TCP: - _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() { public IoConnector newSocketConnector() { SocketConnector result; // FIXME - this needs to be sorted to use the new Mina MultiThread SA. - if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio")) + if (Boolean.getBoolean("qpidnio")) { _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") ? "Qpid NIO is new default" @@ -117,16 +144,23 @@ public class TransportConnection break; case VM: { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); break; } + default: + throw new AMQNoTransportForProtocolException(details); } - return _instance; + return instance; } private static int getTransport(String transport) { + if (transport.equals(BrokerDetails.SOCKET)) + { + return SOCKET; + } + if (transport.equals(BrokerDetails.TCP)) { return TCP; @@ -283,11 +317,14 @@ public class TransportConnection public static void killAllVMBrokers() { _logger.info("Killing all VM Brokers"); - _acceptor.unbindAll(); + if (_acceptor != null) + { + _acceptor.unbindAll(); + } synchronized (_inVmPipeAddress) { _inVmPipeAddress.clear(); - } + } _acceptor = null; _currentInstance = -1; _currentVMPort = -1; @@ -302,6 +339,8 @@ public class TransportConnection { _logger.info("Killing VM Broker:" + port); _inVmPipeAddress.remove(port); + // This does need to be sychronized as otherwise mina can hang + // if a new connection is made _acceptor.unbind(pipe); } } diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 603b0834a3..8b353a7264 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -34,6 +34,7 @@ public interface BrokerDetails public static final String OPTIONS_CONNECT_DELAY = "connectdelay"; public static final int DEFAULT_PORT = 5672; + public static final String SOCKET = "socket"; public static final String TCP = "tcp"; public static final String VM = "vm"; diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index 6ec883ff0b..8e3ccc3b02 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -34,7 +34,6 @@ public class FailoverPolicy private static final long MINUTE = 60000L; private static final long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE; - private static final long DEFAULT_FAILOVER_TIMEOUT = 4 * MINUTE; private FailoverMethod[] _methods = new FailoverMethod[1]; @@ -161,16 +160,7 @@ public class FailoverPolicy } else { - if ((now - _lastFailTime) >= DEFAULT_FAILOVER_TIMEOUT) - { - _logger.info("Failover timeout"); - - return false; - } - else - { - _lastMethodTime = now; - } + _lastMethodTime = now; } } else diff --git a/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java b/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java index b91fc2d960..b830c377b8 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java @@ -50,4 +50,8 @@ public interface MessageProducer extends javax.jms.MessageProducer void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException; + + void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException; + } diff --git a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java index a246352d8b..2fe01fc126 100644 --- a/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java +++ b/java/client/src/old_test/java/org/apache/qpid/fragmentation/TestLargePublisher.java @@ -183,7 +183,7 @@ public class TestLargePublisher } catch (UnknownHostException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } catch (AMQException e) { diff --git a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java index 33891142b5..37b4ff1498 100644 --- a/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java +++ b/java/client/src/old_test/java/org/apache/qpid/pubsub1/TestPublisher.java @@ -133,7 +133,7 @@ public class TestPublisher } catch (JMSException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } } @@ -163,7 +163,7 @@ public class TestPublisher } catch (UnknownHostException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } catch (AMQException e) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java index d2a7ba301b..09886f4736 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java @@ -173,7 +173,7 @@ public class PropertyValueTest extends TestCase implements MessageListener m.setJMSReplyTo(q); m.setStringProperty("TempQueue", q.toString()); - _logger.trace("Message:" + m); + _logger.debug("Message:" + m); Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue")); @@ -292,8 +292,11 @@ public class PropertyValueTest extends TestCase implements MessageListener ((AMQMessage) m).getPropertyHeaders().containsKey("void")); //JMSXUserID - Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME, - m.getStringProperty("JMSXUserID")); + if (m.getStringProperty("JMSXUserID") != null) + { + Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME, + m.getStringProperty("JMSXUserID")); + } } received.clear(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java index 40c712c1c9..d05ed7ca73 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java @@ -21,18 +21,20 @@ package org.apache.qpid.test.unit.basic; import junit.framework.TestCase; - +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.client.transport.TransportConnection; - +import org.apache.qpid.url.URLSyntaxException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -46,12 +48,12 @@ public class SelectorTest extends TestCase implements MessageListener private AMQSession _session; private int count; public String _connectionString = "vm://:1"; + private static final String INVALID_SELECTOR = "Cost LIKE 5"; protected void setUp() throws Exception { super.setUp(); TransportConnection.createVMBroker(1); - init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); } protected void tearDown() throws Exception @@ -60,19 +62,19 @@ public class SelectorTest extends TestCase implements MessageListener TransportConnection.killAllVMBrokers(); } - private void init(AMQConnection connection) throws Exception + private void init(AMQConnection connection) throws JMSException { init(connection, new AMQQueue(connection, randomize("SessionStartTest"), true)); } - private void init(AMQConnection connection, AMQDestination destination) throws Exception + private void init(AMQConnection connection, AMQDestination destination) throws JMSException { _connection = connection; _destination = destination; connection.start(); String selector = null; - // selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; + selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'"; // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT; _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); @@ -80,13 +82,17 @@ public class SelectorTest extends TestCase implements MessageListener _session.createConsumer(destination, selector).setMessageListener(this); } - public synchronized void test() throws JMSException, InterruptedException + public synchronized void test() throws JMSException, InterruptedException, URLSyntaxException, AMQException { try { + + init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test")); + Message msg = _session.createTextMessage("Message"); msg.setJMSPriority(1); msg.setIntProperty("Cost", 2); + msg.setStringProperty("property-with-hyphen", "wibble"); msg.setJMSType("Special"); _logger.info("Sending Message:" + msg); @@ -106,10 +112,147 @@ public class SelectorTest extends TestCase implements MessageListener // throw new RuntimeException("Did not get message!"); } } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + System.out.println("SUCCESS!!"); + } + } + catch (InterruptedException e) + { + _logger.debug("IE :" + e.getClass().getSimpleName() + ":" + e.getMessage()); + } + catch (URLSyntaxException e) + { + _logger.debug("URL:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + fail("Wrong exception"); + } + catch (AMQException e) + { + _logger.debug("AMQ:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + fail("Wrong exception"); + } + + finally + { + if (_session != null) + { + _session.close(); + } + if (_connection != null) + { + _connection.close(); + } + } + } + + + public void testInvalidSelectors() + { + Connection connection = null; + + try + { + connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"); + _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); + } + catch (JMSException e) + { + fail(e.getMessage()); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + catch (URLSyntaxException e) + { + fail("Error:" + e.getMessage()); + } + + //Try Creating a Browser + try + { + _session.createBrowser(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + + //Try Creating a Consumer + try + { + _session.createConsumer(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + + //Try Creating a Receiever + try + { + _session.createReceiver(_session.createQueue("Ping"), INVALID_SELECTOR); + } + catch (JMSException e) + { + _logger.debug("JMS:" + e.getClass().getSimpleName() + ":" + e.getMessage()); + if (!(e instanceof InvalidSelectorException)) + { + fail("Wrong exception:" + e.getMessage()); + } + else + { + _logger.debug("SUCCESS!!"); + } + } + finally { - _session.close(); - _connection.close(); + if (_session != null) + { + try + { + _session.close(); + } + catch (JMSException e) + { + fail("Error cleaning up:" + e.getMessage()); + } + } + if (_connection != null) + { + try + { + _connection.close(); + } + catch (JMSException e) + { + fail("Error cleaning up:" + e.getMessage()); + } + } } } @@ -128,9 +271,29 @@ public class SelectorTest extends TestCase implements MessageListener public static void main(String[] argv) throws Exception { SelectorTest test = new SelectorTest(); - test._connectionString = (argv.length == 0) ? "localhost:5672" : argv[0]; - test.setUp(); - test.test(); + test._connectionString = (argv.length == 0) ? "localhost:3000" : argv[0]; + + try + { + while (true) + { + if (test._connectionString.contains("vm://:1")) + { + test.setUp(); + } + test.test(); + + if (test._connectionString.contains("vm://:1")) + { + test.tearDown(); + } + } + } + catch (Exception e) + { + System.err.println(e.getMessage()); + e.printStackTrace(); + } } public static junit.framework.Test suite() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 56394fee27..4b4df7e5c8 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.jms.Session; import junit.framework.TestCase; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.QueueSession; import javax.jms.TopicSession; @@ -55,25 +56,30 @@ public class ConnectionTest extends TestCase TransportConnection.killVMBroker(1); } - public void testSimpleConnection() + public void testSimpleConnection() throws Exception { + AMQConnection conn = null; try { - AMQConnection conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); - conn.close(); + conn = new AMQConnection(_broker, "guest", "guest", "fred", "test"); } catch (Exception e) { fail("Connection to " + _broker + " should succeed. Reason: " + e); } + finally + { + conn.close(); + } } - public void testDefaultExchanges() + public void testDefaultExchanges() throws Exception { + AMQConnection conn = null; try { - AMQConnection conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='" + conn = new AMQConnection("amqp://guest:guest@clientid/test?brokerlist='" + _broker + "?retries='1''&defaultQueueExchange='test.direct'" + "&defaultTopicExchange='test.topic'" @@ -106,37 +112,53 @@ public class ConnectionTest extends TestCase topicSession.close(); - - conn.close(); } catch (Exception e) { fail("Connection to " + _broker + " should succeed. Reason: " + e); } + finally + { + conn.close(); + } } - //fixme AMQAuthenticationException is not propogaged - public void PasswordFailureConnection() throws Exception + //See QPID-771 + public void testPasswordFailureConnection() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''"); + conn = new AMQConnection("amqp://guest:rubbishpassword@clientid/test?brokerlist='" + _broker + "?retries='1''"); fail("Connection should not be established password is wrong."); } catch (AMQException amqe) { - if (!(amqe instanceof AMQAuthenticationException)) + if (amqe.getCause().getClass() == Exception.class) { - fail("Correct exception not thrown. Excpected 'AMQAuthenticationException' got: " + amqe); + System.err.println("QPID-594 : WARNING RACE CONDITION. Unable to determine cause of Connection Failure."); + return; + } + + assertEquals("Exception was wrong type", JMSException.class, amqe.getCause().getClass()); + Exception linked = ((JMSException) amqe.getCause()).getLinkedException(); + assertEquals("Exception was wrong type", AMQAuthenticationException.class, linked.getClass()); + } + finally + { + if (conn != null) + { + conn.close(); } } } public void testConnectionFailure() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''"); + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_NotRunning + "?retries='0''"); fail("Connection should not be established"); } catch (AMQException amqe) @@ -146,14 +168,22 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQConnectionException' got: " + amqe); } } + finally + { + if (conn != null) + { + conn.close(); + } + } } public void testUnresolvedHostFailure() throws Exception { + AMQConnection conn = null; try { - new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''"); + conn = new AMQConnection("amqp://guest:guest@clientid/testpath?brokerlist='" + _broker_BadDNS + "?retries='0''"); fail("Connection should not be established"); } catch (AMQException amqe) @@ -163,6 +193,38 @@ public class ConnectionTest extends TestCase fail("Correct exception not thrown. Excpected 'AMQUnresolvedAddressException' got: " + amqe); } } + finally + { + if (conn != null) + { + conn.close(); + } + } + + } + + public void testUnresolvedVirtualHostFailure() throws Exception + { + AMQConnection conn = null; + try + { + conn = new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + _broker + "?retries='0''"); + fail("Connection should not be established"); + } + catch (AMQException amqe) + { + if (!(amqe instanceof AMQConnectionFailureException)) + { + fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe); + } + } + finally + { + if (conn != null) + { + conn.close(); + } + } } public void testClientIdCannotBeChanged() throws Exception @@ -178,13 +240,27 @@ public class ConnectionTest extends TestCase { // PASS } + finally + { + if (connection != null) + { + connection.close(); + } + } } public void testClientIdIsPopulatedAutomatically() throws Exception { Connection connection = new AMQConnection(_broker, "guest", "guest", null, "test"); - assertNotNull(connection.getClientID()); + try + { + assertNotNull(connection.getClientID()); + } + finally + { + connection.close(); + } } public static junit.framework.Test suite() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 6c872a0e10..d90873a6a7 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -510,6 +510,25 @@ public class ConnectionURLTest extends TestCase } + public void testSocketProtocol() throws URLSyntaxException + { + String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'"; + + try + { + AMQConnectionURL curl = new AMQConnectionURL(url); + assertNotNull(curl); + assertEquals(1, curl.getBrokerCount()); + assertNotNull(curl.getBrokerDetails(0)); + assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport()); + assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost()); + assertEquals("URL does not toString as expected", url, curl.toString()); + } + catch (URLSyntaxException e) + { + fail(e.getMessage()); + } + } public static junit.framework.Test suite() { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java index 5a61480f6a..ee110e7932 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java @@ -29,8 +29,8 @@ import org.apache.qpid.client.transport.TransportConnection; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import uk.co.thebadgerset.junit.concurrency.TestRunnable;
-import uk.co.thebadgerset.junit.concurrency.ThreadTestCoordinator;
+import org.apache.qpid.junit.concurrency.TestRunnable;
+import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
import javax.jms.Connection;
import javax.jms.Message;
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java index 3012909daa..f2655adc98 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java @@ -38,6 +38,7 @@ import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; +import java.util.Enumeration; /** * @author Apache Software Foundation @@ -85,6 +86,12 @@ public class JMSPropertiesTest extends TestCase sentMsg.setJMSType(JMS_TYPE); sentMsg.setJMSReplyTo(JMS_REPLY_TO); + String JMSXGroupID_VALUE = "group"; + sentMsg.setStringProperty("JMSXGroupID", JMSXGroupID_VALUE); + + int JMSXGroupSeq_VALUE = 1; + sentMsg.setIntProperty("JMSXGroupSeq", JMSXGroupSeq_VALUE); + // send it producer.send(sentMsg); @@ -101,6 +108,30 @@ public class JMSPropertiesTest extends TestCase // assertEquals("JMS Delivery Mode mismatch",sentMsg.getJMSDeliveryMode(),rm.getJMSDeliveryMode()); assertEquals("JMS Type mismatch", sentMsg.getJMSType(), rm.getJMSType()); assertEquals("JMS Reply To mismatch", sentMsg.getJMSReplyTo(), rm.getJMSReplyTo()); + assertTrue("JMSMessageID Does not start ID:", rm.getJMSMessageID().startsWith("ID:")); + + //Validate that the JMSX values are correct + assertEquals("JMSXGroupID is not as expected:", JMSXGroupID_VALUE, rm.getStringProperty("JMSXGroupID")); + assertEquals("JMSXGroupSeq is not as expected:", JMSXGroupSeq_VALUE, rm.getIntProperty("JMSXGroupSeq")); + + boolean JMSXGroupID_Available = false; + boolean JMSXGroupSeq_Available = false; + Enumeration props = con.getMetaData().getJMSXPropertyNames(); + while (props.hasMoreElements()) + { + String name = (String) props.nextElement(); + if (name.equals("JMSXGroupID")) + { + JMSXGroupID_Available = true; + } + if (name.equals("JMSXGroupSeq")) + { + JMSXGroupSeq_Available = true; + } + } + + assertTrue("JMSXGroupID not available.",JMSXGroupID_Available); + assertTrue("JMSXGroupSeq not available.",JMSXGroupSeq_Available); con.close(); } |