summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java149
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java44
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java36
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java82
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java21
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java222
11 files changed, 501 insertions, 125 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 17300d6b50..25669c62d1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -229,7 +229,7 @@ public class AMQChannel
BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeaderBody.properties;
//fixme: fudge for QPID-677
properties.getHeaders().keySet();
-
+
properties.setUserId(protocolSession.getAuthorizedID().getName());
}
@@ -381,7 +381,14 @@ public class AMQChannel
{
_txnContext.rollback();
unsubscribeAllConsumers(session);
- requeue();
+ try
+ {
+ requeue();
+ }
+ catch (AMQException e)
+ {
+ _log.error("Caught AMQException whilst attempting to reque:" + e);
+ }
setClosing(true);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 5bfd47b469..cde8791364 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -33,12 +33,27 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MainRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.store.MessageStoreClosedException;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -118,7 +133,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
- throws AMQException
+ throws AMQException
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
@@ -144,7 +159,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
- AMQStateManager stateManager) throws AMQException
+ AMQStateManager stateManager) throws AMQException
{
_stateManager = stateManager;
_minaProtocolSession = session;
@@ -197,7 +212,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
- private void frameReceived(AMQFrame frame) throws AMQException
+ private void frameReceived(AMQFrame frame)
{
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
@@ -207,26 +222,57 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_logger.debug("Frame Received: " + frame);
}
- if (body instanceof AMQMethodBody)
- {
- methodFrameReceived(channelId, (AMQMethodBody) body);
- }
- else if (body instanceof ContentHeaderBody)
- {
- contentHeaderReceived(channelId, (ContentHeaderBody) body);
- }
- else if (body instanceof ContentBody)
+ // Check that this channel is not closing
+ if (channelAwaitingClosure(channelId))
{
- contentBodyReceived(channelId, (ContentBody) body);
+ if (body instanceof ChannelCloseOkBody)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
+ }
+
+ return;
+ }
}
- else if (body instanceof HeartbeatBody)
+ try
{
- // NO OP
+ if (body instanceof AMQMethodBody)
+ {
+ methodFrameReceived(channelId, (AMQMethodBody) body);
+ }
+ else if (body instanceof ContentHeaderBody)
+ {
+ contentHeaderReceived(channelId, (ContentHeaderBody) body);
+ }
+ else if (body instanceof ContentBody)
+ {
+ contentBodyReceived(channelId, (ContentBody) body);
+ }
+ else if (body instanceof HeartbeatBody)
+ {
+ // NO OP
+ }
+ else
+ {
+ _logger.warn("Unrecognised frame " + frame.getClass().getName());
+ }
}
- else
+ catch (AMQException e)
{
- _logger.warn("Unrecognised frame " + frame.getClass().getName());
+ //This will occur if we receive Content*Body chunks during an 'inverse' shutdown.
+ // That is one where were the store shuts down before we can gracefully close connections.
+ // note: todo: Here we should send forced ConnectionClose frames.
+ _logger.error("AMQException occured whilst receiving Frame:" + e);
}
+
}
private void protocolInitiationReceived(ProtocolInitiation pi)
@@ -246,12 +292,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
AMQFrame response =
- ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short) getProtocolMajorVersion(), // versionMajor
- (short) getProtocolMinorVersion()); // versionMinor
+ ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short) getProtocolMajorVersion(), // versionMajor
+ (short) getProtocolMinorVersion()); // versionMinor
_minaProtocolSession.write(response);
}
catch (AMQException e)
@@ -271,35 +317,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
-
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
- // Check that this channel is not closing
- if (channelAwaitingClosure(channelId))
- {
- if ((evt.getMethod() instanceof ChannelCloseOkBody))
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
- }
- }
- else
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
- }
-
- return;
- }
- }
-
try
{
try
{
-
boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
if (!_frameListeners.isEmpty())
@@ -342,8 +365,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
closeSession();
AMQConnectionException ce =
- evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString());
+ evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(ce.getCloseFrame(channelId));
@@ -363,23 +386,41 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
catch (Exception e)
{
+ //NOTE: Currently we throw AMQExceptions sub-classes that are not Protcol problems.
+ // These items should not cause the connection to close unless there is no other option.
+ //note; todo: This should cause the connection to close
+ if (e instanceof MessageStoreClosedException)
+ {
+ _logger.error("Message Store is closed so unable to perform action:" + e);
+ // This should really close the exception as mentioned below.
+ return;
+ }
+
+ //NOTE: TODO: While this is the responsible for closing the connection as a last resort the above section
+ // May have a problem closing channel ... This may be related to a connection fault but we should still
+ // attempt to send a connection close so that the connecion may be shutdown gracefully.
+
+ //Detect when needed and shutdown connection gracefully .. such as Logged MSCException above
+
+ // If an AMQException gets to here then there it should ONLY be donw to a protocol error
+ // from the above attempts to close the Connection.
+ // Notify any exceptions listeners before we just close the conneciton
_stateManager.error(e);
for (AMQMethodListener listener : _frameListeners)
{
listener.error(e);
}
+ // This is the last resort
_minaProtocolSession.close();
}
}
private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
-
AMQChannel channel = getAndAssertChannel(channelId);
channel.publishContentHeader(body, this);
-
}
private void contentBodyReceived(int channelId, ContentBody body) throws AMQException
@@ -430,7 +471,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public AMQChannel getChannel(int channelId) throws AMQException
{
final AMQChannel channel =
- ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+ ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
if ((channel == null) || channel.isClosing())
{
return null;
@@ -463,8 +504,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
if (_channelMap.size() == _maxNoOfChannels)
{
String errorMessage =
- toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
- + "); can't create channel";
+ toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+ + "); can't create channel";
_logger.error(errorMessage);
throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index c5ec28e626..e58a2bfac4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -171,7 +171,14 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
//fixme -- this can be null
if (amqProtocolSession != null)
{
- amqProtocolSession.closeSession();
+ try
+ {
+ amqProtocolSession.closeSession();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Caught AMQException whilst closingSession:" + e);
+ }
}
}
@@ -205,7 +212,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter
}
else if (throwable instanceof IOException)
{
- _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
+ _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 22fa0fab23..455983c6d8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,15 +20,14 @@
*/
package org.apache.qpid.server.registry;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* An abstract application registry that provides access to configuration information and handles the
* construction and caching of configurable objects.
@@ -59,24 +58,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public void run()
{
_logger.info("Shutting down application registries...");
- try
- {
- synchronized (ApplicationRegistry.class)
- {
- Iterator<IApplicationRegistry> keyIterator = _instanceMap.values().iterator();
-
- while (keyIterator.hasNext())
- {
- IApplicationRegistry instance = keyIterator.next();
-
- instance.close();
- }
- }
- }
- catch (Exception e)
- {
- _logger.error("Error shutting down message store: " + e, e);
- }
+ removeAll();
}
}
@@ -116,6 +98,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
catch (Exception e)
{
+ _logger.error("Error shutting down message store: " + e, e);
}
finally
@@ -124,6 +107,14 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
}
}
+ public static void removeAll()
+ {
+ Object[] keys = _instanceMap.keySet().toArray();
+ for (Object k : keys)
+ {
+ remove((Integer) k);
+ }
+ }
protected ApplicationRegistry(Configuration configuration)
{
@@ -154,7 +145,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
catch (Exception e)
{
_logger.error("Error configuring application: " + e, e);
- //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID);
+ //throw new AMQBrokerCreationException(instanceID, "Unable to create Application Registry instance " + instanceID);
throw new RuntimeException("Unable to create Application Registry", e);
}
}
@@ -167,7 +158,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
public void close() throws Exception
{
- for(VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
+ for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
{
virtualHost.close();
}
@@ -204,7 +195,6 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
return instance;
}
-
public static void setDefaultApplicationRegistry(String clazz)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 8ccb0be0a8..7a6e0b011f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -20,27 +20,26 @@
*/
package org.apache.qpid.server.store;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-/**
- * A simple message store that stores the messages in a threadsafe structure in memory.
- */
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A simple message store that stores the messages in a threadsafe structure in memory. */
public class MemoryMessageStore implements MessageStore
{
private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
@@ -54,6 +53,7 @@ public class MemoryMessageStore implements MessageStore
protected ConcurrentMap<Long, List<ContentChunk>> _contentBodyMap;
private final AtomicLong _messageId = new AtomicLong(1);
+ private AtomicBoolean _closed = new AtomicBoolean(false);
public void configure()
{
@@ -77,6 +77,7 @@ public class MemoryMessageStore implements MessageStore
public void close() throws Exception
{
+ _closed.getAndSet(true);
if (_metaDataMap != null)
{
_metaDataMap.clear();
@@ -89,8 +90,9 @@ public class MemoryMessageStore implements MessageStore
}
}
- public void removeMessage(StoreContext context, Long messageId)
+ public void removeMessage(StoreContext context, Long messageId) throws AMQException
{
+ checkNotClosed();
if (_log.isDebugEnabled())
{
_log.debug("Removing message with id " + messageId);
@@ -172,9 +174,10 @@ public class MemoryMessageStore implements MessageStore
public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody)
throws AMQException
{
+ checkNotClosed();
List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
- if(bodyList == null && lastContentBody)
+ if (bodyList == null && lastContentBody)
{
_contentBodyMap.put(messageId, Collections.singletonList(contentBody));
}
@@ -193,17 +196,28 @@ public class MemoryMessageStore implements MessageStore
public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
throws AMQException
{
+ checkNotClosed();
_metaDataMap.put(messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(StoreContext context,Long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
+ checkNotClosed();
return _metaDataMap.get(messageId);
}
public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
+ checkNotClosed();
List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
return bodyList.get(index);
}
+
+ private void checkNotClosed() throws MessageStoreClosedException
+ {
+ if (_closed.get())
+ {
+ throw new MessageStoreClosedException();
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
new file mode 100644
index 0000000000..3d1538c7eb
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreClosedException.java
@@ -0,0 +1,36 @@
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.AMQException;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * NOTE: this class currently extends AMQException but
+ * we should be using AMQExceptions internally in the code base for Protocol errors hence
+ * the message store interface should throw a different super class which this should be
+ * moved to reflect
+ */
+public class MessageStoreClosedException extends AMQException
+{
+ public MessageStoreClosedException()
+ {
+ super("Message store closed");
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 9abc94b3df..cbb98a2dd5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1286,4 +1286,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
return _sessions.get(channelId);
}
+
+ public boolean isFailingOver()
+ {
+ return (_protocolHandler.getFailoverLatch() != null);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index fd795392ee..a052b48426 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -420,7 +420,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*
* @throws IllegalStateException If the session is closed.
*/
- public void acknowledge() throws IllegalStateException
+ public void acknowledge() throws JMSException
{
if (isClosed())
{
@@ -2510,6 +2510,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (Iterator it = consumers.iterator(); it.hasNext();)
{
BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+ consumer.failedOver();
registerConsumer(consumer, true);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 773401d03a..44c10afcf5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -33,14 +33,12 @@ import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -277,8 +275,28 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_session.setInRecovery(false);
}
- private void acquireReceiving() throws JMSException
+ /**
+ * @param immediate if true then return immediately if the connection is failing over
+ *
+ * @return boolean if the acquisition was successful
+ *
+ * @throws JMSException
+ * @throws InterruptedException
+ */
+ private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
{
+ if (_connection.isFailingOver())
+ {
+ if (immediate)
+ {
+ return false;
+ }
+ else
+ {
+ _connection.blockUntilNotFailingOver();
+ }
+ }
+
if (!_receiving.compareAndSet(false, true))
{
throw new javax.jms.IllegalStateException("Another thread is already receiving.");
@@ -290,6 +308,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
_receivingThread = Thread.currentThread();
+ return true;
}
private void releaseReceiving()
@@ -343,7 +362,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
checkPreConditions();
- acquireReceiving();
+ try
+ {
+ acquireReceiving(false);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Interrupted: " + e);
+ if (isClosed())
+ {
+ return null;
+ }
+ }
_session.startDistpatcherIfNecessary();
@@ -424,7 +454,25 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
checkPreConditions();
- acquireReceiving();
+ try
+ {
+ if (!acquireReceiving(true))
+ {
+ //If we couldn't acquire the receiving thread then return null.
+ // This will occur if failing over.
+ return null;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ /*
+ * This seems slightly shoddy but should never actually be executed
+ * since we told acquireReceiving to return immediately and it shouldn't
+ * block on anything.
+ */
+
+ return null;
+ }
_session.startDistpatcherIfNecessary();
@@ -868,11 +916,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- public void acknowledge() // throws JMSException
+ public void acknowledge() throws JMSException
{
- if (!isClosed())
+ if (isClosed())
+ {
+ throw new IllegalStateException("Consumer is closed");
+ }
+ else if (_session.hasFailedOver())
+ {
+ throw new JMSException("has failed over");
+ }
+ else
{
-
Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
while (tags.hasNext())
{
@@ -880,10 +935,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
tags.remove();
}
}
- else
- {
- throw new IllegalStateException("Consumer is closed");
- }
}
/** Called on recovery to reset the list of delivery tags */
@@ -1022,4 +1073,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_synchronousQueue.clear();
}
+
+ /** to be called when a failover has occured */
+ public void failedOver()
+ {
+ clearReceiveQueue();
+ clearUnackedMessages();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 040b5f7b68..49d528a4f4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
/**
@@ -99,8 +98,8 @@ public class TransportConnection
if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
{
_logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
- ? "Qpid NIO is new default"
- : "Sysproperty 'qpidnio' is set"));
+ ? "Qpid NIO is new default"
+ : "Sysproperty 'qpidnio' is set"));
result = new MultiThreadSocketConnector();
}
else
@@ -277,8 +276,7 @@ public class TransportConnection
}
AMQVMBrokerCreationException amqbce =
- new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
- amqbce.initCause(e);
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e);
throw amqbce;
}
@@ -291,14 +289,11 @@ public class TransportConnection
_acceptor.unbindAll();
synchronized (_inVmPipeAddress)
{
- Iterator keys = _inVmPipeAddress.keySet().iterator();
-
- while (keys.hasNext())
- {
- int id = (Integer) keys.next();
- _inVmPipeAddress.remove(id);
- }
- }
+ _inVmPipeAddress.clear();
+ }
+ _acceptor = null;
+ _currentInstance = -1;
+ _currentVMPort = -1;
}
public static void killVMBroker(int port)
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
new file mode 100644
index 0000000000..fffe073362
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -0,0 +1,222 @@
+package org.apache.qpid.test.client.failover;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.log4j.Logger;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+
+public class FailoverTest extends TestCase implements ConnectionListener
+{
+ private static final Logger _logger = Logger.getLogger(FailoverTest.class);
+
+ private static final int NUM_BROKERS = 2;
+ private static final String BROKER = "amqp://guest:guest@/test?brokerlist='vm://:%d;vm://:%d'";
+ private static final String QUEUE = "queue";
+ private static final int NUM_MESSAGES = 10;
+ private Connection con;
+ private AMQConnectionFactory conFactory;
+ private Session prodSess;
+ private AMQQueue q;
+ private MessageProducer prod;
+ private Session conSess;
+ private MessageConsumer consumer;
+
+ private static int usedBrokers = 0;
+ private CountDownLatch failoverComplete;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ // Create two VM brokers
+
+ for (int i = 0; i < NUM_BROKERS; i++)
+ {
+ usedBrokers++;
+
+ TransportConnection.createVMBroker(usedBrokers);
+ }
+ //undo last addition
+
+ conFactory = new AMQConnectionFactory(String.format(BROKER, usedBrokers - 1, usedBrokers));
+ _logger.info("Connecting on:" + conFactory.getConnectionURL());
+ con = conFactory.createConnection();
+ ((AMQConnection) con).setConnectionListener(this);
+ con.start();
+ failoverComplete = new CountDownLatch(1);
+ }
+
+ private void init(boolean transacted, int mode) throws JMSException
+ {
+ prodSess = con.createSession(transacted, mode);
+ q = new AMQQueue("amq.direct", QUEUE);
+ prod = prodSess.createProducer(q);
+ conSess = con.createSession(transacted, mode);
+ consumer = conSess.createConsumer(q);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ con.close();
+ }
+ catch (Exception e)
+ {
+
+ }
+
+ try
+ {
+ TransportConnection.killAllVMBrokers();
+ ApplicationRegistry.removeAll();
+ }
+ catch (Exception e)
+ {
+ fail("Unable to clean up");
+ }
+ super.tearDown();
+ }
+
+ private void consumeMessages(int toConsume) throws JMSException
+ {
+ Message msg;
+ for (int i = 0; i < toConsume; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull("Message " + i + " was null!", msg);
+ assertEquals("message " + i, ((TextMessage) msg).getText());
+ }
+ }
+
+ private void sendMessages(int totalMessages) throws JMSException
+ {
+ for (int i = 0; i < totalMessages; i++)
+ {
+ prod.send(prodSess.createTextMessage("message " + i));
+ }
+
+// try
+// {
+// Thread.sleep(100 * totalMessages);
+// }
+// catch (InterruptedException e)
+// {
+// //evil ignoring of IE
+// }
+ }
+
+ public void testP2PFailover() throws Exception
+ {
+ testP2PFailover(NUM_MESSAGES, true);
+ }
+
+ public void testP2PFailoverWithMessagesLeft() throws Exception
+ {
+ testP2PFailover(NUM_MESSAGES, false);
+ }
+
+ private void testP2PFailover(int totalMessages, boolean consumeAll) throws JMSException
+ {
+ Message msg = null;
+ init(false, Session.AUTO_ACKNOWLEDGE);
+ sendMessages(totalMessages);
+
+ // Consume some messages
+ int toConsume = totalMessages;
+ if (!consumeAll)
+ {
+ toConsume = totalMessages / 2;
+ }
+
+ consumeMessages(toConsume);
+
+ _logger.info("Failing over");
+
+ causeFailure();
+
+ msg = consumer.receive(500);
+ //todo: reinstate
+ assertNull("Should not have received message from new broker!", msg);
+ // Check that messages still sent / received
+ sendMessages(totalMessages);
+ consumeMessages(totalMessages);
+ }
+
+ private void causeFailure()
+ {
+ _logger.info("Failover");
+
+ TransportConnection.killVMBroker(usedBrokers - 1);
+ ApplicationRegistry.remove(usedBrokers - 1);
+
+ _logger.info("Awaiting Failover completion");
+ try
+ {
+ failoverComplete.await();
+ }
+ catch (InterruptedException e)
+ {
+ //evil ignore IE.
+ }
+ }
+
+ public void testClientAckFailover() throws Exception
+ {
+ init(false, Session.CLIENT_ACKNOWLEDGE);
+ sendMessages(1);
+ Message msg = consumer.receive();
+ assertNotNull("Expected msgs not received", msg);
+
+
+ causeFailure();
+
+ Exception failure = null;
+ try
+ {
+ msg.acknowledge();
+ }
+ catch (Exception e)
+ {
+ failure = e;
+ }
+ assertNotNull("Exception should be thrown", failure);
+ }
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ failoverComplete.countDown();
+ }
+}