summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/etc/md5passwd2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java94
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java71
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java139
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java9
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java3
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java175
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java2
-rw-r--r--qpid/java/integrationtests/src/resources/sustained-log4j.xml69
-rw-r--r--qpid/java/pom.xml23
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java344
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java198
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java27
17 files changed, 914 insertions, 285 deletions
diff --git a/qpid/java/broker/etc/md5passwd b/qpid/java/broker/etc/md5passwd
index 7984b742e0..6a149919de 100644
--- a/qpid/java/broker/etc/md5passwd
+++ b/qpid/java/broker/etc/md5passwd
@@ -16,6 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
-guest:qfgyy4ewnVMBg
+guest:CE4DQ6BIb/BVMN9scFyLtA==
admin:ISMvKXpXpadDiUoOSoAfww==
user:aBzonUodYLhwSa8s9A10sA==
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 43a04dbfa1..28a9e85489 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -20,18 +20,9 @@
*/
package org.apache.qpid.server;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
@@ -52,6 +43,16 @@ import org.apache.qpid.server.queue.Subscription;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
public class AMQChannel
{
public static final int DEFAULT_PREFETCH = 5000;
@@ -208,7 +209,8 @@ public class AMQChannel
_currentMessage.setPublisher(publisher);
}
- public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException
+ public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
+ throws AMQException
{
if (_currentMessage == null)
{
@@ -230,6 +232,7 @@ public class AMQChannel
// check and deliver if header says body length is zero
if (contentHeaderBody.bodySize == 0)
{
+ _txnContext.messageProcessed(protocolSession);
_currentMessage = null;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 01242f90de..0dcceaddbb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -169,7 +169,7 @@ public class DestNameExchange extends AbstractExchange
if (queues == null || queues.isEmpty())
{
String msg = "Routing key " + routingKey + " is not known to " + this;
- if (info.isMandatory())
+ if (info.isMandatory() || info.isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index 222e341b1a..f6a95b5e55 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.server.exchange;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -41,24 +46,21 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
public class DestWildExchange extends AbstractExchange
{
private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
- private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
- // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+ private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
+ new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private static final String TOPIC_SEPARATOR = ".";
private static final String AMQP_STAR = "*";
private static final String AMQP_HASH = "#";
@@ -90,7 +92,7 @@ public class DestWildExchange extends AbstractExchange
queueList.add(q.getName().toString());
}
- Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
+ Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -118,7 +120,6 @@ public class DestWildExchange extends AbstractExchange
} // End of MBean class
-
public AMQShortString getType()
{
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -140,6 +141,7 @@ public class DestWildExchange extends AbstractExchange
{
queueList = _routingKey2queues.get(routingKey);
}
+
if (!queueList.contains(queue))
{
queueList.add(queue);
@@ -165,8 +167,8 @@ public class DestWildExchange extends AbstractExchange
for (int index = 0; index < size; index++)
{
- //if there are more levels
- if (index + 1 < size)
+ // if there are more levels
+ if ((index + 1) < size)
{
if (_subscription.get(index).equals(AMQP_HASH))
{
@@ -175,7 +177,7 @@ public class DestWildExchange extends AbstractExchange
// we don't need #.# delete this one
_subscription.remove(index);
size--;
- //redo this normalisation
+ // redo this normalisation
index--;
}
@@ -186,7 +188,7 @@ public class DestWildExchange extends AbstractExchange
_subscription.add(index + 1, _subscription.remove(index));
}
}
- }//if we have more levels
+ } // if we have more levels
}
StringBuilder sb = new StringBuilder();
@@ -211,9 +213,9 @@ public class DestWildExchange extends AbstractExchange
List<AMQQueue> queues = getMatchedQueues(routingKey);
// if we have no registered queues we have nothing to do
// TODO: add support for the immediate flag
- if (queues == null || queues.size() == 0)
+ if ((queues == null) || queues.isEmpty())
{
- if (info.isMandatory())
+ if (info.isMandatory() || info.isImmediate())
{
String msg = "Topic " + routingKey + " is not known to " + this;
throw new NoRouteException(msg, payload, null);
@@ -222,6 +224,7 @@ public class DestWildExchange extends AbstractExchange
{
_logger.warn("No queues found for routing key " + routingKey);
_logger.warn("Routing map contains: " + _routingKey2queues);
+
return;
}
}
@@ -238,14 +241,15 @@ public class DestWildExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
- return queues != null && queues.contains(queue);
- }
+ return (queues != null) && queues.contains(queue);
+ }
public boolean isBound(AMQShortString routingKey) throws AMQException
{
List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
- return queues != null && !queues.isEmpty();
+
+ return (queues != null) && !queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
@@ -257,6 +261,7 @@ public class DestWildExchange extends AbstractExchange
return true;
}
}
+
return false;
}
@@ -279,12 +284,14 @@ public class DestWildExchange extends AbstractExchange
" with routing key " + routingKey + ". No queue was registered with that routing key", null);
}
+
boolean removedQ = queues.remove(queue);
if (!removedQ)
{
throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
" with routing key " + routingKey, null);
}
+
if (queues.isEmpty())
{
_routingKey2queues.remove(routingKey);
@@ -304,7 +311,6 @@ public class DestWildExchange extends AbstractExchange
}
}
-
private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
{
List<AMQQueue> list = new LinkedList<AMQQueue>();
@@ -334,7 +340,6 @@ public class DestWildExchange extends AbstractExchange
queueList.add(queTok.nextToken());
}
-
int depth = 0;
boolean matching = true;
boolean done = false;
@@ -343,25 +348,26 @@ public class DestWildExchange extends AbstractExchange
while (matching && !done)
{
- if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip)
+ if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip)))
{
done = true;
// if it was the routing key that ran out of digits
- if (routingkeyList.size() == depth + routingskip)
+ if (routingkeyList.size() == (depth + routingskip))
{
if (queueList.size() > (depth + queueskip))
- { // a hash and it is the last entry
- matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1;
+ { // a hash and it is the last entry
+ matching =
+ queueList.get(depth + queueskip).equals(AMQP_HASH)
+ && (queueList.size() == (depth + queueskip + 1));
}
}
- else if (routingkeyList.size() > depth + routingskip)
+ else if (routingkeyList.size() > (depth + routingskip))
{
// There is still more routing key to check
matching = false;
}
-
continue;
}
@@ -377,27 +383,33 @@ public class DestWildExchange extends AbstractExchange
else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
{
// Is this a # at the end
- if (queueList.size() == depth + queueskip + 1)
+ if (queueList.size() == (depth + queueskip + 1))
{
done = true;
+
continue;
}
// otherwise # in the middle
- while (routingkeyList.size() > depth + routingskip)
+ while (routingkeyList.size() > (depth + routingskip))
{
if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
{
queueskip++;
depth++;
+
break;
}
+
routingskip++;
}
+
continue;
}
+
matching = false;
}
+
depth++;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 6148fd4e1c..bf00eeb9d3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -1,27 +1,36 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
-
package org.apache.qpid.server.exchange;
-import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -36,16 +45,7 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
public class FanoutExchange extends AbstractExchange
{
@@ -63,7 +63,7 @@ public class FanoutExchange extends AbstractExchange
private final class FanoutExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ fanout exchange")
- public FanoutExchangeMBean() throws JMException
+ public FanoutExchangeMBean() throws JMException
{
super();
_exchangeType = "fanout";
@@ -79,9 +79,7 @@ public class FanoutExchange extends AbstractExchange
{
String queueName = queue.getName().toString();
-
-
- Object[] bindingItemValues = {queueName, new String[] {queueName}};
+ Object[] bindingItemValues = { queueName, new String[] { queueName } };
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingData);
}
@@ -98,7 +96,7 @@ public class FanoutExchange extends AbstractExchange
}
try
- {
+ {
queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
}
catch (AMQException ex)
@@ -107,8 +105,7 @@ public class FanoutExchange extends AbstractExchange
}
}
- }// End of MBean class
-
+ } // End of MBean class
protected ExchangeMBean createMBean() throws AMQException
{
@@ -147,7 +144,6 @@ public class FanoutExchange extends AbstractExchange
{
assert queue != null;
-
if (!_queues.remove(queue))
{
throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
@@ -159,10 +155,10 @@ public class FanoutExchange extends AbstractExchange
{
final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
final AMQShortString routingKey = publishInfo.getRoutingKey();
- if (_queues == null || _queues.isEmpty())
+ if ((_queues == null) || _queues.isEmpty())
{
String msg = "No queues bound to " + this;
- if (publishInfo.isMandatory())
+ if (publishInfo.isMandatory() || publishInfo.isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
@@ -193,13 +189,12 @@ public class FanoutExchange extends AbstractExchange
public boolean isBound(AMQShortString routingKey) throws AMQException
{
- return _queues != null && !_queues.isEmpty();
+ return (_queues != null) && !_queues.isEmpty();
}
public boolean isBound(AMQQueue queue) throws AMQException
{
-
return _queues.contains(queue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 8205924207..e86094e26f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -229,7 +229,7 @@ public class HeadersExchange extends AbstractExchange
String msg = "Exchange " + getName() + ": message not routable.";
- if (payload.getMessagePublishInfo().isMandatory())
+ if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate())
{
throw new NoRouteException(msg, payload, null);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 82e969b496..c9f5e42286 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,24 +20,13 @@
*/
package org.apache.qpid.server.protocol;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.security.Principal;
-
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-
import org.apache.log4j.Logger;
+
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
@@ -46,22 +35,34 @@ import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public class AMQMinaProtocolSession implements AMQProtocolSession,
- Managable
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
@@ -111,25 +112,20 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
-
public ManagedObject getManagedObject()
{
return _managedObject;
}
-
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
- AMQCodecFactory codecFactory)
- throws AMQException
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
+ throws AMQException
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_minaProtocolSession = session;
session.setAttachment(this);
-
_codecFactory = codecFactory;
-
try
{
IoServiceConfig config = session.getServiceConfig();
@@ -140,16 +136,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
catch (RuntimeException e)
{
e.printStackTrace();
- // throw e;
+ // throw e;
}
-// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+ // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
- AMQCodecFactory codecFactory, AMQStateManager stateManager)
- throws AMQException
+ public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
+ AMQStateManager stateManager) throws AMQException
{
_stateManager = stateManager;
_minaProtocolSession = session;
@@ -182,8 +177,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
- public void dataBlockReceived(AMQDataBlock message)
- throws Exception
+ public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
if (message instanceof ProtocolInitiation)
@@ -203,8 +197,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- private void frameReceived(AMQFrame frame)
- throws AMQException
+ private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
AMQBody body = frame.getBodyFrame();
@@ -252,13 +245,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
String locales = "en_US";
// Interfacing with generated code - be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- locales.getBytes(), // locales
- mechanisms.getBytes(), // mechanisms
- null, // serverProperties
- (short) getProtocolMajorVersion(), // versionMajor
- (short) getProtocolMinorVersion()); // versionMinor
+ AMQFrame response =
+ ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short) getProtocolMajorVersion(), // versionMajor
+ (short) getProtocolMinorVersion()); // versionMinor
_minaProtocolSession.write(response);
}
catch (AMQException e)
@@ -269,21 +262,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
// TODO: Close connection (but how to wait until message is sent?)
// ritchiem 2006-12-04 will this not do?
-// WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
-// future.join();
-// close connection
+ // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
+ // future.join();
+ // close connection
}
}
-
private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
{
- final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId,
- methodBody);
+ final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
- //Check that this channel is not closing
+ // Check that this channel is not closing
if (channelAwaitingClosure(channelId))
{
if ((evt.getMethod() instanceof ChannelCloseOkBody))
@@ -299,11 +290,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.info("Channel[" + channelId + "] awaiting closure ignoring");
}
+
return;
}
}
-
try
{
try
@@ -315,10 +306,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
for (AMQMethodListener listener : _frameListeners)
{
- wasAnyoneInterested = listener.methodReceived(evt) ||
- wasAnyoneInterested;
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
}
}
+
if (!wasAnyoneInterested)
{
throw new AMQNoMethodHandlerException(evt, null);
@@ -332,6 +323,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.info("Closing channel due to: " + e.getMessage());
}
+
writeFrame(e.getCloseFrame(channelId));
closeChannel(channelId);
}
@@ -341,14 +333,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
}
+
if (_logger.isInfoEnabled())
{
_logger.info("Closing connection due to: " + e.getMessage());
}
+
closeSession();
- AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
- AMQConstant.CHANNEL_ERROR.getName().toString());
+ AMQConnectionException ce =
+ evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(ce.getCloseFrame(channelId));
@@ -360,6 +355,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.info("Closing connection due to: " + e.getMessage());
}
+
closeSession();
_stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(channelId));
@@ -372,17 +368,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
listener.error(e);
}
+
_minaProtocolSession.close();
}
}
-
private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
AMQChannel channel = getAndAssertChannel(channelId);
- channel.publishContentHeader(body);
+ channel.publishContentHeader(body, this);
}
@@ -427,15 +423,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null);
}
+
return channel;
}
public AMQChannel getChannel(int channelId) throws AMQException
{
- final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId)
- ? _cachedChannels[channelId]
- : _channelMap.get(channelId);
- if (channel == null || channel.isClosing())
+ final AMQChannel channel =
+ ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+ if ((channel == null) || channel.isClosing())
{
return null;
}
@@ -466,8 +462,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
if (_channelMap.size() == _maxNoOfChannels)
{
- String errorMessage = toString() + ": maximum number of channels has been reached (" +
- _maxNoOfChannels + "); can't create channel";
+ String errorMessage =
+ toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+ + "); can't create channel";
_logger.error(errorMessage);
throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null);
}
@@ -480,6 +477,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_cachedChannels[channelId] = channel;
}
+
checkForNotification();
}
@@ -504,7 +502,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void commitTransactions(AMQChannel channel) throws AMQException
{
- if (channel != null && channel.isTransactional())
+ if ((channel != null) && channel.isTransactional())
{
channel.commit();
}
@@ -512,7 +510,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public void rollbackTransactions(AMQChannel channel) throws AMQException
{
- if (channel != null && channel.isTransactional())
+ if ((channel != null) && channel.isTransactional())
{
channel.rollback();
}
@@ -597,6 +595,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
channel.close(this);
}
+
_channelMap.clear();
for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
@@ -615,6 +614,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_managedObject.unregister();
}
+
for (Task task : _taskList)
{
task.doTask(this);
@@ -687,6 +687,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
}
+
if (_clientProperties.getString(ClientProperties.version.toString()) != null)
{
_clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
@@ -715,7 +716,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public boolean isProtocolVersion(byte major, byte minor)
{
- return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor;
+ return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
}
public VersionSpecificRegistry getRegistry()
@@ -723,13 +724,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _registry;
}
-
public Object getClientIdentifier()
{
return _minaProtocolSession.getRemoteAddress();
}
-
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -769,6 +768,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public String getClientVersion()
{
- return _clientVersion == null ? null : _clientVersion.toString();
+ return (_clientVersion == null) ? null : _clientVersion.toString();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a803ef1227..6273ac997b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -49,6 +49,16 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import javax.management.JMException;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
* fully in RFC 006.
@@ -607,7 +617,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
delete();
}
- public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+ /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
{
// fixme not sure what this is doing. should we be passing deliverFirst through here?
// This code is not used so when it is perhaps it should
@@ -623,7 +633,7 @@ public class AMQQueue implements Managable, Comparable, StorableQueue
// from the queue:
dequeue(storeContext, msg);
}
- }
+ }*/
// public DeliveryManager getDeliveryManager()
// {
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 50fbd47fe9..d59412fdba 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -337,7 +337,14 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
if ((message == null) || message.equals(""))
{
- message = "Unable to Connect";
+ if (message == null)
+ {
+ message = "Unable to Connect";
+ }
+ else // can only be "" if getMessage() returned it therfore lastException != null
+ {
+ message = "Unable to Connect:" + lastException.getClass();
+ }
}
AMQException e = new AMQConnectionFailureException(message, null);
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
index a82b05e20f..a904bfa419 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
@@ -252,7 +252,8 @@ public class TestClient implements MessageListener
public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
- + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+ + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
+ + ", String virtualHost = " + virtualHost + " ): called");
try
{
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
index 1597da6dba..6f2089290a 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
@@ -37,6 +37,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -91,7 +92,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
private static final long TEN_MILLI_SEC = 10000000;
- private static final long FIVE_MILLI_SEC = 5000000;
+ private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
+ private static final int LOG_UPATE_INTERVAL = 10;
+ private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage");
/**
* Should provide the name of the test case that this class implements. The exact names are defined in the interop
@@ -129,6 +132,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
+ String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
if (debugLog.isDebugEnabled())
{
@@ -150,7 +154,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
session = new Session[1];
connection[0] =
- org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+ org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+ clientName,
+ org.apache.qpid.interop.testclient.TestClient.brokerUrl,
org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[0] = connection[0].createSession(false, ackMode);
@@ -182,6 +188,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
{
connection[i] =
org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+ clientName,
org.apache.qpid.interop.testclient.TestClient.brokerUrl,
org.apache.qpid.interop.testclient.TestClient.virtualHost);
session[i] = connection[i].createSession(false, ackMode);
@@ -192,7 +199,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
MessageConsumer consumer = session[i].createConsumer(sendDestination);
- consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination));
+ consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], sendUpdateDestination));
}
break;
@@ -347,7 +354,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
_received++;
if (((TextMessage) message).getText().equals("start"))
{
- debugLog.info("Starting Batch");
+ debugLog.debug("Starting Batch");
_startTime = System.nanoTime();
}
else if (((TextMessage) message).getText().equals("end"))
@@ -355,8 +362,8 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
if (_startTime != null)
{
long currentTime = System.nanoTime();
- sendStatus(currentTime - _startTime, _received);
- debugLog.info("End Batch");
+ sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH"));
+ debugLog.debug("End Batch");
}
}
}
@@ -373,28 +380,31 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*
* @param time taken for the the last batch
* @param received Total number of messages received.
- *
+ * @param batchNumber the batch number
* @throws JMSException if an error occurs during the send
*/
- private void sendStatus(long time, long received) throws JMSException
+ private void sendStatus(long time, long received, int batchNumber) throws JMSException
{
Message updateMessage = _session.createTextMessage("update");
updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
updateMessage.setLongProperty("RECEIVED", received);
+ updateMessage.setIntProperty("BATCH", batchNumber);
updateMessage.setLongProperty("DURATION", time);
if (debugLog.isInfoEnabled())
{
- debugLog.info("**** SENDING [" + received / _batchSize + "]**** "
- + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ debugLog.info("**** SENDING [" + batchNumber + "]**** "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received
+ + " BATCH:" + batchNumber + " DURATION:" + time);
}
// Output on the main log.info the details of this batch
- if (received / _batchSize % 10 == 0)
+ if (batchNumber % 10 == 0)
{
- log.info("Sending Report [" + received / _batchSize + "] "
- + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ log.info("Sending Report [" + batchNumber + "] "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received
+ + " BATCH:" + batchNumber + " DURATION:" + time);
}
_updater.send(updateMessage);
@@ -415,7 +425,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
class SustainedRateAdapter implements MessageListener, Runnable
{
private SustainedTestClient _client;
- private long _messageVariance = 500; //no. messages to allow drifting
+ private long _batchVariance = 3; //no. batches to allow drifting
private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
private volatile long _delay; //in nanos
private long _sent;
@@ -451,18 +461,23 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
long duration = message.getLongProperty("DURATION");
long totalReceived = message.getLongProperty("RECEIVED");
String client = message.getStringProperty("CLIENT_ID");
+ int batchNumber = message.getIntProperty("BATCH");
- if (debugLog.isInfoEnabled())
+ if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0)
{
- debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration);
+ debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived
+ + " Recevied BATCH:" + batchNumber + " DURATION:" + duration);
}
- recordSlow(client, totalReceived);
-
- adjustDelay(client, totalReceived, duration);
+ recordSlow(client, totalReceived, batchNumber);
+ adjustDelay(client, batchNumber, duration);
- if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2)
+ // Warm up completes when:
+ // we haven't warmed up
+ // and the number of batches sent to each client is at least half of the required warmup batches
+ if (!_warmedup
+ && (batchNumber >= _warmUpBatches))
{
_warmedup = true;
_warmup.countDown();
@@ -478,7 +493,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
CountDownLatch _warmup = new CountDownLatch(1);
- int _warmUpBatches = 20;
+ int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
int _numBatches = 10000;
@@ -527,12 +542,14 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
testMessage = _client.session[0].createTextMessage("start");
- for (int batch = 0; batch < batchSize; batch++)
+ for (int batch = 0; batch <= batchSize; batch++)
// while (_running)
{
long start = System.nanoTime();
testMessage.setText("start");
+ testMessage.setIntProperty("BATCH", batch);
+
_client.producer.send(testMessage);
_rateAdapter.sentMessage();
@@ -552,9 +569,12 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
long sendtime = end - start;
- debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+ if (debugLog.isDebugEnabled())
+ {
+ debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+ }
- if (batch % 10 == 0)
+ if (batch % LOG_UPATE_INTERVAL == 0)
{
log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
}
@@ -583,23 +603,17 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
return;
}
- //Slow down if gap between send and received is too large
- if (_sent - _totalReceived / delays.size() > _messageVariance)
- {
- //pause between batches.
- debugLog.info("Sleeping to keep sent in check with received");
- log.debug("Increaseing _delay as sending more than receiving");
- _delay += TEN_MILLI_SEC;
- }
-
- //per batch sleep.. if sleep is to small to spread over the batch.
- if (_delay <= TEN_MILLI_SEC * _batchSize)
- {
- sleepLong(_delay);
- }
- else
+ if (!SLEEP_PER_MESSAGE)
{
- debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
+ //per batch sleep.. if sleep is to small to spread over the batch.
+ if (_delay <= TEN_MILLI_SEC * _batchSize)
+ {
+ sleepLong(_delay);
+ }
+ else
+ {
+ debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
+ }
}
}
@@ -617,10 +631,10 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
* Adjust the delay for sending messages based on this update from the client
*
* @param client The client that send this update
- * @param totalReceived The number of messages that this client has received.
* @param duration The time taken for the last batch of messagse
+ * @param batchNumber The reported batchnumber from the client
*/
- private void adjustDelay(String client, long totalReceived, long duration)
+ private void adjustDelay(String client, int batchNumber, long duration)
{
//Retrieve the current total time taken for this client.
Long currentTime = delays.get(client);
@@ -637,23 +651,28 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
delays.put(client, currentTime);
+ long batchesSent = _sent / _batchSize;
+
+ // ensure we don't divide by zero
+ if (batchesSent == 0)
+ {
+ batchesSent = 1L;
+ }
_totalReceived += _batchSize;
_totalDuration += duration;
- // Calculate the number of messages in the batch.
- long batchCount = (_totalReceived / _batchSize);
-
//calculate average duration accross clients per batch
- long averageDuration = _totalDuration / delays.size() / batchCount;
+ long averageDuration = _totalDuration / delays.size() / batchesSent;
//calculate the difference between current send delay and average report delay
long diff = (duration) - averageDuration;
- if (debugLog.isInfoEnabled())
+ if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0)
{
- debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers"
- + " on batch: " + batchCount
+ debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers."
+ + " on batch: " + batchesSent
+ + " received batch: " + batchNumber
+ " Batch Duration: " + duration
+ " Average: " + averageDuration
+ " so diff: " + diff + " for : " + client
@@ -696,6 +715,16 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
delayStable();
}
+ // If we have a consumer that is behind with the batches.
+ if (batchesSent - batchNumber > _batchVariance)
+ {
+ debugLog.debug("Increasing _delay as sending more than receiving");
+
+ _delay += 2 * TEN_MILLI_SEC;
+ delayChanged();
+ }
+
+
}
/** Reset the number of iterations before we say the delay has stabilised. */
@@ -725,10 +754,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*
* @param client The client identifier to check
* @param received the number of messages received by that client
+ * @param batchNumber
*/
- private void recordSlow(String client, long received)
+ private void recordSlow(String client, long received, int batchNumber)
{
- if (received < (_sent - _messageVariance))
+ if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance)
{
_slowClients.put(client, received);
}
@@ -761,6 +791,13 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
}
}
+ else
+ {
+ if (SLEEP_PER_MESSAGE && (_delay > 0))
+ {
+ sleepLong(_delay / _batchSize);
+ }
+ }
}
@@ -771,16 +808,38 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*/
private boolean checkForSlowClients()
{
- if (_sent % _batchSize == 0)
+ // This will allways be true as we are running this at the end of each batchSize
+// if (_sent % _batchSize == 0)
{
// Cause test to pause when we have slow
if (!_slowClients.isEmpty() || NO_CLIENTS)
{
- debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray());
+
while (!_slowClients.isEmpty())
{
- debugLog.info(_slowClients.size() + " slow clients.");
+ if (debugLog.isInfoEnabled()
+ && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL == 0)
+ {
+ String clients = "";
+ Iterator it = _slowClients.keySet().iterator();
+ while (it.hasNext())
+ {
+ clients += it.next();
+ if (it.hasNext())
+ {
+ clients += ", ";
+ }
+ }
+ debugLog.info("Pausing for slow clients:" + clients);
+ }
+
+
+ if (log.isDebugEnabled()
+ && _sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+ {
+ log.debug(_slowClients.size() + " slow clients.");
+ }
sleep(PAUSE_SLEEP);
}
@@ -794,7 +853,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
else
{
- debugLog.info("Delay:" + _delay);
+ if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+ {
+ log.info("Total Delay :" + _delay + " "
+ + (_delayShifting == 0 ? "Stablised" : "Not Stablised(" + _delayShifting + ")"));
+ }
}
}
@@ -825,7 +888,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
* Perform the sleep , swallowing any InteruptException.
*
* NOTE: If a sleep request is > 10s then reset only sleep for 5s
- *
+ *
* @param milli to sleep for
* @param nano sub miliseconds to sleep for
*/
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
index b437e165b4..0075e45a8c 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
@@ -113,6 +113,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i
setPropertiesOnMessage(assignSender, testProperties);
assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignSender.setStringProperty("ROLE", "SENDER");
+ assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER");
senderConversation.send(senderControlTopic, assignSender);
@@ -170,6 +171,7 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i
setPropertiesOnMessage(assignReceiver, _testProperties);
assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
assignReceiver.setStringProperty("ROLE", "RECEIVER");
+ assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName);
receiverConversation.send(receiverControlTopic, assignReceiver);
diff --git a/qpid/java/integrationtests/src/resources/sustained-log4j.xml b/qpid/java/integrationtests/src/resources/sustained-log4j.xml
new file mode 100644
index 0000000000..c5ab3137bf
--- /dev/null
+++ b/qpid/java/integrationtests/src/resources/sustained-log4j.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+
+ <appender name="FileAppender" class="org.apache.log4j.FileAppender">
+ <param name="File" value="${QPID_WORK}/log/${logprefix}qpid${logsuffix}.log"/>
+ <param name="Append" value="false"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p (%F:%L) - %m%n"/>
+ <!--param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/-->
+ </layout>
+ </appender>
+
+ <category name="SustainedTest">
+ <priority value="${sustained.level}"/>
+ </category>
+
+ <category name="org.apache">
+ <priority value="warn"/>
+ </category>
+
+ <category name="org.apache.qpid.interop">
+ <priority value="${interop.logging.level}"/>
+ </category>
+
+
+ <category name="org.apache.qpid.sustained">
+ <priority value="${amqj.logging.level}"/>
+ </category>
+
+ <!--category name="org.apache.qpid.server.txn">
+ <priority value="debug"/>
+ </category>-->
+
+ <root>
+ <priority value="all"/>
+ <appender-ref ref="STDOUT"/>
+ <!--appender-ref ref="ArchivingFileAppender"/-->
+ </root>
+</log4j:configuration>
diff --git a/qpid/java/pom.xml b/qpid/java/pom.xml
index 2e1a792c49..cf39e43ecc 100644
--- a/qpid/java/pom.xml
+++ b/qpid/java/pom.xml
@@ -391,6 +391,29 @@ under the License.
<version>0.5</version>
</plugin>
+ <plugin>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ <version>1.0-alpha-5</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>process</goal>
+ </goals>
+ <configuration>
+ <resourceBundles>
+ <resourceBundle>org.apache:apache-incubator-disclaimer-resource-bundle:1.1</resourceBundle>
+ <resourceBundle>org.apache:apache-jar-resource-bundle:1.2</resourceBundle>
+ </resourceBundles>
+ <properties>
+ <addLicense>true</addLicense>
+ <projectName>Apache Qpid</projectName>
+ </properties>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
</plugins>
</pluginManagement>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
index 05fbceca20..048fcfb0b3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
@@ -25,10 +25,11 @@ import junit.framework.TestCase;
import org.apache.log4j.NDC;
import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,8 +42,11 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
/**
* ImmediateMessageTest tests for the desired behaviour of immediate messages. Immediate messages are a non-JMS
@@ -58,6 +62,10 @@ import java.util.List;
* connected.
* <tr><td> Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is
* connected.
+ * <tr><td> Check that an immediate message results in no consumers code, not using transactions, when a consumer is
+ * disconnected.
+ * <tr><dt> Check that an immediate message results in no consumers code, in a transaction, when a consumer is
+ * disconnected.
* </table>
*
* @todo Write a test decorator, the sole function of which is to populate test context properties, from sys properties,
@@ -73,63 +81,215 @@ public class ImmediateMessageTest extends TestCase
private static final Logger log = LoggerFactory.getLogger(ImmediateMessageTest.class);
/** Used to read the tests configurable properties through. */
- ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+ ParsedProperties testProps;
- /** All these tests should have the immediate flag on. */
- private boolean immediateFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
+ /** Used to create unique destination names for each test.
+ * @todo Move into the test framework.
+ */
+ private static AtomicLong uniqueDestsId = new AtomicLong();
/** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
- public void test_QPID_517_ImmediateOkNoTx() throws Exception
+ public void test_QPID_517_ImmediateOkNoTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message with no errors.
- PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
}
/** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */
- public void test_QPID_517_ImmediateOkTx() throws Exception
+ public void test_QPID_517_ImmediateOkTxP2P() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
// Send one message with no errors.
- PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoRouteTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkNoTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedNoTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, in a transaction, when a consumer is disconnected. */
+ public void test_QPID_517_ImmediateFailsConsumerDisconnectedTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoConsumersException.class);
}
/** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerNoTx() throws Exception
+ public void test_QPID_517_ImmediateFailsNoRouteNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
/** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_517_ImmediateFailsNoConsumerTx() throws Exception
+ public void test_QPID_517_ImmediateFailsNoRouteTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ PublisherReceiver testClients = PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
protected void setUp() throws Exception
{
NDC.push(getName());
+ testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the immediate flag on. */
+ testProps.setProperty(IMMEDIATE_PROPNAME, true);
+
+ /** Bind the receivers consumer by default. */
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true);
+
// Ensure that the in-vm broker is created.
TransportConnection.createVMBroker(1);
}
@@ -220,10 +380,64 @@ public class ImmediateMessageTest extends TestCase
return false;
}
+ /**
+ * Reports the number of exceptions held by this monitor.
+ *
+ * @return The number of exceptions held by this monitor.
+ */
+ public int size()
+ {
+ return exceptions.size();
+ }
+
public void reset()
{
exceptions = new ArrayList();
}
+
+ /**
+ * Provides a dump of the stack traces of all exceptions that this exception monitor was notified of. Mainly
+ * use for debugging/test failure reporting purposes.
+ *
+ * @return A string containing a dump of the stack traces of all exceptions.
+ */
+ public String toString()
+ {
+ String result = "ExceptionMonitor: holds " + exceptions.size() + " exceptions.\n\n";
+
+ for (JMSException ex : exceptions)
+ {
+ result += getStackTrace(ex) + "\n";
+ }
+
+ return result;
+ }
+
+ /**
+ * Prints an exception stack trace into a string.
+ *
+ * @param t The throwable to get the stack trace from.
+ *
+ * @return A string containing the throwables stack trace.
+ */
+ public static String getStackTrace(Throwable t)
+ {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw, true);
+ t.printStackTrace(pw);
+ pw.flush();
+ sw.flush();
+
+ return sw.toString();
+ }
+ }
+
+ public static class MessageMonitor implements MessageListener
+ {
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message): called");
+ }
}
/**
@@ -290,22 +504,30 @@ public class ImmediateMessageTest extends TestCase
{
try
{
- int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
- boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
- String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME);
- String destinationReceiveRoot = messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME);
+ // Get a unique offset to append to destination names to make them unique to the connection.
+ long uniqueId = uniqueDestsId.incrementAndGet();
+
+ // Extract the standard test configuration parameters relevant to the connection.
+ String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME) + "_" + uniqueId;
+ String destinationReceiveRoot =
+ messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME) + "_" + uniqueId;
boolean createPublisherProducer = messagingProps.getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME);
boolean createPublisherConsumer = messagingProps.getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME);
boolean createReceiverProducer = messagingProps.getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME);
boolean createReceiverConsumer = messagingProps.getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME);
+
+ // Check which JMS flags and options are to be set.
+ int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ boolean durableSubscription = messagingProps.getPropertyAsBoolean(DURABLE_SUBSCRIPTION_PROPNAME);
// Check if any Qpid/AMQP specific flags or options need to be set.
boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME);
boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME);
boolean needsQpidOptions = immediate | mandatory;
- log.debug("ackMode = " + ackMode);
+ /*log.debug("ackMode = " + ackMode);
log.debug("useTopics = " + useTopics);
log.debug("destinationSendRoot = " + destinationSendRoot);
log.debug("destinationReceiveRoot = " + destinationReceiveRoot);
@@ -316,7 +538,7 @@ public class ImmediateMessageTest extends TestCase
log.debug("transactional = " + transactional);
log.debug("immediate = " + immediate);
log.debug("mandatory = " + mandatory);
- log.debug("needsQpidOptions = " + needsQpidOptions);
+ log.debug("needsQpidOptions = " + needsQpidOptions);*/
// Create connection, sessions and producer/consumer pairs on each session.
Connection connection = createConnection(messagingProps);
@@ -329,7 +551,7 @@ public class ImmediateMessageTest extends TestCase
Session receiverSession = connection.createSession(transactional, ackMode);
Destination publisherProducerDestination =
- useTopics ? publisherSession.createTopic(destinationSendRoot)
+ useTopics ? (Destination) publisherSession.createTopic(destinationSendRoot)
: publisherSession.createQueue(destinationSendRoot);
MessageProducer publisherProducer =
@@ -342,13 +564,29 @@ public class ImmediateMessageTest extends TestCase
createPublisherConsumer
? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null;
+ if (publisherConsumer != null)
+ {
+ publisherConsumer.setMessageListener(new MessageMonitor());
+ }
+
MessageProducer receiverProducer =
createReceiverProducer ? receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot))
: null;
+ Destination receiverConsumerDestination =
+ useTopics ? (Destination) receiverSession.createTopic(destinationSendRoot)
+ : receiverSession.createQueue(destinationSendRoot);
+
MessageConsumer receiverConsumer =
- createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot))
- : null;
+ createReceiverConsumer
+ ? ((durableSubscription && useTopics)
+ ? receiverSession.createDurableSubscriber((Topic) receiverConsumerDestination, "testsub")
+ : receiverSession.createConsumer(receiverConsumerDestination)) : null;
+
+ if (receiverConsumer != null)
+ {
+ receiverConsumer.setMessageListener(new MessageMonitor());
+ }
// Start listening for incoming messages.
connection.start();
@@ -372,7 +610,8 @@ public class ImmediateMessageTest extends TestCase
public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException
{
- return client.getSession().createMessage();
+ return client.getSession().createTextMessage("Hello");
+ // return client.getSession().createMessage();
}
/**
@@ -428,12 +667,12 @@ public class ImmediateMessageTest extends TestCase
public MessageProducer getProducer()
{
- return null;
+ return producer;
}
public MessageConsumer getConsumer()
{
- return null;
+ return consumer;
}
public void send(Message message) throws JMSException
@@ -524,6 +763,10 @@ public class ImmediateMessageTest extends TestCase
public ExceptionMonitor getExceptionMonitor();
+ public void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */);
+
+ public void testNoExceptions(ParsedProperties testProps);
+
public void close();
}
@@ -603,62 +846,63 @@ public class ImmediateMessageTest extends TestCase
}
}
- public static void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */)
+ public void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */)
{
- PublisherReceiver testClients;
-
- // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
- testClients = createPublisherReceiverPairSharedConnection(testProps);
- testClients.start();
-
- testClients.send(testProps, 1);
-
+ start();
+ send(testProps, 1);
pause(1000L);
String errors = "";
- if (!testClients.getConnectionExceptionMonitor().assertOneJMSExceptionWithLinkedCause(aClass))
+ ExceptionMonitor connectionExceptionMonitor = getConnectionExceptionMonitor();
+ if (!connectionExceptionMonitor.assertOneJMSExceptionWithLinkedCause(aClass))
{
- errors += "Was expecting linked exception type " + aClass.getName() + ".\n";
+ errors += "Was expecting linked exception type " + aClass.getName() + " on the connection.\n";
+ errors +=
+ (connectionExceptionMonitor.size() > 0)
+ ? ("Actually got the following exceptions on the connection, " + connectionExceptionMonitor)
+ : "Got no exceptions on the connection.";
}
// Clean up the publisher/receiver client pair.
- testClients.close();
+ close();
assertEquals(errors, "", errors);
}
/**
*/
- public static void testNoExceptions(ParsedProperties testProps)
+ public void testNoExceptions(ParsedProperties testProps)
{
- PublisherReceiver testClients;
-
- // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
- testClients = createPublisherReceiverPairSharedConnection(testProps);
- testClients.start();
-
- testClients.send(testProps, 1);
-
+ start();
+ send(testProps, 1);
pause(1000L);
String errors = "";
- if (!testClients.getConnectionExceptionMonitor().assertNoExceptions())
+ if (!getConnectionExceptionMonitor().assertNoExceptions())
{
- errors += "There were connection exceptions.\n";
+ errors += "Was expecting no exceptions.\n";
+ errors += "Got the following exceptions on the connection, " + getConnectionExceptionMonitor();
}
- if (!testClients.getExceptionMonitor().assertNoExceptions())
+ if (!getExceptionMonitor().assertNoExceptions())
{
- errors += "There were exceptions on producer.\n";
+ errors += "Was expecting no exceptions.\n";
+ errors += "Got the following exceptions on the producer, " + getExceptionMonitor();
}
// Clean up the publisher/receiver client pair.
- testClients.close();
+ close();
assertEquals(errors, "", errors);
}
+
+ public static PublisherReceiver connectClients(ParsedProperties testProps)
+ {
+ // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
+ return createPublisherReceiverPairSharedConnection(testProps);
+ }
}
/**
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
index f41acca11b..09a32aa3eb 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
@@ -49,6 +49,10 @@ import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
* connected.
* <tr><td> Check that a mandatory message results in no route code, upon transaction commit, when a consumer is
* connected.
+ * <tr><td> Check that a mandatory message is sent succesfully, not using transactions, when a consumer is
+ * disconnected but the route exists.
+ * <tr><dt> Check that a mandatory message is send successfully, in a transactions, when a consumer is
+ * disconnected but when the route exists.
* </table>
*/
public class MandatoryMessageTest extends TestCase
@@ -57,64 +61,234 @@ public class MandatoryMessageTest extends TestCase
private static final Logger log = LoggerFactory.getLogger(MandatoryMessageTest.class);
/** Used to read the tests configurable properties through. */
- ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+ ParsedProperties testProps;
- /** All these tests should have the mandatory flag on. */
- // private boolean mandatoryFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
- private boolean mandatoryFlag = testProps.setProperty(MANDATORY_PROPNAME, true);
+ /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteTxP2P() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message and get a linked no consumers exception.
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
/** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
- public void test_QPID_508_MandatoryOkNoTx() throws Exception
+ public void test_QPID_508_MandatoryOkNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
// Send one message with no errors.
- ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
}
/** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
- public void test_QPID_508_MandatoryOkTx() throws Exception
+ public void test_QPID_508_MandatoryOkTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, not using transactions, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
+
+ // Send one message with no errors.
+ testClients.testNoExceptions(testProps);
+ }
+
+ /**
+ * Check that a mandatory message is sent succesfully, in a transaction, when a consumer is disconnected but
+ * the route exists.
+ */
+ public void test_QPID_517_MandatoryOkConsumerDisconnectedTxPubSub() throws Exception
+ {
+ // Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
+
+ // Use durable subscriptions, so that the route remains open with no subscribers.
+ testProps.setProperty(DURABLE_SUBSCRIPTION_PROPNAME, true);
+
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
+ // Disconnect the consumer.
+ testClients.getReceiver().getConsumer().close();
// Send one message with no errors.
- ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ testClients.testNoExceptions(testProps);
}
/** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteNoTx() throws Exception
+ public void test_QPID_508_MandatoryFailsNoRouteNoTxPubSub() throws Exception
{
// Ensure transactional sessions are off.
testProps.setProperty(TRANSACTED_PROPNAME, false);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
/** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
- public void test_QPID_508_MandatoryFailsNoRouteTx() throws Exception
+ public void test_QPID_508_MandatoryFailsNoRouteTxPubSub() throws Exception
{
// Ensure transactional sessions are on.
testProps.setProperty(TRANSACTED_PROPNAME, true);
+ testProps.setProperty(PUBSUB_PROPNAME, true);
// Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
// collect its messages).
testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+ ImmediateMessageTest.PublisherReceiver testClients =
+ ImmediateMessageTest.PublisherReceiverImpl.connectClients(testProps);
+
// Send one message and get a linked no consumers exception.
- ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ testClients.testWithAssertions(testProps, AMQNoRouteException.class);
}
protected void setUp() throws Exception
{
NDC.push(getName());
+ testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the mandatory flag on. */
+ testProps.setProperty(MANDATORY_PROPNAME, true);
+
+ /** Bind the receivers consumer by default. */
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, true);
+
// Ensure that the in-vm broker is created.
TransportConnection.createVMBroker(1);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
index 9c8cefc492..b584c8c80b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.exchange;
import org.apache.qpid.jms.Session;
@@ -167,6 +187,12 @@ public class MessagingTestConfigProperties
/** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the durable subscriptions flag from, when doing pub/sub messaging. */
+ public static final String DURABLE_SUBSCRIPTION_PROPNAME = "durableSubscription";
+
+ /** Defines the default value of the durable subscriptions flag. */
+ public static final boolean DURABLE_SUBSCRIPTION_DEFAULT = false;
+
// ====================== Qpid Options and Flags ================================
/** Holds the name of the property to set the exclusive flag from. */
@@ -272,6 +298,7 @@ public class MessagingTestConfigProperties
defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_SUBSCRIPTION_PROPNAME, DURABLE_SUBSCRIPTION_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT);
defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);