diff options
11 files changed, 501 insertions, 125 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 17300d6b50..25669c62d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -229,7 +229,7 @@ public class AMQChannel BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties; //fixme: fudge for QPID-677 properties.getHeaders().keySet(); - + properties.setUserId(protocolSession.getAuthorizedID().getName()); } @@ -381,7 +381,14 @@ public class AMQChannel { _txnContext.rollback(); unsubscribeAllConsumers(session); - requeue(); + try + { + requeue(); + } + catch (AMQException e) + { + _log.error("Caught AMQException whilst attempting to reque:" + e); + } setClosing(true); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 5bfd47b469..cde8791364 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -33,12 +33,27 @@ import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MainRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.VersionSpecificRegistry; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.store.MessageStoreClosedException; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.output.ProtocolOutputConverter; @@ -118,7 +133,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) - throws AMQException + throws AMQException { _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; @@ -144,7 +159,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, - AMQStateManager stateManager) throws AMQException + AMQStateManager stateManager) throws AMQException { _stateManager = stateManager; _minaProtocolSession = session; @@ -197,7 +212,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - private void frameReceived(AMQFrame frame) throws AMQException + private void frameReceived(AMQFrame frame) { int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); @@ -207,26 +222,57 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _logger.debug("Frame Received: " + frame); } - if (body instanceof AMQMethodBody) - { - methodFrameReceived(channelId, (AMQMethodBody) body); - } - else if (body instanceof ContentHeaderBody) - { - contentHeaderReceived(channelId, (ContentHeaderBody) body); - } - else if (body instanceof ContentBody) + // Check that this channel is not closing + if (channelAwaitingClosure(channelId)) { - contentBodyReceived(channelId, (ContentBody) body); + if (body instanceof ChannelCloseOkBody) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); + } + } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); + } + + return; + } } - else if (body instanceof HeartbeatBody) + try { - // NO OP + if (body instanceof AMQMethodBody) + { + methodFrameReceived(channelId, (AMQMethodBody) body); + } + else if (body instanceof ContentHeaderBody) + { + contentHeaderReceived(channelId, (ContentHeaderBody) body); + } + else if (body instanceof ContentBody) + { + contentBodyReceived(channelId, (ContentBody) body); + } + else if (body instanceof HeartbeatBody) + { + // NO OP + } + else + { + _logger.warn("Unrecognised frame " + frame.getClass().getName()); + } } - else + catch (AMQException e) { - _logger.warn("Unrecognised frame " + frame.getClass().getName()); + //This will occur if we receive Content*Body chunks during an 'inverse' shutdown. + // That is one where were the store shuts down before we can gracefully close connections. + // note: todo: Here we should send forced ConnectionClose frames. + _logger.error("AMQException occured whilst receiving Frame:" + e); } + } private void protocolInitiationReceived(ProtocolInitiation pi) @@ -246,12 +292,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable // Interfacing with generated code - be aware of possible changes to parameter order as versions change. AMQFrame response = - ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short) getProtocolMajorVersion(), // versionMajor - (short) getProtocolMinorVersion()); // versionMinor + ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short) getProtocolMajorVersion(), // versionMajor + (short) getProtocolMinorVersion()); // versionMinor _minaProtocolSession.write(response); } catch (AMQException e) @@ -271,35 +317,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private void methodFrameReceived(int channelId, AMQMethodBody methodBody) { - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((evt.getMethod() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); - } - - return; - } - } - try { try { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt); if (!_frameListeners.isEmpty()) @@ -342,8 +365,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable closeSession(); AMQConnectionException ce = - evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); + evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(ce.getCloseFrame(channelId)); @@ -363,23 +386,41 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } catch (Exception e) { + //NOTE: Currently we throw AMQExceptions sub-classes that are not Protcol problems. + // These items should not cause the connection to close unless there is no other option. + //note; todo: This should cause the connection to close + if (e instanceof MessageStoreClosedException) + { + _logger.error("Message Store is closed so unable to perform action:" + e); + // This should really close the exception as mentioned below. + return; + } + + //NOTE: TODO: While this is the responsible for closing the connection as a last resort the above section + // May have a problem closing channel ... This may be related to a connection fault but we should still + // attempt to send a connection close so that the connecion may be shutdown gracefully. + + //Detect when needed and shutdown connection gracefully .. such as Logged MSCException above + + // If an AMQException gets to here then there it should ONLY be donw to a protocol error + // from the above attempts to close the Connection. + // Notify any exceptions listeners before we just close the conneciton _stateManager.error(e); for (AMQMethodListener listener : _frameListeners) { listener.error(e); } + // This is the last resort _minaProtocolSession.close(); } } private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { - AMQChannel channel = getAndAssertChannel(channelId); channel.publishContentHeader(body, this); - } private void contentBodyReceived(int channelId, ContentBody body) throws AMQException @@ -430,7 +471,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public AMQChannel getChannel(int channelId) throws AMQException { final AMQChannel channel = - ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); + ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); if ((channel == null) || channel.isClosing()) { return null; @@ -463,8 +504,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable if (_channelMap.size() == _maxNoOfChannels) { String errorMessage = - toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels - + "); can't create channel"; + toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + + "); can't create channel"; _logger.error(errorMessage); throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index c5ec28e626..e58a2bfac4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -171,7 +171,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter //fixme -- this can be null if (amqProtocolSession != null) { - amqProtocolSession.closeSession(); + try + { + amqProtocolSession.closeSession(); + } + catch (AMQException e) + { + _logger.error("Caught AMQException whilst closingSession:" + e); + } } } @@ -205,7 +212,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } else if (throwable instanceof IOException) { - _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable); + _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 22fa0fab23..455983c6d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,15 +20,14 @@ */ package org.apache.qpid.server.registry; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.virtualhost.VirtualHost; +import java.util.HashMap; +import java.util.Map; + /** * An abstract application registry that provides access to configuration information and handles the * construction and caching of configurable objects. @@ -59,24 +58,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void run() { _logger.info("Shutting down application registries..."); - try - { - synchronized (ApplicationRegistry.class) - { - Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator(); - - while (keyIterator.hasNext()) - { - IApplicationRegistry instance = keyIterator.next(); - - instance.close(); - } - } - } - catch (Exception e) - { - _logger.error("Error shutting down message store: " + e, e); - } + removeAll(); } } @@ -116,6 +98,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } catch (Exception e) { + _logger.error("Error shutting down message store: " + e, e); } finally @@ -124,6 +107,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry } } + public static void removeAll() + { + Object[] keys = _instanceMap.keySet().toArray(); + for (Object k : keys) + { + remove((Integer) k); + } + } protected ApplicationRegistry(Configuration configuration) { @@ -154,7 +145,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry catch (Exception e) { _logger.error("Error configuring application: " + e, e); - //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); + //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID); throw new RuntimeException("Unable to create Application Registry", e); } } @@ -167,7 +158,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry public void close() throws Exception { - for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) + for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts()) { virtualHost.close(); } @@ -204,7 +195,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry return instance; } - public static void setDefaultApplicationRegistry(String clazz) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 8ccb0be0a8..7a6e0b011f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -20,27 +20,26 @@ */ package org.apache.qpid.server.store; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.configuration.Configuration; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; -/** - * A simple message store that stores the messages in a threadsafe structure in memory. - */ +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** A simple message store that stores the messages in a threadsafe structure in memory. */ public class MemoryMessageStore implements MessageStore { private static final Logger _log = Logger.getLogger(MemoryMessageStore.class); @@ -54,6 +53,7 @@ public class MemoryMessageStore implements MessageStore protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap; private final AtomicLong _messageId = new AtomicLong(1); + private AtomicBoolean _closed = new AtomicBoolean(false); public void configure() { @@ -77,6 +77,7 @@ public class MemoryMessageStore implements MessageStore public void close() throws Exception { + _closed.getAndSet(true); if (_metaDataMap != null) { _metaDataMap.clear(); @@ -89,8 +90,9 @@ public class MemoryMessageStore implements MessageStore } } - public void removeMessage(StoreContext context, Long messageId) + public void removeMessage(StoreContext context, Long messageId) throws AMQException { + checkNotClosed(); if (_log.isDebugEnabled()) { _log.debug("Removing message with id " + messageId); @@ -172,9 +174,10 @@ public class MemoryMessageStore implements MessageStore public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException { + checkNotClosed(); List<ContentChunk> bodyList = _contentBodyMap.get(messageId); - if(bodyList == null && lastContentBody) + if (bodyList == null && lastContentBody) { _contentBodyMap.put(messageId, Collections.singletonList(contentBody)); } @@ -193,17 +196,28 @@ public class MemoryMessageStore implements MessageStore public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException { + checkNotClosed(); _metaDataMap.put(messageId, messageMetaData); } - public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException + public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException { + checkNotClosed(); return _metaDataMap.get(messageId); } public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException { + checkNotClosed(); List<ContentChunk> bodyList = _contentBodyMap.get(messageId); return bodyList.get(index); } + + private void checkNotClosed() throws MessageStoreClosedException + { + if (_closed.get()) + { + throw new MessageStoreClosedException(); + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java new file mode 100644 index 0000000000..3d1538c7eb --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java @@ -0,0 +1,36 @@ +package org.apache.qpid.server.store; + +import org.apache.qpid.AMQException;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * NOTE: this class currently extends AMQException but + * we should be using AMQExceptions internally in the code base for Protocol errors hence + * the message store interface should throw a different super class which this should be + * moved to reflect + */ +public class MessageStoreClosedException extends AMQException +{ + public MessageStoreClosedException() + { + super("Message store closed"); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 9abc94b3df..cbb98a2dd5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1286,4 +1286,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { return _sessions.get(channelId); } + + public boolean isFailingOver() + { + return (_protocolHandler.getFailoverLatch() != null); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index fd795392ee..a052b48426 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -420,7 +420,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * * @throws IllegalStateException If the session is closed. */ - public void acknowledge() throws IllegalStateException + public void acknowledge() throws JMSException { if (isClosed()) { @@ -2510,6 +2510,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi for (Iterator it = consumers.iterator(); it.hasNext();) { BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); + consumer.failedOver(); registerConsumer(consumer, true); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 773401d03a..44c10afcf5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -33,14 +33,12 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; - import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -277,8 +275,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer _session.setInRecovery(false); } - private void acquireReceiving() throws JMSException + /** + * @param immediate if true then return immediately if the connection is failing over + * + * @return boolean if the acquisition was successful + * + * @throws JMSException + * @throws InterruptedException + */ + private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException { + if (_connection.isFailingOver()) + { + if (immediate) + { + return false; + } + else + { + _connection.blockUntilNotFailingOver(); + } + } + if (!_receiving.compareAndSet(false, true)) { throw new javax.jms.IllegalStateException("Another thread is already receiving."); @@ -290,6 +308,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } _receivingThread = Thread.currentThread(); + return true; } private void releaseReceiving() @@ -343,7 +362,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer checkPreConditions(); - acquireReceiving(); + try + { + acquireReceiving(false); + } + catch (InterruptedException e) + { + _logger.warn("Interrupted: " + e); + if (isClosed()) + { + return null; + } + } _session.startDistpatcherIfNecessary(); @@ -424,7 +454,25 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { checkPreConditions(); - acquireReceiving(); + try + { + if (!acquireReceiving(true)) + { + //If we couldn't acquire the receiving thread then return null. + // This will occur if failing over. + return null; + } + } + catch (InterruptedException e) + { + /* + * This seems slightly shoddy but should never actually be executed + * since we told acquireReceiving to return immediately and it shouldn't + * block on anything. + */ + + return null; + } _session.startDistpatcherIfNecessary(); @@ -868,11 +916,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } } - public void acknowledge() // throws JMSException + public void acknowledge() throws JMSException { - if (!isClosed()) + if (isClosed()) + { + throw new IllegalStateException("Consumer is closed"); + } + else if (_session.hasFailedOver()) + { + throw new JMSException("has failed over"); + } + else { - Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator(); while (tags.hasNext()) { @@ -880,10 +935,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer tags.remove(); } } - else - { - throw new IllegalStateException("Consumer is closed"); - } } /** Called on recovery to reset the list of delivery tags */ @@ -1022,4 +1073,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _synchronousQueue.clear(); } + + /** to be called when a failover has occured */ + public void failedOver() + { + clearReceiveQueue(); + clearUnackedMessages(); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 040b5f7b68..49d528a4f4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; /** @@ -99,8 +98,8 @@ public class TransportConnection if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio")) { _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") - ? "Qpid NIO is new default" - : "Sysproperty 'qpidnio' is set")); + ? "Qpid NIO is new default" + : "Sysproperty 'qpidnio' is set")); result = new MultiThreadSocketConnector(); } else @@ -277,8 +276,7 @@ public class TransportConnection } AMQVMBrokerCreationException amqbce = - new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null); - amqbce.initCause(e); + new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e); throw amqbce; } @@ -291,14 +289,11 @@ public class TransportConnection _acceptor.unbindAll(); synchronized (_inVmPipeAddress) { - Iterator keys = _inVmPipeAddress.keySet().iterator(); - - while (keys.hasNext()) - { - int id = (Integer) keys.next(); - _inVmPipeAddress.remove(id); - } - } + _inVmPipeAddress.clear(); + } + _acceptor = null; + _currentInstance = -1; + _currentVMPort = -1; } public static void killVMBroker(int port) diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java new file mode 100644 index 0000000000..fffe073362 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -0,0 +1,222 @@ +package org.apache.qpid.test.client.failover; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.log4j.Logger; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; + +public class FailoverTest extends TestCase implements ConnectionListener +{ + private static final Logger _logger = Logger.getLogger(FailoverTest.class); + + private static final int NUM_BROKERS = 2; + private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'"; + private static final String QUEUE = "queue"; + private static final int NUM_MESSAGES = 10; + private Connection con; + private AMQConnectionFactory conFactory; + private Session prodSess; + private AMQQueue q; + private MessageProducer prod; + private Session conSess; + private MessageConsumer consumer; + + private static int usedBrokers = 0; + private CountDownLatch failoverComplete; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + // Create two VM brokers + + for (int i = 0; i < NUM_BROKERS; i++) + { + usedBrokers++; + + TransportConnection.createVMBroker(usedBrokers); + } + //undo last addition + + conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers)); + _logger.info("Connecting on:" + conFactory.getConnectionURL()); + con = conFactory.createConnection(); + ((AMQConnection) con).setConnectionListener(this); + con.start(); + failoverComplete = new CountDownLatch(1); + } + + private void init(boolean transacted, int mode) throws JMSException + { + prodSess = con.createSession(transacted, mode); + q = new AMQQueue("amq.direct", QUEUE); + prod = prodSess.createProducer(q); + conSess = con.createSession(transacted, mode); + consumer = conSess.createConsumer(q); + } + + @Override + protected void tearDown() throws Exception + { + try + { + con.close(); + } + catch (Exception e) + { + + } + + try + { + TransportConnection.killAllVMBrokers(); + ApplicationRegistry.removeAll(); + } + catch (Exception e) + { + fail("Unable to clean up"); + } + super.tearDown(); + } + + private void consumeMessages(int toConsume) throws JMSException + { + Message msg; + for (int i = 0; i < toConsume; i++) + { + msg = consumer.receive(1000); + assertNotNull("Message " + i + " was null!", msg); + assertEquals("message " + i, ((TextMessage) msg).getText()); + } + } + + private void sendMessages(int totalMessages) throws JMSException + { + for (int i = 0; i < totalMessages; i++) + { + prod.send(prodSess.createTextMessage("message " + i)); + } + +// try +// { +// Thread.sleep(100 * totalMessages); +// } +// catch (InterruptedException e) +// { +// //evil ignoring of IE +// } + } + + public void testP2PFailover() throws Exception + { + testP2PFailover(NUM_MESSAGES, true); + } + + public void testP2PFailoverWithMessagesLeft() throws Exception + { + testP2PFailover(NUM_MESSAGES, false); + } + + private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException + { + Message msg = null; + init(false, Session.AUTO_ACKNOWLEDGE); + sendMessages(totalMessages); + + // Consume some messages + int toConsume = totalMessages; + if (!consumeAll) + { + toConsume = totalMessages / 2; + } + + consumeMessages(toConsume); + + _logger.info("Failing over"); + + causeFailure(); + + msg = consumer.receive(500); + //todo: reinstate + assertNull("Should not have received message from new broker!", msg); + // Check that messages still sent / received + sendMessages(totalMessages); + consumeMessages(totalMessages); + } + + private void causeFailure() + { + _logger.info("Failover"); + + TransportConnection.killVMBroker(usedBrokers - 1); + ApplicationRegistry.remove(usedBrokers - 1); + + _logger.info("Awaiting Failover completion"); + try + { + failoverComplete.await(); + } + catch (InterruptedException e) + { + //evil ignore IE. + } + } + + public void testClientAckFailover() throws Exception + { + init(false, Session.CLIENT_ACKNOWLEDGE); + sendMessages(1); + Message msg = consumer.receive(); + assertNotNull("Expected msgs not received", msg); + + + causeFailure(); + + Exception failure = null; + try + { + msg.acknowledge(); + } + catch (Exception e) + { + failure = e; + } + assertNotNull("Exception should be thrown", failure); + } + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + failoverComplete.countDown(); + } +} |