diff options
17 files changed, 914 insertions, 285 deletions
diff --git a/qpid/java/broker/etc/md5passwd b/qpid/java/broker/etc/md5passwd index 7984b742e0..6a149919de 100644 --- a/qpid/java/broker/etc/md5passwd +++ b/qpid/java/broker/etc/md5passwd @@ -16,6 +16,6 @@ # specific language governing permissions and limitations
# under the License.
#
-guest:qfgyy4ewnVMBg
+guest:CE4DQ6BIb/BVMN9scFyLtA==
admin:ISMvKXpXpadDiUoOSoAfww==
user:aBzonUodYLhwSa8s9A10sA==
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 43a04dbfa1..28a9e85489 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -20,18 +20,9 @@ */ package org.apache.qpid.server; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentBody; @@ -52,6 +43,16 @@ import org.apache.qpid.server.queue.Subscription; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + public class AMQChannel { public static final int DEFAULT_PREFETCH = 5000; @@ -208,7 +209,8 @@ public class AMQChannel _currentMessage.setPublisher(publisher); } - public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException + public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession) + throws AMQException { if (_currentMessage == null) { @@ -230,6 +232,7 @@ public class AMQChannel // check and deliver if header says body length is zero if (contentHeaderBody.bodySize == 0) { + _txnContext.messageProcessed(protocolSession); _currentMessage = null; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 01242f90de..0dcceaddbb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -169,7 +169,7 @@ public class DestNameExchange extends AbstractExchange if (queues == null || queues.isEmpty()) { String msg = "Routing key " + routingKey + " is not known to " + this; - if (info.isMandatory()) + if (info.isMandatory() || info.isImmediate()) { throw new NoRouteException(msg, payload, null); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 222e341b1a..f6a95b5e55 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -20,13 +20,18 @@ */ package org.apache.qpid.server.exchange; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; -import java.util.LinkedList; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.management.MBeanConstructor; +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; import javax.management.JMException; import javax.management.MBeanException; @@ -41,24 +46,21 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.abstraction.MessagePublishInfo; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; public class DestWildExchange extends AbstractExchange { private static final Logger _logger = Logger.getLogger(DestWildExchange.class); - private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); - // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); + private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = + new ConcurrentHashMap<AMQShortString, List<AMQQueue>>(); + // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>(); private static final String TOPIC_SEPARATOR = "."; private static final String AMQP_STAR = "*"; private static final String AMQP_HASH = "#"; @@ -90,7 +92,7 @@ public class DestWildExchange extends AbstractExchange queueList.add(q.getName().toString()); } - Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])}; + Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) }; CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); _bindingList.put(bindingData); } @@ -118,7 +120,6 @@ public class DestWildExchange extends AbstractExchange } // End of MBean class - public AMQShortString getType() { return ExchangeDefaults.TOPIC_EXCHANGE_CLASS; @@ -140,6 +141,7 @@ public class DestWildExchange extends AbstractExchange { queueList = _routingKey2queues.get(routingKey); } + if (!queueList.contains(queue)) { queueList.add(queue); @@ -165,8 +167,8 @@ public class DestWildExchange extends AbstractExchange for (int index = 0; index < size; index++) { - //if there are more levels - if (index + 1 < size) + // if there are more levels + if ((index + 1) < size) { if (_subscription.get(index).equals(AMQP_HASH)) { @@ -175,7 +177,7 @@ public class DestWildExchange extends AbstractExchange // we don't need #.# delete this one _subscription.remove(index); size--; - //redo this normalisation + // redo this normalisation index--; } @@ -186,7 +188,7 @@ public class DestWildExchange extends AbstractExchange _subscription.add(index + 1, _subscription.remove(index)); } } - }//if we have more levels + } // if we have more levels } StringBuilder sb = new StringBuilder(); @@ -211,9 +213,9 @@ public class DestWildExchange extends AbstractExchange List<AMQQueue> queues = getMatchedQueues(routingKey); // if we have no registered queues we have nothing to do // TODO: add support for the immediate flag - if (queues == null || queues.size() == 0) + if ((queues == null) || queues.isEmpty()) { - if (info.isMandatory()) + if (info.isMandatory() || info.isImmediate()) { String msg = "Topic " + routingKey + " is not known to " + this; throw new NoRouteException(msg, payload, null); @@ -222,6 +224,7 @@ public class DestWildExchange extends AbstractExchange { _logger.warn("No queues found for routing key " + routingKey); _logger.warn("Routing map contains: " + _routingKey2queues); + return; } } @@ -238,14 +241,15 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException { List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); - return queues != null && queues.contains(queue); - } + return (queues != null) && queues.contains(queue); + } public boolean isBound(AMQShortString routingKey) throws AMQException { List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey)); - return queues != null && !queues.isEmpty(); + + return (queues != null) && !queues.isEmpty(); } public boolean isBound(AMQQueue queue) throws AMQException @@ -257,6 +261,7 @@ public class DestWildExchange extends AbstractExchange return true; } } + return false; } @@ -279,12 +284,14 @@ public class DestWildExchange extends AbstractExchange " with routing key " + routingKey + ". No queue was registered with that routing key", null); } + boolean removedQ = queues.remove(queue); if (!removedQ) { throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() + " with routing key " + routingKey, null); } + if (queues.isEmpty()) { _routingKey2queues.remove(routingKey); @@ -304,7 +311,6 @@ public class DestWildExchange extends AbstractExchange } } - private List<AMQQueue> getMatchedQueues(AMQShortString routingKey) { List<AMQQueue> list = new LinkedList<AMQQueue>(); @@ -334,7 +340,6 @@ public class DestWildExchange extends AbstractExchange queueList.add(queTok.nextToken()); } - int depth = 0; boolean matching = true; boolean done = false; @@ -343,25 +348,26 @@ public class DestWildExchange extends AbstractExchange while (matching && !done) { - if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip) + if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip))) { done = true; // if it was the routing key that ran out of digits - if (routingkeyList.size() == depth + routingskip) + if (routingkeyList.size() == (depth + routingskip)) { if (queueList.size() > (depth + queueskip)) - { // a hash and it is the last entry - matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1; + { // a hash and it is the last entry + matching = + queueList.get(depth + queueskip).equals(AMQP_HASH) + && (queueList.size() == (depth + queueskip + 1)); } } - else if (routingkeyList.size() > depth + routingskip) + else if (routingkeyList.size() > (depth + routingskip)) { // There is still more routing key to check matching = false; } - continue; } @@ -377,27 +383,33 @@ public class DestWildExchange extends AbstractExchange else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) { // Is this a # at the end - if (queueList.size() == depth + queueskip + 1) + if (queueList.size() == (depth + queueskip + 1)) { done = true; + continue; } // otherwise # in the middle - while (routingkeyList.size() > depth + routingskip) + while (routingkeyList.size() > (depth + routingskip)) { if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) { queueskip++; depth++; + break; } + routingskip++; } + continue; } + matching = false; } + depth++; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 6148fd4e1c..bf00eeb9d3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -1,27 +1,36 @@ /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
-
package org.apache.qpid.server.exchange;
-import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -36,16 +45,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
{
@@ -63,7 +63,7 @@ public class FanoutExchange extends AbstractExchange private final class FanoutExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ fanout exchange")
- public FanoutExchangeMBean() throws JMException
+ public FanoutExchangeMBean() throws JMException
{
super();
_exchangeType = "fanout";
@@ -79,9 +79,7 @@ public class FanoutExchange extends AbstractExchange {
String queueName = queue.getName().toString();
-
-
- Object[] bindingItemValues = {queueName, new String[] {queueName}};
+ Object[] bindingItemValues = { queueName, new String[] { queueName } };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -98,7 +96,7 @@ public class FanoutExchange extends AbstractExchange }
try
- {
+ {
queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
@@ -107,8 +105,7 @@ public class FanoutExchange extends AbstractExchange }
}
- }// End of MBean class
-
+ } // End of MBean class
protected ExchangeMBean createMBean() throws AMQException
{
@@ -147,7 +144,6 @@ public class FanoutExchange extends AbstractExchange {
assert queue != null;
-
if (!_queues.remove(queue))
{
throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
@@ -159,10 +155,10 @@ public class FanoutExchange extends AbstractExchange {
final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
final AMQShortString routingKey = publishInfo.getRoutingKey();
- if (_queues == null || _queues.isEmpty())
+ if ((_queues == null) || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishInfo.isMandatory())
+ if (publishInfo.isMandatory() || publishInfo.isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
@@ -193,13 +189,12 @@ public class FanoutExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey) throws AMQException
{
- return _queues != null && !_queues.isEmpty();
+ return (_queues != null) && !_queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
{
-
return _queues.contains(queue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 8205924207..e86094e26f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -229,7 +229,7 @@ public class HeadersExchange extends AbstractExchange String msg = "Exchange " + getName() + ": message not routable."; - if (payload.getMessagePublishInfo().isMandatory()) + if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate()) { throw new NoRouteException(msg, payload, null); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 82e969b496..c9f5e42286 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,24 +20,13 @@ */ package org.apache.qpid.server.protocol; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.security.Principal; - -import javax.management.JMException; -import javax.security.sasl.SaslServer; - import org.apache.log4j.Logger; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoServiceConfig; import org.apache.mina.common.IoSession; import org.apache.mina.transport.vmpipe.VmPipeAddress; + import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; @@ -46,22 +35,34 @@ import org.apache.qpid.codec.AMQDecoder; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.*; import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.output.ProtocolOutputConverter; -import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.output.ProtocolOutputConverter; +import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.AMQState; +import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -public class AMQMinaProtocolSession implements AMQProtocolSession, - Managable +import javax.management.JMException; +import javax.security.sasl.SaslServer; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.security.Principal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + +public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); @@ -111,25 +112,20 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private ProtocolOutputConverter _protocolOutputConverter; private Principal _authorizedID; - public ManagedObject getManagedObject() { return _managedObject; } - - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, - AMQCodecFactory codecFactory) - throws AMQException + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory) + throws AMQException { _stateManager = new AMQStateManager(virtualHostRegistry, this); _minaProtocolSession = session; session.setAttachment(this); - _codecFactory = codecFactory; - try { IoServiceConfig config = session.getServiceConfig(); @@ -140,16 +136,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, catch (RuntimeException e) { e.printStackTrace(); - // throw e; + // throw e; } -// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); + // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, - AMQCodecFactory codecFactory, AMQStateManager stateManager) - throws AMQException + public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, + AMQStateManager stateManager) throws AMQException { _stateManager = stateManager; _minaProtocolSession = session; @@ -182,8 +177,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return (AMQProtocolSession) minaProtocolSession.getAttachment(); } - public void dataBlockReceived(AMQDataBlock message) - throws Exception + public void dataBlockReceived(AMQDataBlock message) throws Exception { _lastReceived = message; if (message instanceof ProtocolInitiation) @@ -203,8 +197,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - private void frameReceived(AMQFrame frame) - throws AMQException + private void frameReceived(AMQFrame frame) throws AMQException { int channelId = frame.getChannel(); AMQBody body = frame.getBodyFrame(); @@ -252,13 +245,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, String locales = "en_US"; // Interfacing with generated code - be aware of possible changes to parameter order as versions change. - AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, - getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) - locales.getBytes(), // locales - mechanisms.getBytes(), // mechanisms - null, // serverProperties - (short) getProtocolMajorVersion(), // versionMajor - (short) getProtocolMinorVersion()); // versionMinor + AMQFrame response = + ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor) + locales.getBytes(), // locales + mechanisms.getBytes(), // mechanisms + null, // serverProperties + (short) getProtocolMajorVersion(), // versionMajor + (short) getProtocolMinorVersion()); // versionMinor _minaProtocolSession.write(response); } catch (AMQException e) @@ -269,21 +262,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // TODO: Close connection (but how to wait until message is sent?) // ritchiem 2006-12-04 will this not do? -// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); -// future.join(); -// close connection + // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()])); + // future.join(); + // close connection } } - private void methodFrameReceived(int channelId, AMQMethodBody methodBody) { - final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, - methodBody); + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody); - //Check that this channel is not closing + // Check that this channel is not closing if (channelAwaitingClosure(channelId)) { if ((evt.getMethod() instanceof ChannelCloseOkBody)) @@ -299,11 +290,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); } + return; } } - try { try @@ -315,10 +306,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { for (AMQMethodListener listener : _frameListeners) { - wasAnyoneInterested = listener.methodReceived(evt) || - wasAnyoneInterested; + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } + if (!wasAnyoneInterested) { throw new AMQNoMethodHandlerException(evt, null); @@ -332,6 +323,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Closing channel due to: " + e.getMessage()); } + writeFrame(e.getCloseFrame(channelId)); closeChannel(channelId); } @@ -341,14 +333,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); } + if (_logger.isInfoEnabled()) { _logger.info("Closing connection due to: " + e.getMessage()); } + closeSession(); - AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, - AMQConstant.CHANNEL_ERROR.getName().toString()); + AMQConnectionException ce = + evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(ce.getCloseFrame(channelId)); @@ -360,6 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.info("Closing connection due to: " + e.getMessage()); } + closeSession(); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(channelId)); @@ -372,17 +368,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { listener.error(e); } + _minaProtocolSession.close(); } } - private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); - channel.publishContentHeader(body); + channel.publishContentHeader(body, this); } @@ -427,15 +423,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null); } + return channel; } public AMQChannel getChannel(int channelId) throws AMQException { - final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId) - ? _cachedChannels[channelId] - : _channelMap.get(channelId); - if (channel == null || channel.isClosing()) + final AMQChannel channel = + ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId); + if ((channel == null) || channel.isClosing()) { return null; } @@ -466,8 +462,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, if (_channelMap.size() == _maxNoOfChannels) { - String errorMessage = toString() + ": maximum number of channels has been reached (" + - _maxNoOfChannels + "); can't create channel"; + String errorMessage = + toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels + + "); can't create channel"; _logger.error(errorMessage); throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null); } @@ -480,6 +477,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _cachedChannels[channelId] = channel; } + checkForNotification(); } @@ -504,7 +502,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void commitTransactions(AMQChannel channel) throws AMQException { - if (channel != null && channel.isTransactional()) + if ((channel != null) && channel.isTransactional()) { channel.commit(); } @@ -512,7 +510,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public void rollbackTransactions(AMQChannel channel) throws AMQException { - if (channel != null && channel.isTransactional()) + if ((channel != null) && channel.isTransactional()) { channel.rollback(); } @@ -597,6 +595,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { channel.close(this); } + _channelMap.clear(); for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) { @@ -615,6 +614,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _managedObject.unregister(); } + for (Task task : _taskList) { task.doTask(this); @@ -687,6 +687,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE))); } + if (_clientProperties.getString(ClientProperties.version.toString()) != null) { _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString())); @@ -715,7 +716,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public boolean isProtocolVersion(byte major, byte minor) { - return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor; + return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor); } public VersionSpecificRegistry getRegistry() @@ -723,13 +724,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _registry; } - public Object getClientIdentifier() { return _minaProtocolSession.getRemoteAddress(); } - public VirtualHost getVirtualHost() { return _virtualHost; @@ -769,6 +768,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public String getClientVersion() { - return _clientVersion == null ? null : _clientVersion.toString(); + return (_clientVersion == null) ? null : _clientVersion.toString(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a803ef1227..6273ac997b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -49,6 +49,16 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; +import javax.management.JMException; + +import java.text.MessageFormat; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + /** * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described * fully in RFC 006. @@ -607,7 +617,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue delete(); } - public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException + /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException { // fixme not sure what this is doing. should we be passing deliverFirst through here? // This code is not used so when it is perhaps it should @@ -623,7 +633,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue // from the queue: dequeue(storeContext, msg); } - } + }*/ // public DeliveryManager getDeliveryManager() // { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 50fbd47fe9..d59412fdba 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -337,7 +337,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect if ((message == null) || message.equals("")) { - message = "Unable to Connect"; + if (message == null) + { + message = "Unable to Connect"; + } + else // can only be "" if getMessage() returned it therfore lastException != null + { + message = "Unable to Connect:" + lastException.getClass(); + } } AMQException e = new AMQConnectionFailureException(message, null); diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java index a82b05e20f..a904bfa419 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java @@ -252,7 +252,8 @@ public class TestClient implements MessageListener public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+ + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
+ + ", String virtualHost = " + virtualHost + " ): called");
try
{
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java index 1597da6dba..6f2089290a 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java @@ -37,6 +37,7 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -91,7 +92,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti private static final long TEN_MILLI_SEC = 10000000; - private static final long FIVE_MILLI_SEC = 5000000; + private static final int DEBUG_LOG_UPATE_INTERVAL = 10; + private static final int LOG_UPATE_INTERVAL = 10; + private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage"); /** * Should provide the name of the test case that this class implements. The exact names are defined in the interop @@ -129,6 +132,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); + String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME"); if (debugLog.isDebugEnabled()) { @@ -150,7 +154,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti session = new Session[1]; connection[0] = - org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl, + org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, + clientName, + org.apache.qpid.interop.testclient.TestClient.brokerUrl, org.apache.qpid.interop.testclient.TestClient.virtualHost); session[0] = connection[0].createSession(false, ackMode); @@ -182,6 +188,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti { connection[i] = org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, + clientName, org.apache.qpid.interop.testclient.TestClient.brokerUrl, org.apache.qpid.interop.testclient.TestClient.virtualHost); session[i] = connection[i].createSession(false, ackMode); @@ -192,7 +199,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti MessageConsumer consumer = session[i].createConsumer(sendDestination); - consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination)); + consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], sendUpdateDestination)); } break; @@ -347,7 +354,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti _received++; if (((TextMessage) message).getText().equals("start")) { - debugLog.info("Starting Batch"); + debugLog.debug("Starting Batch"); _startTime = System.nanoTime(); } else if (((TextMessage) message).getText().equals("end")) @@ -355,8 +362,8 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti if (_startTime != null) { long currentTime = System.nanoTime(); - sendStatus(currentTime - _startTime, _received); - debugLog.info("End Batch"); + sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH")); + debugLog.debug("End Batch"); } } } @@ -373,28 +380,31 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * * @param time taken for the the last batch * @param received Total number of messages received. - * + * @param batchNumber the batch number * @throws JMSException if an error occurs during the send */ - private void sendStatus(long time, long received) throws JMSException + private void sendStatus(long time, long received, int batchNumber) throws JMSException { Message updateMessage = _session.createTextMessage("update"); updateMessage.setStringProperty("CLIENT_ID", ":" + _client); updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); updateMessage.setLongProperty("RECEIVED", received); + updateMessage.setIntProperty("BATCH", batchNumber); updateMessage.setLongProperty("DURATION", time); if (debugLog.isInfoEnabled()) { - debugLog.info("**** SENDING [" + received / _batchSize + "]**** " - + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + debugLog.info("**** SENDING [" + batchNumber + "]**** " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); } // Output on the main log.info the details of this batch - if (received / _batchSize % 10 == 0) + if (batchNumber % 10 == 0) { - log.info("Sending Report [" + received / _batchSize + "] " - + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time); + log.info("Sending Report [" + batchNumber + "] " + + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); } _updater.send(updateMessage); @@ -415,7 +425,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti class SustainedRateAdapter implements MessageListener, Runnable { private SustainedTestClient _client; - private long _messageVariance = 500; //no. messages to allow drifting + private long _batchVariance = 3; //no. batches to allow drifting private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) private volatile long _delay; //in nanos private long _sent; @@ -451,18 +461,23 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti long duration = message.getLongProperty("DURATION"); long totalReceived = message.getLongProperty("RECEIVED"); String client = message.getStringProperty("CLIENT_ID"); + int batchNumber = message.getIntProperty("BATCH"); - if (debugLog.isInfoEnabled()) + if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) { - debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration); + debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + + " Recevied BATCH:" + batchNumber + " DURATION:" + duration); } - recordSlow(client, totalReceived); - - adjustDelay(client, totalReceived, duration); + recordSlow(client, totalReceived, batchNumber); + adjustDelay(client, batchNumber, duration); - if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2) + // Warm up completes when: + // we haven't warmed up + // and the number of batches sent to each client is at least half of the required warmup batches + if (!_warmedup + && (batchNumber >= _warmUpBatches)) { _warmedup = true; _warmup.countDown(); @@ -478,7 +493,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti CountDownLatch _warmup = new CountDownLatch(1); - int _warmUpBatches = 20; + int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); int _numBatches = 10000; @@ -527,12 +542,14 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti testMessage = _client.session[0].createTextMessage("start"); - for (int batch = 0; batch < batchSize; batch++) + for (int batch = 0; batch <= batchSize; batch++) // while (_running) { long start = System.nanoTime(); testMessage.setText("start"); + testMessage.setIntProperty("BATCH", batch); + _client.producer.send(testMessage); _rateAdapter.sentMessage(); @@ -552,9 +569,12 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti long sendtime = end - start; - debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); + if (debugLog.isDebugEnabled()) + { + debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]); + } - if (batch % 10 == 0) + if (batch % LOG_UPATE_INTERVAL == 0) { log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); } @@ -583,23 +603,17 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti return; } - //Slow down if gap between send and received is too large - if (_sent - _totalReceived / delays.size() > _messageVariance) - { - //pause between batches. - debugLog.info("Sleeping to keep sent in check with received"); - log.debug("Increaseing _delay as sending more than receiving"); - _delay += TEN_MILLI_SEC; - } - - //per batch sleep.. if sleep is to small to spread over the batch. - if (_delay <= TEN_MILLI_SEC * _batchSize) - { - sleepLong(_delay); - } - else + if (!SLEEP_PER_MESSAGE) { - debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); + //per batch sleep.. if sleep is to small to spread over the batch. + if (_delay <= TEN_MILLI_SEC * _batchSize) + { + sleepLong(_delay); + } + else + { + debugLog.info("Not sleeping _delay > ten*batch is:" + _delay); + } } } @@ -617,10 +631,10 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * Adjust the delay for sending messages based on this update from the client * * @param client The client that send this update - * @param totalReceived The number of messages that this client has received. * @param duration The time taken for the last batch of messagse + * @param batchNumber The reported batchnumber from the client */ - private void adjustDelay(String client, long totalReceived, long duration) + private void adjustDelay(String client, int batchNumber, long duration) { //Retrieve the current total time taken for this client. Long currentTime = delays.get(client); @@ -637,23 +651,28 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti delays.put(client, currentTime); + long batchesSent = _sent / _batchSize; + + // ensure we don't divide by zero + if (batchesSent == 0) + { + batchesSent = 1L; + } _totalReceived += _batchSize; _totalDuration += duration; - // Calculate the number of messages in the batch. - long batchCount = (_totalReceived / _batchSize); - //calculate average duration accross clients per batch - long averageDuration = _totalDuration / delays.size() / batchCount; + long averageDuration = _totalDuration / delays.size() / batchesSent; //calculate the difference between current send delay and average report delay long diff = (duration) - averageDuration; - if (debugLog.isInfoEnabled()) + if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0) { - debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers" - + " on batch: " + batchCount + debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + + " on batch: " + batchesSent + + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: " + averageDuration + " so diff: " + diff + " for : " + client @@ -696,6 +715,16 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti delayStable(); } + // If we have a consumer that is behind with the batches. + if (batchesSent - batchNumber > _batchVariance) + { + debugLog.debug("Increasing _delay as sending more than receiving"); + + _delay += 2 * TEN_MILLI_SEC; + delayChanged(); + } + + } /** Reset the number of iterations before we say the delay has stabilised. */ @@ -725,10 +754,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * * @param client The client identifier to check * @param received the number of messages received by that client + * @param batchNumber */ - private void recordSlow(String client, long received) + private void recordSlow(String client, long received, int batchNumber) { - if (received < (_sent - _messageVariance)) + if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance) { _slowClients.put(client, received); } @@ -761,6 +791,13 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } } } + else + { + if (SLEEP_PER_MESSAGE && (_delay > 0)) + { + sleepLong(_delay / _batchSize); + } + } } @@ -771,16 +808,38 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti */ private boolean checkForSlowClients() { - if (_sent % _batchSize == 0) + // This will allways be true as we are running this at the end of each batchSize +// if (_sent % _batchSize == 0) { // Cause test to pause when we have slow if (!_slowClients.isEmpty() || NO_CLIENTS) { - debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray()); + while (!_slowClients.isEmpty()) { - debugLog.info(_slowClients.size() + " slow clients."); + if (debugLog.isInfoEnabled() + && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL == 0) + { + String clients = ""; + Iterator it = _slowClients.keySet().iterator(); + while (it.hasNext()) + { + clients += it.next(); + if (it.hasNext()) + { + clients += ", "; + } + } + debugLog.info("Pausing for slow clients:" + clients); + } + + + if (log.isDebugEnabled() + && _sent / _batchSize % LOG_UPATE_INTERVAL == 0) + { + log.debug(_slowClients.size() + " slow clients."); + } sleep(PAUSE_SLEEP); } @@ -794,7 +853,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti } else { - debugLog.info("Delay:" + _delay); + if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0) + { + log.info("Total Delay :" + _delay + " " + + (_delayShifting == 0 ? "Stablised" : "Not Stablised(" + _delayShifting + ")")); + } } } @@ -825,7 +888,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti * Perform the sleep , swallowing any InteruptException. * * NOTE: If a sleep request is > 10s then reset only sleep for 5s - * + * * @param milli to sleep for * @param nano sub miliseconds to sleep for */ diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java index b437e165b4..0075e45a8c 100644 --- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java @@ -113,6 +113,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i setPropertiesOnMessage(assignSender, testProperties); assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignSender.setStringProperty("ROLE", "SENDER"); + assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER"); senderConversation.send(senderControlTopic, assignSender); @@ -170,6 +171,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i setPropertiesOnMessage(assignReceiver, _testProperties); assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE"); assignReceiver.setStringProperty("ROLE", "RECEIVER"); + assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName); receiverConversation.send(receiverControlTopic, assignReceiver); diff --git a/qpid/java/integrationtests/src/resources/sustained-log4j.xml b/qpid/java/integrationtests/src/resources/sustained-log4j.xml new file mode 100644 index 0000000000..c5ab3137bf --- /dev/null +++ b/qpid/java/integrationtests/src/resources/sustained-log4j.xml @@ -0,0 +1,69 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + + + <appender name="FileAppender" class="org.apache.log4j.FileAppender"> + <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/> + <param name="Append" value="false"/> + + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> + </layout> + </appender> + + <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender"> + + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p (%F:%L) - %m%n"/> + <!--param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/--> + </layout> + </appender> + + <category name="SustainedTest"> + <priority value="${sustained.level}"/> + </category> + + <category name="org.apache"> + <priority value="warn"/> + </category> + + <category name="org.apache.qpid.interop"> + <priority value="${interop.logging.level}"/> + </category> + + + <category name="org.apache.qpid.sustained"> + <priority value="${amqj.logging.level}"/> + </category> + + <!--category name="org.apache.qpid.server.txn"> + <priority value="debug"/> + </category>--> + + <root> + <priority value="all"/> + <appender-ref ref="STDOUT"/> + <!--appender-ref ref="ArchivingFileAppender"/--> + </root> +</log4j:configuration> diff --git a/qpid/java/pom.xml b/qpid/java/pom.xml index 2e1a792c49..cf39e43ecc 100644 --- a/qpid/java/pom.xml +++ b/qpid/java/pom.xml @@ -391,6 +391,29 @@ under the License. <version>0.5</version> </plugin> + <plugin> + <artifactId>maven-remote-resources-plugin</artifactId> + <version>1.0-alpha-5</version> + <executions> + <execution> + <goals> + <goal>process</goal> + </goals> + <configuration> + <resourceBundles> + <resourceBundle>org.apache:apache-incubator-disclaimer-resource-bundle:1.1</resourceBundle> + <resourceBundle>org.apache:apache-jar-resource-bundle:1.2</resourceBundle> + </resourceBundles> + <properties> + <addLicense>true</addLicense> + <projectName>Apache Qpid</projectName> + </properties> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> </pluginManagement> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java index 05fbceca20..048fcfb0b3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java @@ -25,10 +25,11 @@ import junit.framework.TestCase; import org.apache.log4j.NDC;
import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,8 +42,11 @@ import javax.naming.Context; import javax.naming.InitialContext;
import javax.naming.NamingException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
/**
* ImmediateMessageTest tests for the desired behaviour of immediate messages. Immediate messages are a non-JMS
@@ -58,6 +62,10 @@ import java.util.List; * connected.
* <tr><td> Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is
* connected.
+ * <tr><td> Check that an immediate message results in no consumers code, not using transactions, when a consumer is
+ * disconnected.
+ * <tr><dt> Check that an immediate message results in no consumers code, in a transaction, when a consumer is
+ * disconnected.
* </table>
*
* @todo Write a test decorator, the sole function of which is to populate test context properties, from sys properties,
@@ -73,63 +81,215 @@ public class ImmediateMessageTest extends TestCase private static final Logger log = LoggerFactory.getLogger(ImmediateMessageTest.class);
/** Used to read the tests configurable properties through. */
- ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+ ParsedProperties testProps;
- /** All these tests should have the immediate flag on. */
- private boolean immediateFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
+ /** Used to create unique destination names for each test.
+ * @todo Move into the test framework.
+ */
+ private static AtomicLong uniqueDestsId = new AtomicLong();
/** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
- public void test_QPID_517_ImmediateOkNoTx() throws Exception
+ public void test_QPID_517_ImmediateOkNoTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message with no errors.
- PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
}
/** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */
- public void test_QPID_517_ImmediateOkTx() throws Exception
+ public void test_QPID_517_ImmediateOkTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message with no errors.
- PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkNoTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
/** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerNoTx() throws Exception
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
/** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerTx() throws Exception
+ public void test_QPID_517_ImmediateFailsNoRouteTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
protected void setUp() throws Exception
{
NDC.push(getName());
+ testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the immediate flag on. */
+ testProps.setProperty(IMMEDIATE_PROPNAME, true);
+
+ /** Bind the receivers consumer by default. */
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true);
+
// Ensure that the in-vm broker is created.
TransportConnection.createVMBroker(1);
}
@@ -220,10 +380,64 @@ public class ImmediateMessageTest extends TestCase return false;
}
+ /**
+ * Reports the number of exceptions held by this monitor.
+ *
+ * @return The number of exceptions held by this monitor.
+ */
+ public int size()
+ {
+ return exceptions.size();
+ }
+
public void reset()
{
exceptions = new ArrayList();
}
+
+ /**
+ * Provides a dump of the stack traces of all exceptions that this exception monitor was notified of. Mainly
+ * use for debugging/test failure reporting purposes.
+ *
+ * @return A string containing a dump of the stack traces of all exceptions.
+ */
+ public String toString()
+ {
+ String result = "ExceptionMonitor: holds " + exceptions.size() + " exceptions.\n\n";
+
+ for (JMSException ex : exceptions)
+ {
+ result += getStackTrace(ex) + "\n";
+ }
+
+ return result;
+ }
+
+ /**
+ * Prints an exception stack trace into a string.
+ *
+ * @param t The throwable to get the stack trace from.
+ *
+ * @return A string containing the throwables stack trace.
+ */
+ public static String getStackTrace(Throwable t)
+ {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw, true);
+ t.printStackTrace(pw);
+ pw.flush();
+ sw.flush();
+
+ return sw.toString();
+ }
+ }
+
+ public static class MessageMonitor implements MessageListener
+ {
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message): called");
+ }
}
/**
@@ -290,22 +504,30 @@ public class ImmediateMessageTest extends TestCase {
try
{
- int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
- boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
- String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME);
- String destinationReceiveRoot = messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME);
+ // Get a unique offset to append to destination names to make them unique to the connection.
+ long uniqueId = uniqueDestsId.incrementAndGet();
+
+ // Extract the standard test configuration parameters relevant to the connection.
+ String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME) + "_" + uniqueId;
+ String destinationReceiveRoot =
+ messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME) + "_" + uniqueId;
boolean createPublisherProducer = messagingProps.getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME);
boolean createPublisherConsumer = messagingProps.getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME);
boolean createReceiverProducer = messagingProps.getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME);
boolean createReceiverConsumer = messagingProps.getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME);
+
+ // Check which JMS flags and options are to be set.
+ int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ boolean durableSubscription = messagingProps.getPropertyAsBoolean(DURABLE_SUBSCRIPTION_PROPNAME);
// Check if any Qpid/AMQP specific flags or options need to be set.
boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME);
boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME);
boolean needsQpidOptions = immediate | mandatory;
- log.debug("ackMode = " + ackMode);
+ /*log.debug("ackMode = " + ackMode);
log.debug("useTopics = " + useTopics);
log.debug("destinationSendRoot = " + destinationSendRoot);
log.debug("destinationReceiveRoot = " + destinationReceiveRoot);
@@ -316,7 +538,7 @@ public class ImmediateMessageTest extends TestCase log.debug("transactional = " + transactional);
log.debug("immediate = " + immediate);
log.debug("mandatory = " + mandatory);
- log.debug("needsQpidOptions = " + needsQpidOptions);
+ log.debug("needsQpidOptions = " + needsQpidOptions);*/
// Create connection, sessions and producer/consumer pairs on each session.
Connection connection = createConnection(messagingProps);
@@ -329,7 +551,7 @@ public class ImmediateMessageTest extends TestCase Session receiverSession = connection.createSession(transactional, ackMode);
Destination publisherProducerDestination =
- useTopics ? publisherSession.createTopic(destinationSendRoot)
+ useTopics ? (Destination) publisherSession.createTopic(destinationSendRoot)
: publisherSession.createQueue(destinationSendRoot);
MessageProducer publisherProducer =
@@ -342,13 +564,29 @@ public class ImmediateMessageTest extends TestCase createPublisherConsumer
? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null;
+ if (publisherConsumer != null)
+ {
+ publisherConsumer.setMessageListener(new MessageMonitor());
+ }
+
MessageProducer receiverProducer =
createReceiverProducer ? receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot))
: null;
+ Destination receiverConsumerDestination =
+ useTopics ? (Destination) receiverSession.createTopic(destinationSendRoot)
+ : receiverSession.createQueue(destinationSendRoot);
+
MessageConsumer receiverConsumer =
- createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot))
- : null;
+ createReceiverConsumer
+ ? ((durableSubscription && useTopics)
+ ? receiverSession.createDurableSubscriber((Topic) receiverConsumerDestination, "testsub")
+ : receiverSession.createConsumer(receiverConsumerDestination)) : null;
+
+ if (receiverConsumer != null)
+ {
+ receiverConsumer.setMessageListener(new MessageMonitor());
+ }
// Start listening for incoming messages.
connection.start();
@@ -372,7 +610,8 @@ public class ImmediateMessageTest extends TestCase public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException
{
- return client.getSession().createMessage();
+ return client.getSession().createTextMessage("Hello");
+ // return client.getSession().createMessage();
}
/**
@@ -428,12 +667,12 @@ public class ImmediateMessageTest extends TestCase public MessageProducer getProducer()
{
- return null;
+ return producer;
}
public MessageConsumer getConsumer()
{
- return null;
+ return consumer;
}
public void send(Message message) throws JMSException
@@ -524,6 +763,10 @@ public class ImmediateMessageTest extends TestCase public ExceptionMonitor getExceptionMonitor();
+ public void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */);
+
+ public void testNoExceptions(ParsedProperties testProps);
+
public void close();
}
@@ -603,62 +846,63 @@ public class ImmediateMessageTest extends TestCase }
}
- public static void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */)
+ public void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */)
{
- PublisherReceiver testClients;
-
- // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
- testClients = createPublisherReceiverPairSharedConnection(testProps);
- testClients.start();
-
- testClients.send(testProps, 1);
-
+ start();
+ send(testProps, 1);
pause(1000L);
String errors = "";
- if (!testClients.getConnectionExceptionMonitor().assertOneJMSExceptionWithLinkedCause(aClass))
+ ExceptionMonitor connectionExceptionMonitor = getConnectionExceptionMonitor();
+ if (!connectionExceptionMonitor.assertOneJMSExceptionWithLinkedCause(aClass))
{
- errors += "Was expecting linked exception type " + aClass.getName() + ".\n";
+ errors += "Was expecting linked exception type " + aClass.getName() + " on the connection.\n";
+ errors +=
+ (connectionExceptionMonitor.size() > 0)
+ ? ("Actually got the following exceptions on the connection, " + connectionExceptionMonitor)
+ : "Got no exceptions on the connection.";
}
// Clean up the publisher/receiver client pair.
- testClients.close();
+ close();
assertEquals(errors, "", errors);
}
/**
*/
- public static void testNoExceptions(ParsedProperties testProps)
+ public void testNoExceptions(ParsedProperties testProps)
{
- PublisherReceiver testClients;
-
- // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
- testClients = createPublisherReceiverPairSharedConnection(testProps);
- testClients.start();
-
- testClients.send(testProps, 1);
-
+ start();
+ send(testProps, 1);
pause(1000L);
String errors = "";
- if (!testClients.getConnectionExceptionMonitor().assertNoExceptions())
+ if (!getConnectionExceptionMonitor().assertNoExceptions())
{
- errors += "There were connection exceptions.\n";
+ errors += "Was expecting no exceptions.\n";
+ errors += "Got the following exceptions on the connection, " + getConnectionExceptionMonitor();
}
- if (!testClients.getExceptionMonitor().assertNoExceptions())
+ if (!getExceptionMonitor().assertNoExceptions())
{
- errors += "There were exceptions on producer.\n";
+ errors += "Was expecting no exceptions.\n";
+ errors += "Got the following exceptions on the producer, " + getExceptionMonitor();
}
// Clean up the publisher/receiver client pair.
- testClients.close();
+ close();
assertEquals(errors, "", errors);
}
+
+ public static PublisherReceiver connectClients(ParsedProperties testProps)
+ {
+ // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
+ return createPublisherReceiverPairSharedConnection(testProps);
+ }
}
/**
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java index f41acca11b..09a32aa3eb 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java @@ -49,6 +49,10 @@ import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; * connected.
* <tr><td> Check that a mandatory message results in no route code, upon transaction commit, when a consumer is
* connected.
+ * <tr><td> Check that a mandatory message is sent succesfully, not using transactions, when a consumer is
+ * disconnected but the route exists.
+ * <tr><dt> Check that a mandatory message is send successfully, in a transactions, when a consumer is
+ * disconnected but when the route exists.
* </table>
*/
public class MandatoryMessageTest extends TestCase
@@ -57,64 +61,234 @@ public class MandatoryMessageTest extends TestCase private static final Logger log = LoggerFactory.getLogger(MandatoryMessageTest.class);
/** Used to read the tests configurable properties through. */
- ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+ ParsedProperties testProps;
- /** All these tests should have the mandatory flag on. */
- // private boolean mandatoryFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
- private boolean mandatoryFlag = testProps.setProperty(MANDATORY_PROPNAME, true);
+ /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
/** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
- public void test_QPID_508_MandatoryOkNoTx() throws Exception
+ public void test_QPID_508_MandatoryOkNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
// Send one message with no errors.
- ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
}
/** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
- public void test_QPID_508_MandatoryOkTx() throws Exception
+ public void test_QPID_508_MandatoryOkTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
// Send one message with no errors.
- ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
}
/** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteNoTx() throws Exception
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
/** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteTx() throws Exception
+ public void test_QPID_508_MandatoryFailsNoRouteTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
protected void setUp() throws Exception
{
NDC.push(getName());
+ testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the mandatory flag on. */
+ testProps.setProperty(MANDATORY_PROPNAME, true);
+
+ /** Bind the receivers consumer by default. */
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true);
+
// Ensure that the in-vm broker is created.
TransportConnection.createVMBroker(1);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java index 9c8cefc492..b584c8c80b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java @@ -1,3 +1,23 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.exchange;
import org.apache.qpid.jms.Session;
@@ -167,6 +187,12 @@ public class MessagingTestConfigProperties /** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the durable subscriptions flag from, when doing pub/sub messaging. */
+ public static final String DURABLE_SUBSCRIPTION_PROPNAME = "durableSubscription";
+
+ /** Defines the default value of the durable subscriptions flag. */
+ public static final boolean DURABLE_SUBSCRIPTION_DEFAULT = false;
+
// ====================== Qpid Options and Flags ================================
/** Holds the name of the property to set the exclusive flag from. */
@@ -272,6 +298,7 @@ public class MessagingTestConfigProperties defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_SUBSCRIPTION_PROPNAME, DURABLE_SUBSCRIPTION_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT);
defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);
|