summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-04-05 11:47:50 +0000
committerRobert Greig <rgreig@apache.org>2007-04-05 11:47:50 +0000
commit4547988868ea17dd34064204de84e66206b16d5b (patch)
tree2ffa201455021216f7f76dd00ef9f9110df53da4
parent67674e50665e7def7b90569e3b3d33c3f047db5b (diff)
downloadqpid-python-4547988868ea17dd34064204de84e66206b16d5b.tar.gz
Merged revisions 522994-523245 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r522994 | rgreig | 2007-03-27 17:48:23 +0100 (Tue, 27 Mar 2007) | 1 line Test added for durability of messages under broker failure. ........ r523245 | rgreig | 2007-03-28 10:30:49 +0100 (Wed, 28 Mar 2007) | 1 line Reversed accidental replacing of the word 'initialize' in comments to 'establishConnection' through a method refactoring. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525800 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java10
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java600
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java56
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java389
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java930
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java8
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java2
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java67
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java75
9 files changed, 1282 insertions, 855 deletions
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
index cd8e0a80a1..bad49060ca 100644
--- a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
+++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java
@@ -61,10 +61,10 @@ public class ClasspathScanner
* @return All the classes that match this collector.
*/
public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass, String matchingRegexp,
- boolean beanOnly)
+ boolean beanOnly)
{
log.debug("public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass = " + matchingClass
- + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called");
+ + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called");
// Build a compiled regular expression from the pattern to match.
Pattern matchPattern = Pattern.compile(matchingRegexp);
@@ -95,11 +95,11 @@ public class ClasspathScanner
* iteration.
*/
private static <T> void gatherFiles(File classRoot, String classFileName, Map<String, Class<? extends T>> result,
- Pattern matchPattern, Class<? extends T> matchClass)
+ Pattern matchPattern, Class<? extends T> matchClass)
{
log.debug("private static <T> void gatherFiles(File classRoot = " + classRoot + ", String classFileName = "
- + classFileName + ", Map<String, Class<? extends T>> result, Pattern matchPattern = " + matchPattern
- + ", Class<? extends T> matchClass = " + matchClass + "): called");
+ + classFileName + ", Map<String, Class<? extends T>> result, Pattern matchPattern = " + matchPattern
+ + ", Class<? extends T> matchClass = " + matchClass + "): called");
File thisRoot = new File(classRoot, classFileName);
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
index e9215a4876..a861405d30 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.management.ui.views;
-import static org.apache.qpid.management.ui.Constants.*;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -29,12 +27,14 @@ import java.util.HashMap;
import java.util.List;
import org.apache.qpid.management.ui.ApplicationRegistry;
+import static org.apache.qpid.management.ui.Constants.*;
import org.apache.qpid.management.ui.ManagedBean;
import org.apache.qpid.management.ui.ManagedServer;
import org.apache.qpid.management.ui.ServerRegistry;
import org.apache.qpid.management.ui.exceptions.InfoRequiredException;
import org.apache.qpid.management.ui.jmx.JMXServerRegistry;
import org.apache.qpid.management.ui.jmx.MBeanUtility;
+
import org.eclipse.jface.preference.PreferenceStore;
import org.eclipse.jface.viewers.DoubleClickEvent;
import org.eclipse.jface.viewers.IDoubleClickListener;
@@ -48,6 +48,7 @@ import org.eclipse.jface.viewers.TreeExpansionEvent;
import org.eclipse.jface.viewers.TreeViewer;
import org.eclipse.jface.viewers.Viewer;
import org.eclipse.jface.viewers.ViewerSorter;
+
import org.eclipse.swt.SWT;
import org.eclipse.swt.graphics.Font;
import org.eclipse.swt.graphics.Image;
@@ -62,6 +63,7 @@ import org.eclipse.swt.widgets.MenuItem;
import org.eclipse.swt.widgets.Shell;
import org.eclipse.swt.widgets.Tree;
import org.eclipse.swt.widgets.TreeItem;
+
import org.eclipse.ui.part.ViewPart;
/**
@@ -71,29 +73,29 @@ import org.eclipse.ui.part.ViewPart;
*/
public class NavigationView extends ViewPart
{
- public static final String ID = "org.apache.qpid.management.ui.navigationView";
+ public static final String ID = "org.apache.qpid.management.ui.navigationView";
public static final String INI_FILENAME = System.getProperty("user.home") + File.separator + "qpidManagementConsole.ini";
-
+
private static final String INI_SERVERS = "Servers";
private static final String INI_QUEUES = QUEUE + "s";
private static final String INI_CONNECTIONS = CONNECTION + "s";
private static final String INI_EXCHANGES = EXCHANGE + "s";
-
+
private TreeViewer _treeViewer = null;
private TreeObject _rootNode = null;
private TreeObject _serversRootNode = null;
-
+
private PreferenceStore _preferences;
// Map of connected servers
private HashMap<ManagedServer, TreeObject> _managedServerMap = new HashMap<ManagedServer, TreeObject>();
-
+
private void createTreeViewer(Composite parent)
{
_treeViewer = new TreeViewer(parent);
_treeViewer.setContentProvider(new ContentProviderImpl());
- _treeViewer.setLabelProvider(new LabelProviderImpl());
+ _treeViewer.setLabelProvider(new LabelProviderImpl());
_treeViewer.setSorter(new ViewerSorterImpl());
-
+
// layout the tree viewer below the label field, to cover the area
GridData layoutData = new GridData();
layoutData = new GridData();
@@ -103,118 +105,124 @@ public class NavigationView extends ViewPart
layoutData.verticalAlignment = GridData.FILL;
_treeViewer.getControl().setLayoutData(layoutData);
_treeViewer.setUseHashlookup(true);
-
+
createListeners();
}
-
+
/**
* Creates listeners for the JFace treeviewer
*/
private void createListeners()
{
- _treeViewer.addDoubleClickListener(new IDoubleClickListener() {
+ _treeViewer.addDoubleClickListener(new IDoubleClickListener()
+ {
public void doubleClick(DoubleClickEvent event)
{
- IStructuredSelection ss = (IStructuredSelection)event.getSelection();
- if (ss == null || ss.getFirstElement() == null)
+ IStructuredSelection ss = (IStructuredSelection) event.getSelection();
+ if ((ss == null) || (ss.getFirstElement() == null))
{
return;
}
+
boolean state = _treeViewer.getExpandedState(ss.getFirstElement());
_treeViewer.setExpandedState(ss.getFirstElement(), !state);
}
});
-
- _treeViewer.addTreeListener(new ITreeViewerListener() {
- public void treeExpanded(TreeExpansionEvent event)
- {
- _treeViewer.setExpandedState(event.getElement(), true);
- // Following will cause the selection event to be sent, so commented
- //_treeViewer.setSelection(new StructuredSelection(event.getElement()));
- _treeViewer.refresh();
- }
- public void treeCollapsed(TreeExpansionEvent event)
+ _treeViewer.addTreeListener(new ITreeViewerListener()
{
- _treeViewer.setExpandedState(event.getElement(), false);
- _treeViewer.refresh();
- }
- });
-
+ public void treeExpanded(TreeExpansionEvent event)
+ {
+ _treeViewer.setExpandedState(event.getElement(), true);
+ // Following will cause the selection event to be sent, so commented
+ // _treeViewer.setSelection(new StructuredSelection(event.getElement()));
+ _treeViewer.refresh();
+ }
+
+ public void treeCollapsed(TreeExpansionEvent event)
+ {
+ _treeViewer.setExpandedState(event.getElement(), false);
+ _treeViewer.refresh();
+ }
+ });
+
// This listener is for popup menu, which pops up if a queue,exchange or connection is selected
// with right click.
- _treeViewer.getTree().addListener(SWT.MenuDetect, new Listener () {
- Display display = getSite().getShell().getDisplay();
- final Shell shell = new Shell (display);
-
- public void handleEvent(Event event)
+ _treeViewer.getTree().addListener(SWT.MenuDetect, new Listener()
{
- Tree widget = (Tree)event.widget;
- TreeItem[] items = widget.getSelection();
- if (items == null) return;
-
- // Get the selected node
- final TreeObject selectedNode = (TreeObject)items[0].getData();
- final TreeObject parentNode = selectedNode.getParent();
-
- // This popup is only for mbeans and only connection,exchange and queue types
- if (parentNode == null ||
- !MBEAN.equals(selectedNode.getType()) ||
- !(CONNECTION.equals(parentNode.getName()) ||
- QUEUE.equals(parentNode.getName()) ||
- EXCHANGE.equals(parentNode.getName()))
- )
+ Display display = getSite().getShell().getDisplay();
+ final Shell shell = new Shell(display);
+
+ public void handleEvent(Event event)
{
- return;
- }
-
- Menu menu = new Menu (shell, SWT.POP_UP);
- MenuItem item = new MenuItem (menu, SWT.PUSH);
- // Add the action item, which will remove the node from the tree if selected
- item.setText(ACTION_REMOVE_MBEANNODE);
- item.addListener (SWT.Selection, new Listener () {
- public void handleEvent (Event e)
+ Tree widget = (Tree) event.widget;
+ TreeItem[] items = widget.getSelection();
+ if (items == null)
{
- removeManagedObject(parentNode, (ManagedBean)selectedNode.getManagedObject());
- _treeViewer.refresh();
- // set the selection to the parent node
- _treeViewer.setSelection(new StructuredSelection(parentNode));
+ return;
}
- });
- menu.setLocation (event.x, event.y);
- menu.setVisible (true);
- while (!menu.isDisposed () && menu.isVisible ())
- {
- if (!display.readAndDispatch ())
+
+ // Get the selected node
+ final TreeObject selectedNode = (TreeObject) items[0].getData();
+ final TreeObject parentNode = selectedNode.getParent();
+
+ // This popup is only for mbeans and only connection,exchange and queue types
+ if ((parentNode == null) || !MBEAN.equals(selectedNode.getType())
+ || !(CONNECTION.equals(parentNode.getName()) || QUEUE.equals(parentNode.getName())
+ || EXCHANGE.equals(parentNode.getName())))
{
- display.sleep ();
+ return;
}
+
+ Menu menu = new Menu(shell, SWT.POP_UP);
+ MenuItem item = new MenuItem(menu, SWT.PUSH);
+ // Add the action item, which will remove the node from the tree if selected
+ item.setText(ACTION_REMOVE_MBEANNODE);
+ item.addListener(SWT.Selection, new Listener()
+ {
+ public void handleEvent(Event e)
+ {
+ removeManagedObject(parentNode, (ManagedBean) selectedNode.getManagedObject());
+ _treeViewer.refresh();
+ // set the selection to the parent node
+ _treeViewer.setSelection(new StructuredSelection(parentNode));
+ }
+ });
+ menu.setLocation(event.x, event.y);
+ menu.setVisible(true);
+ while (!menu.isDisposed() && menu.isVisible())
+ {
+ if (!display.readAndDispatch())
+ {
+ display.sleep();
+ }
+ }
+
+ menu.dispose();
}
- menu.dispose ();
- }
- });
- }
-
+ });
+ }
+
/**
* Creates Qpid Server connection using JMX RMI protocol
* @param server
* @throws Exception
*/
private void createRMIServerConnection(ManagedServer server) throws Exception
- {
+ {
try
{
// Currently Qpid Management Console only supports JMX MBeanServer
- ServerRegistry serverRegistry = new JMXServerRegistry(server);
- ApplicationRegistry.addServer(server, serverRegistry);
+ ServerRegistry serverRegistry = new JMXServerRegistry(server);
+ ApplicationRegistry.addServer(server, serverRegistry);
}
- catch(Exception ex)
+ catch (Exception ex)
{
ex.printStackTrace();
throw new Exception("Error in connecting to Qpid broker at " + server.getUrl(), ex);
}
}
-
+
/**
* Adds a new server node in the navigation view if server connection is successful.
* @param transportProtocol
@@ -223,15 +231,15 @@ public class NavigationView extends ViewPart
* @param domain
* @throws Exception
*/
- public void addNewServer(String transportProtocol, String host, int port,
- String domain, String user, String pwd) throws Exception
+ public void addNewServer(String transportProtocol, String host, int port, String domain, String user, String pwd)
+ throws Exception
{
String serverAddress = host + ":" + port;
String url = null;
ManagedServer managedServer = new ManagedServer(host, port, domain, user, pwd);
-
+
if ("RMI".equals(transportProtocol))
- {
+ {
url = managedServer.getUrl();
List<TreeObject> list = _serversRootNode.getChildren();
for (TreeObject node : list)
@@ -242,10 +250,11 @@ public class NavigationView extends ViewPart
// Set the server node as selected and then connect it.
_treeViewer.setSelection(new StructuredSelection(node));
reconnect(user, pwd);
+
return;
}
}
-
+
// The server is not in the list of already added servers, so now connect and add it.
managedServer.setName(serverAddress);
createRMIServerConnection(managedServer);
@@ -254,28 +263,28 @@ public class NavigationView extends ViewPart
{
throw new InfoRequiredException(transportProtocol + " transport is not supported");
}
-
+
// Server connection is successful. Now add the server in the tree
TreeObject serverNode = new TreeObject(serverAddress, NODE_TYPE_SERVER);
serverNode.setUrl(url);
serverNode.setManagedObject(managedServer);
_serversRootNode.addChild(serverNode);
-
+
// Add server in the connected server map
_managedServerMap.put(managedServer, serverNode);
-
+
// populate the server tree
- populateServer(serverNode);
-
+ populateServer(serverNode);
+
// Add the Queue/Exchanges/Connections from config file into the navigation tree
addConfiguredItems(managedServer);
-
+
_treeViewer.refresh();
-
- // save server address in file
+
+ // save server address in file
addServerInConfigFile(serverAddress);
}
-
+
/**
* Create the config file, if it doesn't already exist.
* Exits the application if the file could not be created.
@@ -290,29 +299,29 @@ public class NavigationView extends ViewPart
file.createNewFile();
}
}
- catch(IOException ex)
+ catch (IOException ex)
{
System.out.println("Could not write to the file " + INI_FILENAME);
System.out.println(ex);
System.exit(1);
}
}
-
+
/**
* Server addresses are stored in a file. When user launches the application again, the
* server addresses are picked up from the file and shown in the navigfation view. This method
- * adds the server address in a file, when a new server is added in the navigation view.
+ * adds the server address in a file, when a new server is added in the navigation view.
* @param serverAddress
*/
private void addServerInConfigFile(String serverAddress)
{
// Check if the address already exists
List<String> list = getServerListFromFile();
- if (list != null && list.contains(serverAddress))
+ if ((list != null) && list.contains(serverAddress))
{
return;
}
-
+
// Get the existing server list and add to that
String servers = _preferences.getString(INI_SERVERS);
String value = (servers.length() != 0) ? (servers + "," + serverAddress) : serverAddress;
@@ -321,12 +330,12 @@ public class NavigationView extends ViewPart
{
_preferences.save();
}
- catch(IOException ex)
+ catch (IOException ex)
{
System.err.println("Could not add " + serverAddress + " in " + INI_SERVERS + " (" + INI_FILENAME + ")");
System.out.println(ex);
}
- }
+ }
/**
* Adds the item (Queue/Exchange/Connection) to the config file
@@ -337,20 +346,20 @@ public class NavigationView extends ViewPart
*/
private void addItemInConfigFile(TreeObject node)
{
- ManagedBean mbean = (ManagedBean)node.getManagedObject();
+ ManagedBean mbean = (ManagedBean) node.getManagedObject();
String server = mbean.getServer().getName();
String virtualhost = mbean.getVirtualHostName();
String type = node.getParent().getName() + "s";
String name = node.getName();
String itemKey = server + "." + virtualhost + "." + type;
-
+
// Check if the item already exists in the config file
List<String> list = getConfiguredItemsFromFile(itemKey);
- if (list != null && list.contains(name))
+ if ((list != null) && list.contains(name))
{
return;
}
-
+
// Add this item to the existing list of items
String items = _preferences.getString(itemKey);
String value = (items.length() != 0) ? (items + "," + name) : name;
@@ -359,21 +368,21 @@ public class NavigationView extends ViewPart
{
_preferences.save();
}
- catch(IOException ex)
+ catch (IOException ex)
{
System.err.println("Could not add " + name + " in " + itemKey + " (" + INI_FILENAME + ")");
System.out.println(ex);
}
}
-
+
private void removeItemFromConfigFile(TreeObject node)
{
- ManagedBean mbean = (ManagedBean)node.getManagedObject();
+ ManagedBean mbean = (ManagedBean) node.getManagedObject();
String server = mbean.getServer().getName();
String vHost = mbean.getVirtualHostName();
String type = node.getParent().getName() + "s";
String itemKey = server + "." + vHost + "." + type;
-
+
List<String> list = getConfiguredItemsFromFile(itemKey);
if (list.contains(node.getName()))
{
@@ -383,29 +392,30 @@ public class NavigationView extends ViewPart
{
value += item + ",";
}
+
value = (value.lastIndexOf(",") != -1) ? value.substring(0, value.lastIndexOf(",")) : value;
-
+
_preferences.putValue(itemKey, value);
try
{
_preferences.save();
}
- catch(IOException ex)
+ catch (IOException ex)
{
- System.err.println("Error in updating the config file "+ INI_FILENAME);
+ System.err.println("Error in updating the config file " + INI_FILENAME);
System.out.println(ex);
}
}
}
/**
- * Queries the qpid server for MBeans and populates the navigation view with all MBeans for
+ * Queries the qpid server for MBeans and populates the navigation view with all MBeans for
* the given server node.
* @param serverNode
*/
private void populateServer(TreeObject serverNode)
{
- ManagedServer server = (ManagedServer)serverNode.getManagedObject();
+ ManagedServer server = (ManagedServer) serverNode.getManagedObject();
String domain = server.getDomain();
try
{
@@ -414,29 +424,30 @@ public class NavigationView extends ViewPart
TreeObject domainNode = new TreeObject(domain, NODE_TYPE_DOMAIN);
domainNode.setParent(serverNode);
- populateDomain(domainNode);
+ populateDomain(domainNode);
}
else
{
List<TreeObject> domainList = new ArrayList<TreeObject>();
- List<String> domains = MBeanUtility.getAllDomains(server);;
+ List<String> domains = MBeanUtility.getAllDomains(server);
+ ;
for (String domainName : domains)
- {
+ {
TreeObject domainNode = new TreeObject(domainName, NODE_TYPE_DOMAIN);
domainNode.setParent(serverNode);
domainList.add(domainNode);
- populateDomain(domainNode);
+ populateDomain(domainNode);
}
}
}
- catch(Exception ex)
+ catch (Exception ex)
{
System.out.println("\nError in connecting to Qpid broker ");
ex.printStackTrace();
}
}
-
+
/**
* Queries the Qpid Server and populates the given domain node with all MBeans undser that domain.
* @param domain
@@ -446,19 +457,19 @@ public class NavigationView extends ViewPart
@SuppressWarnings("unchecked")
private void populateDomain(TreeObject domain) throws IOException, Exception
{
- ManagedServer server = (ManagedServer)domain.getParent().getManagedObject();
-
+ ManagedServer server = (ManagedServer) domain.getParent().getManagedObject();
+
// Now populate the mbenas under those types
List<ManagedBean> mbeans = MBeanUtility.getManagedObjectsForDomain(server, domain.getName());
for (ManagedBean mbean : mbeans)
{
mbean.setServer(server);
ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server);
- serverRegistry.addManagedObject(mbean);
-
+ serverRegistry.addManagedObject(mbean);
+
// Add all mbeans other than Connections, Exchanges and Queues. Because these will be added
- // manually by selecting from MBeanView
- if (!(mbean.isConnection() || mbean.isExchange() || mbean.isQueue()) )
+ // manually by selecting from MBeanView
+ if (!(mbean.isConnection() || mbean.isExchange() || mbean.isQueue()))
{
addManagedBean(domain, mbean);
}
@@ -471,10 +482,11 @@ public class NavigationView extends ViewPart
{
addDefaultNodes(domain);
}
+
break;
}
}
-
+
/**
* Add these three types - Connection, Exchange, Queue
* By adding these, these will always be available, even if there are no mbeans under thse types
@@ -493,7 +505,7 @@ public class NavigationView extends ViewPart
typeChild.setParent(parent);
typeChild.setVirtualHost(parent.getVirtualHost());
}
-
+
/**
* Checks if a particular mbeantype is already there in the navigation view for a domain.
* This is used while populating domain with mbeans.
@@ -506,24 +518,30 @@ public class NavigationView extends ViewPart
List<TreeObject> childNodes = parent.getChildren();
for (TreeObject child : childNodes)
{
- if ((NODE_TYPE_MBEANTYPE.equals(child.getType()) || NODE_TYPE_TYPEINSTANCE.equals(child.getType())) &&
- typeName.equals(child.getName()))
+ if ((NODE_TYPE_MBEANTYPE.equals(child.getType()) || NODE_TYPE_TYPEINSTANCE.equals(child.getType()))
+ && typeName.equals(child.getName()))
+ {
return child;
+ }
}
+
return null;
}
-
+
private boolean doesMBeanNodeAlreadyExist(TreeObject typeNode, String mbeanName)
{
List<TreeObject> childNodes = typeNode.getChildren();
for (TreeObject child : childNodes)
{
if (MBEAN.equals(child.getType()) && mbeanName.equals(child.getName()))
+ {
return true;
+ }
}
+
return false;
}
-
+
/**
* Adds the given MBean to the given domain node. Creates Notification node for the MBean.
* sample ObjectNames -
@@ -533,18 +551,18 @@ public class NavigationView extends ViewPart
* @param mbean
* @throws Exception
*/
- private void addManagedBean(TreeObject domain, ManagedBean mbean)// throws Exception
+ private void addManagedBean(TreeObject domain, ManagedBean mbean) // throws Exception
{
String name = mbean.getName();
// Split the mbean type into array of Strings, to create hierarchy
// eg. type=VirtualHost.VirtualHostManager,VirtualHost=localhost will be:
- // localhost->VirtualHostManager
+ // localhost->VirtualHostManager
// eg. type=org.apache.qpid:type=VirtualHost.Queue,VirtualHost=test,name=ping will be:
- // test->Queue->ping
+ // test->Queue->ping
String[] types = mbean.getType().split("\\.");
TreeObject typeNode = null;
TreeObject parentNode = domain;
-
+
// Run this loop till all nodes(hierarchy) for this mbean are created. This loop only creates
// all the required parent nodes for the mbean
for (int i = 0; i < types.length; i++)
@@ -554,34 +572,34 @@ public class NavigationView extends ViewPart
// If value is not null, then there will be a parent node for this mbean
// eg. for type=VirtualHost the value is "test"
typeNode = getMBeanTypeNode(parentNode, type);
-
+
// create the type node if not already created
if (typeNode == null)
{
// If the ObjectName doesn't have name property, that means there will be only one instance
// of this mbean for given "type". So there will be no type node created for this mbean.
- if (name == null && (i == types.length -1))
+ if ((name == null) && (i == (types.length - 1)))
{
break;
}
-
+
// create a node for "type"
typeNode = createTypeNode(parentNode, type);
typeNode.setVirtualHost(mbean.getVirtualHostName());
}
-
+
// now type node create becomes the parent node for next node in hierarchy
parentNode = typeNode;
-
- /*
+
+ /*
* Now create instances node for this type if value exists.
*/
- if (valueOftype == null)
+ if (valueOftype == null)
{
- // No instance node will be created when value is null (eg type=Queue)
+ // No instance node will be created when value is null (eg type=Queue)
break;
- }
-
+ }
+
// For different virtual hosts, the nodes with given value will be created.
// eg type=VirtualHost, value=test
typeNode = getMBeanTypeNode(parentNode, valueOftype);
@@ -589,55 +607,60 @@ public class NavigationView extends ViewPart
{
typeNode = createTypeInstanceNode(parentNode, valueOftype);
typeNode.setVirtualHost(mbean.getVirtualHostName());
-
+
// Create default nodes for VHost instances
if (type.equals(VIRTUAL_HOST))
{
addDefaultNodes(typeNode);
}
}
+
parentNode = typeNode;
}
-
+
if (typeNode == null)
{
typeNode = parentNode;
}
-
+
// Check if an MBean is already added
if (doesMBeanNodeAlreadyExist(typeNode, name))
+ {
return;
-
+ }
+
// Add the mbean node now
TreeObject mbeanNode = new TreeObject(mbean);
mbeanNode.setParent(typeNode);
-
+
// Add the mbean to the config file
if (mbean.isQueue() || mbean.isExchange() || mbean.isConnection())
{
addItemInConfigFile(mbeanNode);
}
-
+
// Add notification node
// TODO: show this only if the mbean sends any notification
TreeObject notificationNode = new TreeObject(NOTIFICATION, NOTIFICATION);
notificationNode.setParent(mbeanNode);
}
-
+
private TreeObject createTypeNode(TreeObject parent, String name)
{
TreeObject typeNode = new TreeObject(name, NODE_TYPE_MBEANTYPE);
typeNode.setParent(parent);
+
return typeNode;
}
-
+
private TreeObject createTypeInstanceNode(TreeObject parent, String name)
{
TreeObject typeNode = new TreeObject(name, NODE_TYPE_TYPEINSTANCE);
typeNode.setParent(parent);
+
return typeNode;
}
-
+
/**
* Removes all the child nodes of the given parent node. Used when closing a server.
* @param parent
@@ -649,7 +672,7 @@ public class NavigationView extends ViewPart
{
removeManagedObject(child);
}
-
+
list.clear();
}
@@ -666,10 +689,11 @@ public class NavigationView extends ViewPart
{
if (MBEAN.equals(child.getType()))
{
- String name = mbean.getName() != null ? mbean.getName() : mbean.getType();
+ String name = (mbean.getName() != null) ? mbean.getName() : mbean.getType();
if (child.getName().equals(name))
{
objectToRemove = child;
+
break;
}
}
@@ -678,35 +702,39 @@ public class NavigationView extends ViewPart
removeManagedObject(child, mbean);
}
}
-
+
if (objectToRemove != null)
{
list.remove(objectToRemove);
removeItemFromConfigFile(objectToRemove);
}
-
+
}
-
+
/**
* Closes the Qpid server connection
*/
public void disconnect() throws Exception
{
- TreeObject selectedNode = getSelectedServerNode();
- ManagedServer managedServer = (ManagedServer)selectedNode.getManagedObject();
+ TreeObject selectedNode = getSelectedServerNode();
+ ManagedServer managedServer = (ManagedServer) selectedNode.getManagedObject();
if (!_managedServerMap.containsKey(managedServer))
+ {
return;
+ }
// Close server connection
ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(managedServer);
- if (serverRegistry == null) // server connection is already closed
+ if (serverRegistry == null) // server connection is already closed
+ {
return;
-
+ }
+
serverRegistry.closeServerConnection();
// Add server to the closed server list and the worker thread will remove the server from required places.
ApplicationRegistry.serverConnectionClosed(managedServer);
}
-
+
/**
* Connects the selected server node
* @throws Exception
@@ -714,27 +742,28 @@ public class NavigationView extends ViewPart
public void reconnect(String user, String password) throws Exception
{
TreeObject selectedNode = getSelectedServerNode();
- ManagedServer managedServer = (ManagedServer)selectedNode.getManagedObject();
- if(_managedServerMap.containsKey(managedServer))
+ ManagedServer managedServer = (ManagedServer) selectedNode.getManagedObject();
+ if (_managedServerMap.containsKey(managedServer))
{
throw new InfoRequiredException("Server " + managedServer.getName() + " is already connected");
- }
+ }
+
managedServer.setUser(user);
managedServer.setPassword(password);
createRMIServerConnection(managedServer);
-
+
// put the server in the managed server map
_managedServerMap.put(managedServer, selectedNode);
-
+
// populate the server tree now
populateServer(selectedNode);
-
+
// Add the Queue/Exchanges/Connections from config file into the navigation tree
addConfiguredItems(managedServer);
-
- _treeViewer.refresh();
+
+ _treeViewer.refresh();
}
-
+
/**
* Adds the items(queues/exchanges/connectins) from config file to the server tree
* @param server
@@ -750,13 +779,13 @@ public class NavigationView extends ViewPart
List<String> items = getConfiguredItemsFromFile(itemKey);
List<ManagedBean> mbeans = serverRegistry.getQueues(virtualHost);
addConfiguredItems(items, mbeans);
-
+
// Add Exchanges
itemKey = server.getName() + "." + virtualHost + "." + INI_EXCHANGES;
items = getConfiguredItemsFromFile(itemKey);
mbeans = serverRegistry.getExchanges(virtualHost);
addConfiguredItems(items, mbeans);
-
+
// Add Connections
itemKey = server.getName() + "." + virtualHost + "." + INI_CONNECTIONS;
items = getConfiguredItemsFromFile(itemKey);
@@ -764,7 +793,7 @@ public class NavigationView extends ViewPart
addConfiguredItems(items, mbeans);
}
}
-
+
/**
* Gets the mbeans corresponding to the items and adds those to the navigation tree
* @param items
@@ -772,11 +801,11 @@ public class NavigationView extends ViewPart
*/
private void addConfiguredItems(List<String> items, List<ManagedBean> mbeans)
{
- if ((items == null) || items.isEmpty() | (mbeans == null) || mbeans.isEmpty())
+ if ((items == null) || (items.isEmpty() | (mbeans == null)) || mbeans.isEmpty())
{
return;
}
-
+
for (String item : items)
{
for (ManagedBean mbean : mbeans)
@@ -784,12 +813,13 @@ public class NavigationView extends ViewPart
if (item.equals(mbean.getName()))
{
addManagedBean(mbean);
+
break;
}
}
}
}
-
+
/**
* Closes the Qpid server connection if not already closed and removes the server node from the navigation view and
* also from the ini file stored in the system.
@@ -798,9 +828,9 @@ public class NavigationView extends ViewPart
public void removeServer() throws Exception
{
disconnect();
-
+
// Remove from the Tree
- String serverNodeName = getSelectedServerNode().getName();
+ String serverNodeName = getSelectedServerNode().getName();
List<TreeObject> list = _serversRootNode.getChildren();
TreeObject objectToRemove = null;
for (TreeObject child : list)
@@ -808,46 +838,48 @@ public class NavigationView extends ViewPart
if (child.getName().equals(serverNodeName))
{
objectToRemove = child;
+
break;
}
}
-
+
if (objectToRemove != null)
{
list.remove(objectToRemove);
}
-
+
_treeViewer.refresh();
-
+
// Remove from the ini file
removeServerFromConfigFile(serverNodeName);
}
-
+
private void removeServerFromConfigFile(String serverNodeName)
{
List<String> serversList = getServerListFromFile();
serversList.remove(serverNodeName);
-
+
String value = "";
for (String item : serversList)
{
value += item + ",";
}
+
value = (value.lastIndexOf(",") != -1) ? value.substring(0, value.lastIndexOf(",")) : value;
-
+
_preferences.putValue(INI_SERVERS, value);
-
+
try
{
_preferences.save();
}
- catch(IOException ex)
+ catch (IOException ex)
{
- System.err.println("Error in updating the config file "+ INI_FILENAME);
+ System.err.println("Error in updating the config file " + INI_FILENAME);
System.out.println(ex);
}
}
-
+
/**
* @return the server addresses from the ini file
* @throws Exception
@@ -856,7 +888,7 @@ public class NavigationView extends ViewPart
{
return getConfiguredItemsFromFile(INI_SERVERS);
}
-
+
/**
* Returns the list of items from the config file.
* sample ini file:
@@ -879,59 +911,60 @@ public class NavigationView extends ViewPart
list.add(item);
}
}
-
+
return list;
}
-
+
public TreeObject getSelectedServerNode() throws Exception
{
- IStructuredSelection ss = (IStructuredSelection)_treeViewer.getSelection();
- TreeObject selectedNode = (TreeObject)ss.getFirstElement();
- if (ss.isEmpty() || selectedNode == null || (!selectedNode.getType().equals(NODE_TYPE_SERVER)))
+ IStructuredSelection ss = (IStructuredSelection) _treeViewer.getSelection();
+ TreeObject selectedNode = (TreeObject) ss.getFirstElement();
+ if (ss.isEmpty() || (selectedNode == null) || (!selectedNode.getType().equals(NODE_TYPE_SERVER)))
{
throw new InfoRequiredException("Please select the server");
}
return selectedNode;
}
- /**
+
+ /**
* This is a callback that will allow us to create the viewer and initialize
* it.
*/
- public void createPartControl(Composite parent)
+ public void createPartControl(Composite parent)
{
Composite composite = new Composite(parent, SWT.NONE);
GridLayout gridLayout = new GridLayout();
gridLayout.marginHeight = 2;
gridLayout.marginWidth = 2;
- gridLayout.horizontalSpacing = 0;
- gridLayout.verticalSpacing = 2;
+ gridLayout.horizontalSpacing = 0;
+ gridLayout.verticalSpacing = 2;
composite.setLayout(gridLayout);
-
+
createTreeViewer(composite);
_rootNode = new TreeObject("ROOT", "ROOT");
_serversRootNode = new TreeObject(NAVIGATION_ROOT, "ROOT");
_serversRootNode.setParent(_rootNode);
-
+
_treeViewer.setInput(_rootNode);
// set viewer as selection event provider for MBeanView
- getSite().setSelectionProvider(_treeViewer);
-
+ getSite().setSelectionProvider(_treeViewer);
+
// Start worker thread to refresh tree for added or removed objects
- (new Thread(new Worker())).start();
-
+ (new Thread(new Worker())).start();
+
createConfigFile();
_preferences = new PreferenceStore(INI_FILENAME);
-
+
try
{
_preferences.load();
}
- catch(IOException ex)
+ catch (IOException ex)
{
System.out.println(ex);
}
-
+
// load the list of servers already added from file
List<String> serversList = getServerListFromFile();
if (serversList != null)
@@ -945,23 +978,22 @@ public class NavigationView extends ViewPart
_serversRootNode.addChild(serverNode);
}
}
+
_treeViewer.refresh();
-
- }
- /**
- * Passing the focus request to the viewer's control.
- */
- public void setFocus()
- {
+ }
+
+ /**
+ * Passing the focus request to the viewer's control.
+ */
+ public void setFocus()
+ { }
- }
-
public void refresh()
{
_treeViewer.refresh();
}
-
+
/**
* Content provider class for the tree viewer
*/
@@ -971,36 +1003,39 @@ public class NavigationView extends ViewPart
{
return getChildren(parent);
}
-
+
public Object[] getChildren(final Object parentElement)
{
- final TreeObject node = (TreeObject)parentElement;
+ final TreeObject node = (TreeObject) parentElement;
+
return node.getChildren().toArray(new TreeObject[0]);
}
-
+
public Object getParent(final Object element)
{
- final TreeObject node = (TreeObject)element;
+ final TreeObject node = (TreeObject) element;
+
return node.getParent();
}
-
+
public boolean hasChildren(final Object element)
{
final TreeObject node = (TreeObject) element;
+
return !node.getChildren().isEmpty();
}
-
+
public void inputChanged(final Viewer viewer, final Object oldInput, final Object newInput)
{
// Do nothing
}
-
+
public void dispose()
{
// Do nothing
}
}
-
+
/**
* Label provider class for the tree viewer
*/
@@ -1008,28 +1043,32 @@ public class NavigationView extends ViewPart
{
public Image getImage(Object element)
{
- TreeObject node = (TreeObject)element;
+ TreeObject node = (TreeObject) element;
if (node.getType().equals(NOTIFICATION))
{
return ApplicationRegistry.getImage(NOTIFICATION_IMAGE);
}
else if (!node.getType().equals(MBEAN))
{
- if (_treeViewer.getExpandedState(node))
- return ApplicationRegistry.getImage(OPEN_FOLDER_IMAGE);
- else
- return ApplicationRegistry.getImage(CLOSED_FOLDER_IMAGE);
-
+ if (_treeViewer.getExpandedState(node))
+ {
+ return ApplicationRegistry.getImage(OPEN_FOLDER_IMAGE);
+ }
+ else
+ {
+ return ApplicationRegistry.getImage(CLOSED_FOLDER_IMAGE);
+ }
+
}
else
{
return ApplicationRegistry.getImage(MBEAN_IMAGE);
}
}
-
+
public String getText(Object element)
{
- TreeObject node = (TreeObject)element;
+ TreeObject node = (TreeObject) element;
if (node.getType().equals(NODE_TYPE_MBEANTYPE))
{
return node.getName() + "s";
@@ -1039,35 +1078,42 @@ public class NavigationView extends ViewPart
return node.getName();
}
}
-
+
public Font getFont(Object element)
{
- TreeObject node = (TreeObject)element;
+ TreeObject node = (TreeObject) element;
if (node.getType().equals(NODE_TYPE_SERVER))
{
if (node.getChildren().isEmpty())
+ {
return ApplicationRegistry.getFont(FONT_NORMAL);
+ }
else
+ {
return ApplicationRegistry.getFont(FONT_BOLD);
+ }
}
+
return ApplicationRegistry.getFont(FONT_NORMAL);
}
} // End of LabelProviderImpl
-
-
+
private class ViewerSorterImpl extends ViewerSorter
{
public int category(Object element)
{
- TreeObject node = (TreeObject)element;
+ TreeObject node = (TreeObject) element;
if (node.getType().equals(MBEAN))
+ {
return 1;
+ }
+
return 2;
}
}
-
+
/**
- * Worker thread, which keeps looking for new ManagedObjects to be added and
+ * Worker thread, which keeps looking for new ManagedObjects to be added and
* unregistered objects to be removed from the tree.
* @author Bhupendra Bhardwaj
*/
@@ -1075,33 +1121,31 @@ public class NavigationView extends ViewPart
{
public void run()
{
- while(true)
+ while (true)
{
if (!_managedServerMap.isEmpty())
{
- refreshRemovedObjects();
+ refreshRemovedObjects();
refreshClosedServerConnections();
}
-
+
try
{
Thread.sleep(3000);
}
- catch(Exception ex)
- {
+ catch (Exception ex)
+ { }
+
+ } // end of while loop
+ } // end of run method.
+ } // end of Worker class
- }
-
- }// end of while loop
- }// end of run method.
- }// end of Worker class
-
/**
* Adds the mbean to the navigation tree
* @param mbean
* @throws Exception
*/
- public void addManagedBean(ManagedBean mbean)// throws Exception
+ public void addManagedBean(ManagedBean mbean) // throws Exception
{
TreeObject treeServerObject = _managedServerMap.get(mbean.getServer());
List<TreeObject> domains = treeServerObject.getChildren();
@@ -1111,22 +1155,25 @@ public class NavigationView extends ViewPart
if (child.getName().equals(mbean.getDomain()))
{
domain = child;
+
break;
}
}
-
+
addManagedBean(domain, mbean);
_treeViewer.refresh();
}
-
+
private void refreshRemovedObjects()
{
for (ManagedServer server : _managedServerMap.keySet())
{
final ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server);
- if (serverRegistry == null) // server connection is closed
+ if (serverRegistry == null) // server connection is closed
+ {
continue;
-
+ }
+
final List<ManagedBean> removalList = serverRegistry.getObjectsToBeRemoved();
if (removalList != null)
{
@@ -1145,19 +1192,22 @@ public class NavigationView extends ViewPart
if (child.getName().equals(mbean.getDomain()))
{
domain = child;
+
break;
}
}
+
removeManagedObject(domain, mbean);
- //serverRegistry.removeManagedObject(mbean);
+ // serverRegistry.removeManagedObject(mbean);
}
+
_treeViewer.refresh();
}
});
}
}
}
-
+
/**
* Gets the list of closed server connection from the ApplicationRegistry and then removes
* the closed server nodes from the navigation view
@@ -1169,20 +1219,20 @@ public class NavigationView extends ViewPart
{
Display display = getSite().getShell().getDisplay();
display.syncExec(new Runnable()
- {
- public void run()
{
- for (ManagedServer server : closedServers)
+ public void run()
{
- removeManagedObject(_managedServerMap.get(server));
- _managedServerMap.remove(server);
- ApplicationRegistry.removeServer(server);
+ for (ManagedServer server : closedServers)
+ {
+ removeManagedObject(_managedServerMap.get(server));
+ _managedServerMap.remove(server);
+ ApplicationRegistry.removeServer(server);
+ }
+
+ _treeViewer.refresh();
}
-
- _treeViewer.refresh();
- }
- });
+ });
}
}
-
-} \ No newline at end of file
+
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
index 013bda5927..e3b0249ed3 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
@@ -21,6 +21,7 @@
package org.apache.qpid.ping;
import java.util.List;
+import java.util.Properties;
import javax.jms.Destination;
@@ -31,54 +32,36 @@ import org.apache.qpid.requestreply.PingPongProducer;
* to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues.
* It is an all in one ping client, that produces and consumes its own pings.
*
+ * <p/>The constructor increments a count of the number of ping clients created. It is assumed that where many
+ * are created they will all be run in parallel and be active in sending and consuming pings at the same time.
+ * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear
+ * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of
+ * active ping clients. The {@link #getConsumersPerTopic()} method is used to supply this multiplier under these
+ * conditions.
+ *
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create a ping pong producer that listens to its own pings <td> {@link PingPongProducer}
+ * <tr><td> Create a ping producer that listens to its own pings <td> {@link PingPongProducer}
+ * <tr><td> Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings.
* </table>
*/
public class PingClient extends PingPongProducer
{
+ /** Used to count the number of ping clients created. */
private static int _pingClientCount;
/**
- * Creates a ping producer with the specified parameters, of which there are many. See their individual comments
- * for details. This constructor creates ping pong producer but de-registers its reply-to destination message
- * listener, and replaces it by listening to all of its ping destinations.
+ * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
+ * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates
+ * producer and consumer sessions on it, to send and recieve its pings and replies on.
*
- * @param brokerDetails The URL of the broker to send pings to.
- * @param username The username to log onto the broker with.
- * @param password The password to log onto the broker with.
- * @param virtualpath The virtual host name to use on the broker.
- * @param destinationName The name (or root where multiple destinations are used) of the desitination to send
- * pings to.
- * @param selector The selector to filter replies with.
- * @param transacted Indicates whether or not pings are sent and received in transactions.
- * @param persistent Indicates whether pings are sent using peristent delivery.
- * @param messageSize Specifies the size of ping messages to send.
- * @param verbose Indicates that information should be printed to the console on every ping.
- * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test failover.
- * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test failover.
- * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover.
- * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test failover.
- * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not all.
- * @param txBatchSize Specifies the number of pings to send in each transaction.
- * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
- * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
- * possible, with no rate restriction.
- * @param pubsub True to ping topics, false to ping queues.
- * @param unique True to use unique destinations for each ping pong producer, false to share.
+ * @param overrides Properties containing any desired overrides to the defaults.
*
* @throws Exception Any exceptions are allowed to fall through.
*/
- public PingClient(String brokerDetails, String username, String password, String virtualpath, String destinationName,
- String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
- boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
- int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique,
- int ackMode, long pausetime) throws Exception
+ public PingClient(Properties overrides) throws Exception
{
- super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
- verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
- pubsub, unique, ackMode, pausetime);
+ super(overrides);
_pingClientCount++;
}
@@ -94,6 +77,11 @@ public class PingClient extends PingPongProducer
return _pingDestinations;
}
+ /**
+ * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging.
+ *
+ * @return The scaling up of the number of expected pub/sub pings.
+ */
public int getConsumersPerTopic()
{
if (_isUnique)
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
new file mode 100644
index 0000000000..77526141d6
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
@@ -0,0 +1,389 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.util.MathUtils;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and
+ * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop
+ * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the
+ * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with
+ * failure conditions when using durable messaging.
+ *
+ * <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to
+ * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases
+ * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings
+ * with.
+ *
+ * <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console.
+ *
+ * <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and
+ * additionally accepts the following parameters:
+ *
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter <th> Default <th> Comments
+ * <tr><td> numMessages <th> 100 <th> The total number of messages to send.
+ * <tr><td> duration <th> 30S <th> The length of time to ping for. (Format dDhHmMsS, for d days, h hours,
+ * m minutes and s seconds).
+ * </table>
+ *
+ * <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up
+ * when no parameters are specified.
+ *
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter <th> Default <th> Comments
+ * <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped.
+ * <tr><td> transacted <td> true <td> Only makes sense to test with transactions.
+ * <tr><td> persistent <td> true <td> Only makes sense to test persistent.
+ * <tr><td> commitBatchSize <td> 10
+ * <tr><td> rate <td> 20 <td> Total default test time is 5 seconds.
+ * </table>
+ *
+ * <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits
+ * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will
+ * wait for the second signal before receiving its pings.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Send and receive pings.
+ * <tr><td> Accept user input to signal stop sending.
+ * <tr><td> Accept user input to signal start receiving.
+ * <tr><td> Provide feedback on pings sent versus pings received.
+ * </table>
+ */
+public class PingDurableClient extends PingPongProducer implements ExceptionListener
+{
+ private static final Logger log = Logger.getLogger(PingDurableClient.class);
+
+ public static final String NUM_MESSAGES_PROPNAME = "numMessages";
+ public static final String NUM_MESSAGES_DEFAULT = "100";
+ public static final String DURATION_PROPNAME = "duration";
+ public static final String DURATION_DEFAULT = "30S";
+
+ /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */
+ private static final long TIME_OUT = 3000;
+
+ static
+ {
+ defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT);
+ defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT);
+ defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false");
+ defaults.setProperty(TRANSACTED_PROPNAME, "true");
+ defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
+ defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
+ defaults.setProperty(RATE_PROPNAME, "20");
+ }
+
+ /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
+ private int numMessages;
+
+ /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */
+ private long duration;
+
+ /** Used to indciate that this application should terminate. Set by the shutdown hook. */
+ private boolean terminate = false;
+
+ /**
+ * @throws Exception Any exceptions are allowed to fall through.
+ */
+ public PingDurableClient(Properties overrides) throws Exception
+ {
+ super(overrides);
+ log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called");
+
+ // Extract the additional configuration parameters.
+ ParsedProperties properties = new ParsedProperties(defaults);
+ properties.putAll(overrides);
+
+ numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME);
+ String durationSpec = properties.getProperty(DURATION_PROPNAME);
+
+ if (durationSpec != null)
+ {
+ duration = MathUtils.parseDuration(durationSpec) * 1000000;
+ }
+ }
+
+ /**
+ * Starts the ping/wait/receive process.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ try
+ {
+ // Create a ping producer overriding its defaults with all options passed on the command line.
+ Properties options = processCommandLine(args);
+ PingDurableClient pingProducer = new PingDurableClient(options);
+
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ // pingProducer.getConnection().setExceptionListener(pingProducer);
+
+ // Run the test procedure.
+ int sent = pingProducer.send();
+ pingProducer.waitForUser("Press return to begin receiving the pings.");
+ pingProducer.receive(sent);
+
+ System.exit(0);
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Performs the main test procedure implemented by this ping client. See the class level comment for details.
+ */
+ public int send() throws Exception
+ {
+ log.debug("public void sendWaitReceive(): called");
+
+ log.debug("duration = " + duration);
+ log.debug("numMessages = " + numMessages);
+
+ if (duration > 0)
+ {
+ System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds.");
+ }
+
+ if (_rate > 0)
+ {
+ System.out.println("Sending at " + _rate + " messages per second.");
+ }
+
+ if (numMessages > 0)
+ {
+ System.out.println("Sending up to " + numMessages + " messages.");
+ }
+
+ // Establish the connection and the message producer.
+ establishConnection(true, false);
+ getConnection().start();
+
+ Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
+
+ // Send pings until a terminating condition is received.
+ boolean endCondition = false;
+ int messagesSent = 0;
+ int messagesCommitted = 0;
+ int messagesNotCommitted = 0;
+ long start = System.nanoTime();
+
+ // Clear console in.
+ clearConsole();
+
+ while (!endCondition)
+ {
+ boolean committed = false;
+
+ try
+ {
+ committed = sendMessage(messagesSent, message) && _transacted;
+
+ messagesSent++;
+ messagesNotCommitted++;
+
+ // Keep count of the number of messsages currently committed and pending commit.
+ if (committed)
+ {
+ log.debug("Adding " + messagesNotCommitted + " messages to the committed count.");
+ messagesCommitted += messagesNotCommitted;
+ messagesNotCommitted = 0;
+
+ System.out.println("Commited: " + messagesCommitted);
+ }
+ }
+ catch (JMSException e)
+ {
+ log.debug("Got JMSException whilst sending.");
+ _publish = false;
+ }
+
+ // Determine if the end condition has been met, based on the number of messages, time passed, errors on
+ // the connection or user input.
+ long now = System.nanoTime();
+
+ if ((duration != 0) && ((now - start) > duration))
+ {
+ System.out.println("Send halted because duration expired.");
+ endCondition = true;
+ }
+ else if ((numMessages != 0) && (messagesSent >= numMessages))
+ {
+ System.out.println("Send halted because # messages completed.");
+ endCondition = true;
+ }
+ else if (System.in.available() > 0)
+ {
+ System.out.println("Send halted by user input.");
+ endCondition = true;
+
+ clearConsole();
+ }
+ else if (!_publish)
+ {
+ System.out.println("Send halted by error on the connection.");
+ endCondition = true;
+ }
+ }
+
+ log.debug("messagesSent = " + messagesSent);
+ log.debug("messagesCommitted = " + messagesCommitted);
+ log.debug("messagesNotCommitted = " + messagesNotCommitted);
+
+ System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted
+ + ", Messages not Committed = " + messagesNotCommitted);
+
+ // Clean up the connection.
+ try
+ {
+ close();
+ }
+ catch (JMSException e)
+ {
+ // Ignore as did best could manage to clean up.
+ }
+
+ return messagesSent;
+ }
+
+ private void receive(int messagesSent) throws Exception
+ {
+ // Re-establish the connection and the message consumer.
+ _queueJVMSequenceID = new AtomicInteger();
+ _queueSharedID = new AtomicInteger();
+
+ establishConnection(false, true);
+ _consumer.setMessageListener(null);
+ _connection.start();
+
+ // Try to receive all of the pings that were successfully sent.
+ int messagesReceived = 0;
+ boolean endCondition = false;
+
+ while (!endCondition)
+ {
+ // Message received = _consumer.receiveNoWait();
+ Message received = _consumer.receive(TIME_OUT);
+ log.debug("received = " + received);
+
+ if (received != null)
+ {
+ messagesReceived++;
+ }
+
+ // Determine if the end condition has been met, based on the number of messages and time passed since last
+ // receiving a message.
+ if (received == null)
+ {
+ System.out.println("Timed out.");
+ endCondition = true;
+ }
+ else if (messagesReceived >= messagesSent)
+ {
+ System.out.println("Got all messages.");
+ endCondition = true;
+ }
+ }
+
+ log.debug("messagesReceived = " + messagesReceived);
+
+ System.out.println("Messages received: " + messagesReceived);
+
+ // Clean up the connection.
+ close();
+ }
+
+ /**
+ * Clears any pending input from the console.
+ */
+ private void clearConsole()
+ {
+ try
+ {
+ BufferedReader bis = new BufferedReader(new InputStreamReader(System.in));
+
+ // System.in.skip(System.in.available());
+ while (bis.ready())
+ {
+ bis.readLine();
+ }
+ }
+ catch (IOException e)
+ { }
+ }
+
+ /**
+ * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the
+ * effect of making this pinger listen to its own pings.
+ *
+ * @return The ping destinations.
+ */
+ public List<Destination> getReplyDestinations()
+ {
+ return _pingDestinations;
+ }
+
+ /**
+ * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with
+ * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the
+ * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving
+ * message should stop, not that the application should termiante.
+ *
+ * @return A shutdown hook for the ping loop.
+ */
+ public Thread getShutdownHook()
+ {
+ return new Thread(new Runnable()
+ {
+ public void run()
+ {
+ stop();
+ terminate = true;
+ }
+ });
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index c6a69807a3..44f7083bb5 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -34,20 +34,22 @@ import javax.jms.*;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.topic.Config;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
import uk.co.thebadgerset.junit.extensions.Throttle;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
/**
* PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
@@ -65,31 +67,37 @@ import uk.co.thebadgerset.junit.extensions.Throttle;
* transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover
* testing. A complete list of accepted parameters, default values and comments on their usage is provided here:
*
- * <p/><table><caption>CRC Card</caption>
- * <tr><th> Parameter <th> Default <th> Comments
- * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
- * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
- * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used.
- * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter <th> Default <th> Comments
+ * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
+ * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
+ * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used.
+ * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
* <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to.
- * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over.
- * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
- * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
- * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
- * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch.
- * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch.
- * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send.
- * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send.
- * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once.
- * <tr><td> username <td> guest <td> The username to access the broker with.
- * <tr><td> password <td> guest <td> The password to access the broker with.
- * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
- * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
- * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
- * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
- * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
- * <tr><td> ackMode <td> NO_ACK <td> The message acknowledgement mode.
- * <tr><td> pauseBatch <td> 0 <td> In milliseconds. A pause to insert between transaction batches.
+ * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over.
+ * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
+ * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
+ * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
+ * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch.
+ * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch.
+ * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send.
+ * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send.
+ * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once.
+ * <tr><td> username <td> guest <td> The username to access the broker with.
+ * <tr><td> password <td> guest <td> The password to access the broker with.
+ * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
+ * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
+ * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
+ * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
+ * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
+ * 0 - SESSION_TRANSACTED
+ * 1 - AUTO_ACKNOWLEDGE
+ * 2 - CLIENT_ACKNOWLEDGE
+ * 3 - DUPS_OK_ACKNOWLEDGE
+ * 257 - NO_ACKNOWLEDGE
+ * 258 - PRE_ACKNOWLEDGE
+ * <tr><td> pauseBatch <td> 0 <td> In milliseconds. A pause to insert between transaction batches.
* </table>
*
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
@@ -121,7 +129,7 @@ import uk.co.thebadgerset.junit.extensions.Throttle;
*/
public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
{
- private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
+ private static final Logger log = Logger.getLogger(PingPongProducer.class);
/** Holds the name of the property to get the test message size from. */
public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
@@ -181,31 +189,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit";
/** Holds the default failover after commit test flag. */
- public static final String FAIL_AFTER_COMMIT_DEFAULT = "false";
+ public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false;
/** Holds the name of the proeprty to get the fail before commit flag from. */
public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit";
/** Holds the default failover before commit test flag. */
- public static final String FAIL_BEFORE_COMMIT_DEFAULT = "false";
+ public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false;
/** Holds the name of the proeprty to get the fail after send flag from. */
public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend";
/** Holds the default failover after send test flag. */
- public static final String FAIL_AFTER_SEND_DEFAULT = "false";
+ public static final boolean FAIL_AFTER_SEND_DEFAULT = false;
/** Holds the name of the property to get the fail before send flag from. */
public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend";
/** Holds the default failover before send test flag. */
- public static final String FAIL_BEFORE_SEND_DEFAULT = "false";
+ public static final boolean FAIL_BEFORE_SEND_DEFAULT = false;
/** Holds the name of the property to get the fail once flag from. */
public static final String FAIL_ONCE_PROPNAME = "failOnce";
/** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */
- public static final String FAIL_ONCE_DEFAULT = "true";
+ public static final boolean FAIL_ONCE_DEFAULT = true;
/** Holds the name of the property to get the broker access username from. */
public static final String USERNAME_PROPNAME = "username";
@@ -223,7 +231,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public static final String SELECTOR_PROPNAME = "selector";
/** Holds the default message selector. */
- public static final String SELECTOR_DEFAULT = null;
+ public static final String SELECTOR_DEFAULT = "";
/** Holds the name of the proeprty to get the destination count from. */
public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
@@ -253,7 +261,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public static final String ACK_MODE_PROPNAME = "ackMode";
/** Defines the default message acknowledgement mode. */
- public static final int ACK_MODE_DEFAULT = Session.NO_ACKNOWLEDGE;
+ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
/** Holds the name of the property to get the pause between batches property from. */
public static final String PAUSE_AFTER_BATCH_PROPNAME = "pauseBatch";
@@ -273,96 +281,140 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Holds the name of the property to store nanosecond timestamps in ping messages with. */
public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
- /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
- private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
-
- /**
- * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple
- * ping producers on the same JVM.
- */
- private static Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ /** Holds the default configuration properties. */
+ public static ParsedProperties defaults = new ParsedProperties();
- /** A convenient formatter to use when time stamping output. */
- protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
+ static
+ {
+ defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+ defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
+ defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
+ defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
+ defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
+ defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+ defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
+ defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
+ defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
+ defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
+ defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);
+ defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
+ defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
+ defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
+ defaults.setPropertyIfNull(PAUSE_AFTER_BATCH_PROPNAME, PAUSE_AFTER_BATCH_DEFAULT);
+ defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
+ }
- /**
- * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
- * creating multiple ping producers in the same JVM.
- */
- protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
-
- /** Holds the destination where the response messages will arrive. */
- private Destination _replyDestination;
+ protected String _brokerDetails;
+ protected String _username;
+ protected String _password;
+ protected String _virtualpath;
+ protected String _destinationName;
+ protected String _selector;
+ protected boolean _transacted;
/** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
/** Holds the acknowledgement mode used for sending and receiving messages. */
- private int _ackMode = Session.NO_ACKNOWLEDGE;
+ private int _ackMode;
/** Determines what size of messages this producer sends. */
protected int _messageSize;
/** Used to indicate that the ping loop should print out whenever it pings. */
- protected boolean _verbose = VERBOSE_DEFAULT;
+ protected boolean _verbose;
- /** Holds the session on which ping replies are received. */
- protected Session _consumerSession;
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+ protected boolean _isPubSub;
- /** Used to restrict the sending rate to a specified limit. */
- private Throttle _rateLimiter = null;
+ /** Flag used to indicate if the destinations should be unique client. */
+ protected boolean _isUnique;
- /** Holds a message listener that this message listener chains all its messages to. */
- private ChainedMessageListener _chainedMessageListener = null;
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
+ protected boolean _failBeforeCommit;
- /** Flag used to indicate if this is a point to point or pub/sub ping client. */
- protected boolean _isPubSub = PUBSUB_DEFAULT;
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
+ protected boolean _failAfterCommit;
- /** Flag used to indicate if the destinations should be unique client. */
- protected static boolean _isUnique = UNIQUE_DESTS_DEFAULT;
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
+ protected boolean _failBeforeSend;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
+ protected boolean _failAfterSend;
+
+ /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
+ protected boolean _failOnce;
+
+ /** Holds the number of sends that should be performed in every transaction when using transactions. */
+ protected int _txBatchSize;
+
+ protected int _noOfDestinations;
+ protected int _rate;
+
+ /** Holds the wait time to insert between every batch of messages committed. */
+ private long _pauseBatch;
+
+ /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
+ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
/**
- * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
- * on the same JVM using this id generator will allow them to ping on the same queues.
+ * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple
+ * ping producers on the same JVM.
*/
- protected AtomicInteger _queueSharedId = new AtomicInteger();
+ private static Map<String, PerCorrelationId> perCorrelationIds =
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
- /** Used to tell the ping loop when to terminate, it only runs while this is true. */
- protected boolean _publish = true;
+ /** A convenient formatter to use when time stamping output. */
+ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
/** Holds the connection to the broker. */
- private Connection _connection;
+ protected Connection _connection;
+
+ /** Holds the session on which ping replies are received. */
+ protected Session _consumerSession;
/** Holds the producer session, needed to create ping messages. */
- private Session _producerSession;
+ protected Session _producerSession;
- /** Holds the set of destinations that this ping producer pings. */
- protected List<Destination> _pingDestinations = new ArrayList<Destination>();
+ /** Holds the destination where the response messages will arrive. */
+ protected Destination _replyDestination;
- /** Holds the message producer to send the pings through. */
- protected MessageProducer _producer;
+ /** Holds the set of destinations that this ping producer pings. */
+ protected List<Destination> _pingDestinations;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
- protected boolean _failBeforeCommit = false;
+ /** Used to restrict the sending rate to a specified limit. */
+ protected Throttle _rateLimiter;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
- protected boolean _failAfterCommit = false;
+ /** Holds a message listener that this message listener chains all its messages to. */
+ protected ChainedMessageListener _chainedMessageListener = null;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
- protected boolean _failBeforeSend = false;
+ /**
+ * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
+ * creating multiple ping producers in the same JVM.
+ */
+ protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
- protected boolean _failAfterSend = false;
+ /**
+ * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
+ * on the same JVM using this id generator will allow them to ping on the same queues.
+ */
+ protected AtomicInteger _queueSharedID = new AtomicInteger();
- /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
- protected boolean _failOnce = true;
+ /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+ protected boolean _publish = true;
- /** Holds the number of sends that should be performed in every transaction when using transactions. */
- protected int _txBatchSize = TX_BATCH_SIZE_DEFAULT;
+ /** Holds the message producer to send the pings through. */
+ protected MessageProducer _producer;
- /** Holds the wait time to insert between every batch of messages committed. */
- private static long _pauseBatch = PAUSE_AFTER_BATCH_DEFAULT;
+ /** Holds the message consumer to receive the ping replies through. */
+ protected MessageConsumer _consumer;
/**
* Holds the number of consumers that will be attached to each topic. Each pings will result in a reply from each of the
@@ -370,202 +422,195 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
static int _consumersPerTopic = 1;
+ /** The prompt to display when asking the user to kill the broker for failover testing. */
+ private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+
/**
- * Creates a ping producer with the specified parameters, of which there are many. See their individual comments for
- * details. This constructor creates a connection to the broker and creates producer and consumer sessions on it, to send
- * and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
+ * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
+ * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
+ * it, to send and recieve its pings and replies on.
*
- * @param brokerDetails The URL of the broker to send pings to.
- * @param username The username to log onto the broker with.
- * @param password The password to log onto the broker with.
- * @param virtualpath The virtual host name to use on the broker.
- * @param destinationName The name (or root where multiple destinations are used) of the desitination to send pings to.
- * @param selector The selector to filter replies with.
- * @param transacted Indicates whether or not pings are sent and received in transactions.
- * @param persistent Indicates whether pings are sent using peristent delivery.
- * @param messageSize Specifies the size of ping messages to send.
- * @param verbose Indicates that information should be printed to the console on every ping.
- * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test
- * failover.
- * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test
- * failover.
- * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover.
- * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test
- * failover.
- * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not
- * all.
- * @param txBatchSize Specifies the number of pings to send in each transaction.
- * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
- * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
- * possible, with no rate restriction.
- * @param pubsub True to ping topics, false to ping queues.
- * @param unique True to use unique destinations for each ping pong producer, false to share.
+ * @param overrides Properties containing any desired overrides to the defaults.
*
* @throws Exception Any exceptions are allowed to fall through.
*/
- public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
- String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
- boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
- boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
- boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
+ public PingPongProducer(Properties overrides) throws Exception
{
- _logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
- + ", String password = " + password + ", String virtualpath = " + virtualpath
- + ", String destinationName = " + destinationName + ", String selector = " + selector
- + ", boolean transacted = " + transacted + ", boolean persistent = " + persistent
- + ", int messageSize = " + messageSize + ", boolean verbose = " + verbose + ", boolean afterCommit = "
- + afterCommit + ", boolean beforeCommit = " + beforeCommit + ", boolean afterSend = " + afterSend
- + ", boolean beforeSend = " + beforeSend + ", boolean failOnce = " + failOnce + ", int txBatchSize = "
- + txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
- + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + ", ackMode = " + ackMode
- + "): called");
-
- // Keep all the relevant options.
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
- _failAfterCommit = afterCommit;
- _failBeforeCommit = beforeCommit;
- _failAfterSend = afterSend;
- _failBeforeSend = beforeSend;
- _failOnce = failOnce;
- _txBatchSize = txBatchSize;
- _isPubSub = pubsub;
- _isUnique = unique;
- _pauseBatch = pause;
-
- if (ackMode != 0)
- {
- _ackMode = ackMode;
- }
+ log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
+
+ // Create a set of parsed properties from the defaults overriden by the passed in values.
+ ParsedProperties properties = new ParsedProperties(defaults);
+ properties.putAll(overrides);
+
+ // Extract the configuration properties to set the pinger up with.
+ _brokerDetails = properties.getProperty(BROKER_PROPNAME);
+ _username = properties.getProperty(USERNAME_PROPNAME);
+ _password = properties.getProperty(PASSWORD_PROPNAME);
+ _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
+ _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
+ _selector = properties.getProperty(SELECTOR_PROPNAME);
+ _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
+ _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
+ _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
+ _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME);
+ _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME);
+ _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME);
+ _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME);
+ _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
+ _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
+ _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
+ _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
+ _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
+ _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
+ _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
// Check that one or more destinations were specified.
- if (noOfDestinations < 1)
+ if (_noOfDestinations < 1)
{
throw new IllegalArgumentException("There must be at least one destination.");
}
- // Create a connection to the broker.
+ // Set up a throttle to control the send rate, if a rate > 0 is specified.
+ if (_rate > 0)
+ {
+ _rateLimiter = new BatchedThrottle();
+ _rateLimiter.setRate(_rate);
+ }
+
+ // Create the connection and message producers/consumers.
+ // establishConnection(true, true);
+ }
+
+ /**
+ * Establishes a connection to the broker and creates message consumers and producers based on the parameters
+ * that this ping client was created with.
+ *
+ * @param producer Flag to indicate whether or not the producer should be set up.
+ * @param consumer Flag to indicate whether or not the consumers should be set up.
+ *
+ * @throws Exception Any exceptions are allowed to fall through.
+ */
+ public void establishConnection(boolean producer, boolean consumer) throws Exception
+ {
+ log.debug("public void establishConnection(): called");
+
+ // Generate a unique identifying name for this client, based on it ip address and the current time.
InetAddress address = InetAddress.getLocalHost();
String clientID = address.getHostName() + System.currentTimeMillis();
- _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
+ // Create a connection to the broker.
+ createConnection(clientID);
// Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = (Session) getConnection().createSession(transacted, _ackMode);
- _consumerSession = (Session) getConnection().createSession(transacted, _ackMode);
+ _producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
+ _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode);
- // Set up a throttle to control the send rate, if a rate > 0 is specified.
- if (rate > 0)
+ // Create the destinations to send pings to and receive replies from.
+ _replyDestination = _consumerSession.createTemporaryQueue();
+ createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique);
+
+ // Create the message producer only if instructed to.
+ if (producer)
{
- _rateLimiter = new BatchedThrottle();
- _rateLimiter.setRate(rate);
+ createProducer();
}
- // Create the temporary queue for replies.
- _replyDestination = _consumerSession.createTemporaryQueue();
+ // Create the message consumer only if instructed to.
+ if (consumer)
+ {
+ createReplyConsumers(getReplyDestinations(), _selector);
+ }
+ }
- // Create the producer and the consumers for all reply destinations.
- createProducer();
- createPingDestinations(noOfDestinations, selector, destinationName, unique);
- createReplyConsumers(getReplyDestinations(), selector);
+ /**
+ * Establishes a connection to the broker, based on the configuration parameters that this ping client was
+ * created with.
+ *
+ * @param clientID The clients identifier.
+ *
+ * @throws AMQException Any underlying exceptions are allowed to fall through.
+ * @throws URLSyntaxException Any underlying exceptions are allowed to fall through.
+ */
+ protected void createConnection(String clientID) throws AMQException, URLSyntaxException
+ {
+ _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
}
/**
- * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs to be
- * started to bounce the pings back again.
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
+ * to be started to bounce the pings back again.
*
* @param args The command line arguments.
- *
- * @throws Exception When something went wrong with the test
*/
- public static void main(String[] args) throws Exception
+ public static void main(String[] args)
{
- // Extract the command line.
- Config config = new Config();
- config.setOptions(args);
- if (args.length == 0)
+ try
{
- _logger.info("Running test with default values...");
- // usage();
- // System.exit(0);
- }
+ Properties options = processCommandLine(args);
- String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = VIRTUAL_HOST_DEFAULT;
- String selector = (config.getSelector() == null) ? SELECTOR_DEFAULT : config.getSelector();
- boolean verbose = true;
- boolean transacted = config.isTransacted();
- boolean persistent = config.usePersistentMessages();
- int messageSize = (config.getPayload() != 0) ? config.getPayload() : MESSAGE_SIZE_DEAFULT;
- // int messageCount = config.getMessages();
- int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DESTINATION_COUNT_DEFAULT;
- int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : TX_BATCH_SIZE_DEFAULT;
- int rate = (config.getRate() != 0) ? config.getRate() : RATE_DEFAULT;
- boolean pubsub = config.isPubSub();
-
- String destName = config.getDestination();
- if (destName == null)
- {
- destName = PING_QUEUE_NAME_DEFAULT;
- }
+ // Create a ping producer overriding its defaults with all options passed on the command line.
+ PingPongProducer pingProducer = new PingPongProducer(options);
+ pingProducer.establishConnection(true, true);
- boolean afterCommit = false;
- boolean beforeCommit = false;
- boolean afterSend = false;
- boolean beforeSend = false;
- boolean failOnce = false;
+ // Start the ping producers dispatch thread running.
+ pingProducer.getConnection().start();
- for (String arg : args)
- {
- if (arg.startsWith("failover:"))
- {
- // failover:<before|after>:<send:commit> | failover:once
- String[] parts = arg.split(":");
- if (parts.length == 3)
- {
- if (parts[2].equals("commit"))
- {
- afterCommit = parts[1].equals("after");
- beforeCommit = parts[1].equals("before");
- }
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
- if (parts[2].equals("send"))
- {
- afterSend = parts[1].equals("after");
- beforeSend = parts[1].equals("before");
- }
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ pingProducer.getConnection().setExceptionListener(pingProducer);
- if (parts[1].equals("once"))
- {
- failOnce = true;
- }
- }
- else
- {
- System.out.println("Unrecognized failover request:" + arg);
- }
- }
+ // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
+ Thread pingThread = new Thread(pingProducer);
+ pingThread.run();
+ pingThread.join();
}
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(1);
+ }
+ }
- // Create a ping producer to handle the request/wait/reply cycle.
- PingPongProducer pingProducer =
- new PingPongProducer(brokerDetails, USERNAME_DEFAULT, PASSWORD_DEFAULT, virtualpath, destName, selector,
- transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
+ /**
+ * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
+ * a map of properties containing them.
+ *
+ * @param args The command line.
+ *
+ * @return A set of properties containing all name=value pairs from the command line.
+ *
+ * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the
+ * CommandLineParser class.
+ */
+ protected static Properties processCommandLine(String[] args)
+ {
+ // Use the command line parser to evaluate the command line.
+ CommandLineParser commandLine = new CommandLineParser(new String[][] {});
- pingProducer.getConnection().start();
+ // Capture the command line arguments or display errors and correct usage and then exit.
+ Properties options = null;
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+ try
+ {
+ options = commandLine.parseCommandLine(args);
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- pingProducer.getConnection().setExceptionListener(pingProducer);
+ // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
+ // overridden values from there.
+ commandLine.addCommandLineToSysProperties();
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(commandLine.getErrors());
+ System.out.println(commandLine.getUsage());
+ System.exit(1);
+ }
- // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
- Thread pingThread = new Thread(pingProducer);
- pingThread.run();
- pingThread.join();
+ return options;
}
/**
@@ -582,9 +627,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- {
- // ignore
- }
+ { }
}
}
@@ -596,27 +639,30 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public List<Destination> getReplyDestinations()
{
- _logger.debug("public List<Destination> getReplyDestinations(): called");
+ log.debug("public List<Destination> getReplyDestinations(): called");
List<Destination> replyDestinations = new ArrayList<Destination>();
replyDestinations.add(_replyDestination);
+ log.debug("replyDestinations = " + replyDestinations);
+
return replyDestinations;
}
/**
- * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery flag
- * is set accoring the ping producer creation options.
+ * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
+ * flag is set accoring the ping producer creation options.
*
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createProducer() throws JMSException
{
- _logger.debug("public void createProducer(): called");
+ log.debug("public void createProducer(): called");
_producer = (MessageProducer) _producerSession.createProducer(null);
- // _producer.setDisableMessageTimestamp(true);
_producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
}
/**
@@ -632,13 +678,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
- throws JMSException
+ throws JMSException
{
- _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
- + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
- + unique + "): called");
+ log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+ + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called");
+
+ _pingDestinations = new ArrayList<Destination>();
// Create the desired number of ping destinations and consumers for them.
+ log.debug("Creating " + noOfDestinations + " destinations to ping.");
+
for (int i = 0; i < noOfDestinations; i++)
{
AMQDestination destination;
@@ -648,26 +697,26 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
- _logger.debug("Creating unique destinations.");
+ log.debug("Creating unique destinations.");
id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
}
else
{
- _logger.debug("Creating shared destinations.");
- id = "_" + _queueSharedId.incrementAndGet();
+ log.debug("Creating shared destinations.");
+ id = "_" + _queueSharedID.incrementAndGet();
}
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- _logger.debug("Created topic " + destination);
+ log.debug("Created topic " + destination);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
- _logger.debug("Created queue " + destination);
+ log.debug("Created queue " + destination);
}
// Keep the destination.
@@ -676,6 +725,29 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
+ * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
+ *
+ * @param destinations The destinations to listen to.
+ * @param selector A selector to filter the messages with.
+ *
+ * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
+ {
+ log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ + ", String selector = " + selector + "): called");
+
+ for (Destination destination : destinations)
+ {
+ // Create a consumer for the destination and set this pinger to listen to its messages.
+ _consumer =
+ _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+ selector);
+ _consumer.setMessageListener(this);
+ }
+ }
+
+ /**
* Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating
* reply may be waiting on. This is only done if the reply has a correlation id that is expected in the replies map.
*
@@ -683,13 +755,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public void onMessage(Message message)
{
- _logger.debug("public void onMessage(Message message): called");
+ log.debug("public void onMessage(Message message): called");
try
{
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
- _logger.debug("correlationID = " + correlationID);
+ log.debug("correlationID = " + correlationID);
// Countdown on the traffic light if there is one for the matching correlation id.
PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
@@ -701,7 +773,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Restart the timeout timer on every message.
perCorrelationId.timeOutStart = System.nanoTime();
- _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+ log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
// method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
@@ -716,20 +788,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
trueCount = trafficLight.getCount();
remainingCount = trueCount - 1;
- _logger.debug("remainingCount = " + remainingCount);
- _logger.debug("trueCount = " + trueCount);
+ log.debug("remainingCount = " + remainingCount);
+ log.debug("trueCount = " + trueCount);
// Commit on transaction batch size boundaries. At this point in time the waiting producer remains
// blocked, even on the last message.
if ((remainingCount % _txBatchSize) == 0)
{
commitTx(_consumerSession);
- if (!_consumerSession.getTransacted() &&
- _consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- // Acknowledge the messages when the session is not transacted but client_ack
- ((AMQSession) _consumerSession).acknowledge();
- }
}
// Forward the message and remaining count to any interested chained message listener.
@@ -748,7 +814,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
else
{
- _logger.warn("Got unexpected message with correlationId: " + correlationID);
+ log.warn("Got unexpected message with correlationId: " + correlationID);
}
// Print out ping times for every message in verbose mode only.
@@ -759,48 +825,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
if (timestamp != null)
{
long diff = System.nanoTime() - timestamp;
- _logger.trace("Time for round trip (nanos): " + diff);
+ log.trace("Time for round trip (nanos): " + diff);
}
}
}
catch (JMSException e)
{
- _logger.warn("There was a JMSException: " + e.getMessage(), e);
+ log.warn("There was a JMSException: " + e.getMessage(), e);
}
- _logger.debug("public void onMessage(Message message): ending");
- }
-
- /**
- * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out before a
- * reply arrives, then a null reply is returned from this method. This method generates a new unqiue correlation id for
- * the messages.
- *
- * @param message The message to send.
- * @param numPings The number of ping messages to send.
- * @param timeout The timeout in milliseconds.
- *
- * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
- * all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- * @throws InterruptedException When interrupted by a timeout.
- */
- public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
- {
- _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + "): called");
-
- // Create a unique correlation id to put on the messages before sending them.
- String messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
-
- return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
- }
-
- public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
- {
- return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);
+ log.debug("public void onMessage(Message message): ending");
}
/**
@@ -808,10 +842,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* reply arrives, then a null reply is returned from this method. This method allows the caller to specify the
* correlation id.
*
- * @param message The message to send.
+ * @param message The message to send. If this is null, one is generated.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
- * @param messageCorrelationId The message correlation id.
+ * @param messageCorrelationId The message correlation id. If this is null, one is generated.
*
* @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
* all prematurely.
@@ -820,10 +854,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @throws InterruptedException When interrupted by a timeout
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
+ throws JMSException, InterruptedException
{
- _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+
+ // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
+ if (messageCorrelationId == null)
+ {
+ messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
+ }
try
{
@@ -858,31 +898,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
allMessagesReceived = numReplies == getExpectedNumPings(numPings);
- _logger.debug("numReplies = " + numReplies);
- _logger.debug("allMessagesReceived = " + allMessagesReceived);
+ log.debug("numReplies = " + numReplies);
+ log.debug("allMessagesReceived = " + allMessagesReceived);
// Recheck the timeout condition.
long now = System.nanoTime();
long lastMessageReceievedAt = perCorrelationId.timeOutStart;
timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
- _logger.debug("now = " + now);
- _logger.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
+ log.debug("now = " + now);
+ log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
}
while (!timedOut && !allMessagesReceived);
if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
{
- _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
else if (_verbose)
{
- _logger.info("Got all replies on id, " + messageCorrelationId);
+ log.info("Got all replies on id, " + messageCorrelationId);
}
commitTx(_consumerSession);
- _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+ log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
return numReplies;
}
@@ -905,8 +945,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
- _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
- + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");
if (message == null)
{
@@ -923,40 +963,19 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Send all of the ping messages.
for (int i = 0; i < numPings; i++)
{
- // Reset the committed flag to indicate that there are uncommitted messages.
+ // Reset the committed flag to indicate that there may be uncommitted messages.
committed = false;
// Re-timestamp the message.
message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
- // Round robin the destinations as the messages are sent.
- // return _destinationCount;
- sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message);
-
- // Apply message rate throttling if a rate limit has been set up.
- if (_rateLimiter != null)
- {
- _rateLimiter.throttle();
- }
-
- // Call commit every time the commit batch size is reached.
- if ((i % _txBatchSize) == 0)
- {
- commitTx(_producerSession);
- committed = true;
-
- /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
- Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
- the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
- */
- pause(_pauseBatch);
- }
+ // Send the message, passing in the message count.
+ committed = sendMessage(i, message);
// Spew out per message timings on every message sonly in verbose mode.
if (_verbose)
{
- _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, "
- + messageCorrelationId);
+ log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
}
}
@@ -968,7 +987,70 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
+ * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of
+ * messages sent so far must be specified and is used to round robin the ping destinations (where there are more
+ * than one), and to determine if the transaction batch size has been reached and the sent messages should be
+ * committed.
+ *
+ * @param i The count of messages sent so far in a loop of multiple calls to this send method.
+ * @param message The message to send.
+ *
+ * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException All underlyiung JMSExceptions are allowed to fall through.
+ */
+ protected boolean sendMessage(int i, Message message) throws JMSException
+ {
+ // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
+ // log.debug("_txBatchSize = " + _txBatchSize);
+
+ // Round robin the destinations as the messages are sent.
+ Destination destination = _pingDestinations.get(i % _pingDestinations.size());
+
+ // Prompt the user to kill the broker when doing failover testing.
+ if (_failBeforeSend)
+ {
+ if (_failOnce)
+ {
+ _failBeforeSend = false;
+ }
+
+ log.trace("Failing Before Send");
+ waitForUser(KILL_BROKER_PROMPT);
+ }
+
+ // Send the message either to its round robin destination, or its default destination.
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
+ {
+ _producer.send(destination, message);
+ }
+
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
+ {
+ _rateLimiter.throttle();
+ }
+
+ // Call commit every time the commit batch size is reached.
+ boolean committed = false;
+
+ // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
+ if (((i + 1) % _txBatchSize) == 0)
+ {
+ committed = commitTx(_producerSession);
+ }
+
+ return committed;
+ }
+
+ /**
+ * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction
+ * batch size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared,
+ * which will terminate the pinger.
*/
public void pingLoop()
{
@@ -979,25 +1061,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
// Send the message and wait for a reply.
- pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT);
+ pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
}
catch (JMSException e)
{
_publish = false;
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ log.debug("There was a JMSException: " + e.getMessage(), e);
}
catch (InterruptedException e)
{
_publish = false;
- _logger.debug("There was an interruption: " + e.getMessage(), e);
+ log.debug("There was an interruption: " + e.getMessage(), e);
}
}
- public Destination getReplyDestination()
- {
- return getReplyDestinations().get(0);
- }
-
/**
* Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set here.
*
@@ -1038,8 +1115,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag has
- * been cleared.
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+ * flag has been cleared.
*/
public void stop()
{
@@ -1066,8 +1143,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public void onException(JMSException e)
{
+ log.debug("public void onException(JMSException e = " + e + "): called", e);
_publish = false;
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
}
/**
@@ -1079,12 +1156,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public Thread getShutdownHook()
{
return new Thread(new Runnable()
- {
- public void run()
{
- stop();
- }
- });
+ public void run()
+ {
+ stop();
+ }
+ });
}
/**
@@ -1098,40 +1175,30 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
- *
- * @param destinations The destinations to listen to.
- * @param selector A selector to filter the messages with.
- *
- * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
- */
- public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
- {
- _logger.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
- + ", String selector = " + selector + "): called");
-
- for (Destination destination : destinations)
- {
- // Create a consumer for the destination and set this pinger to listen to its messages.
- MessageConsumer consumer =
- _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
- selector);
- consumer.setMessageListener(this);
- }
- }
-
- /**
* Closes the pingers connection.
*
* @throws JMSException All JMSException are allowed to fall through.
*/
public void close() throws JMSException
{
- _logger.debug("public void close(): called");
+ log.debug("public void close(): called");
- if (_connection != null)
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ finally
{
- _connection.close();
+ _connection = null;
+ _producerSession = null;
+ _consumerSession = null;
+ _producer = null;
+ _consumer = null;
+ _pingDestinations = null;
+ _replyDestination = null;
}
}
@@ -1150,25 +1217,29 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
*
+ * @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
+ *
* @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
* method, because commits only apply to transactional pingers, but fail after send applied to transactional and
* non-transactional alike.
*/
- protected void commitTx(Session session) throws JMSException
+ protected boolean commitTx(Session session) throws JMSException
{
- _logger.debug("protected void commitTx(Session session): called");
+ log.debug("protected void commitTx(Session session): called");
+
+ boolean committed = false;
- _logger.trace("Batch time reached");
+ log.trace("Batch time reached");
if (_failAfterSend)
{
- _logger.trace("Batch size reached");
+ log.trace("Batch size reached");
if (_failOnce)
{
_failAfterSend = false;
}
- _logger.trace("Failing After Send");
- doFailover();
+ log.trace("Failing After Send");
+ waitForUser(KILL_BROKER_PROMPT);
}
if (session.getTransacted())
@@ -1182,13 +1253,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_failBeforeCommit = false;
}
- _logger.trace("Failing Before Commit");
- doFailover();
+ log.trace("Failing Before Commit");
+ waitForUser(KILL_BROKER_PROMPT);
}
- long l = System.currentTimeMillis();
+ long l = System.nanoTime();
session.commit();
- _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms");
+ committed = true;
+ log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
if (_failAfterCommit)
{
@@ -1197,84 +1269,56 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_failAfterCommit = false;
}
- _logger.trace("Failing After Commit");
- doFailover();
+ log.trace("Failing After Commit");
+ waitForUser(KILL_BROKER_PROMPT);
}
- _logger.trace("Session Commited.");
+ log.trace("Session Commited.");
}
catch (JMSException e)
{
- _logger.trace("JMSException on commit:" + e.getMessage(), e);
+ log.trace("JMSException on commit:" + e.getMessage(), e);
// Warn that the bounce back client is not available.
if (e.getLinkedException() instanceof AMQNoConsumersException)
{
- _logger.debug("No consumers on queue.");
+ log.debug("No consumers on queue.");
}
try
{
session.rollback();
- _logger.trace("Message rolled back.");
+ log.trace("Message rolled back.");
}
catch (JMSException jmse)
{
- _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+ log.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
// Both commit and rollback failed. Throw the rollback exception.
throw jmse;
}
}
}
+
+ return committed;
}
/**
- * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination of
- * the ping producer. If an explicit destination is set, this overrides the default.
+ * Outputs a prompt to the console and waits for the user to press return.
*
- * @param destination The destination to send to.
- * @param message The message to send.
- *
- * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+ * @param prompt The prompt to display on the console.
*/
- protected void sendMessage(Destination destination, Message message) throws JMSException
+ protected void waitForUser(String prompt)
{
- if (_failBeforeSend)
- {
- if (_failOnce)
- {
- _failBeforeSend = false;
- }
-
- _logger.trace("Failing Before Send");
- doFailover();
- }
-
- if (destination == null)
- {
- _producer.send(message);
- }
- else
- {
- _producer.send(destination, message);
- }
- }
+ System.out.println(prompt);
- /**
- * Prompts the user to terminate the broker, in order to test failover functionality. This method will block until the
- * user supplied some input on the terminal.
- */
- protected void doFailover()
- {
- System.out.println("Kill Broker now then press return");
try
{
System.in.read();
}
catch (IOException e)
{
- // ignore
+ // Ignored.
}
System.out.println("Continuing.");
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index a782058fd4..3b8e670f8f 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -86,7 +86,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
// Sets up the test parameters with defaults.
testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
- Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
+ Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT));
}
/**
@@ -159,7 +159,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(numPings, timeout, perThreadSetup._correlationId);
+ int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < perCorrelationId._expectedCount)
@@ -247,8 +247,8 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
String correlationId = message.getJMSCorrelationID();
_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount
- + "): called on batch boundary for message id: " + correlationId + " with thread id: "
- + Thread.currentThread().getId());
+ + "): called on batch boundary for message id: " + correlationId + " with thread id: "
+ + Thread.currentThread().getId());
// Get the details for the correlation id and check that they are not null. They can become null
// if a test times out.
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
index 620ddd13f7..b303e16d2c 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
@@ -175,7 +175,7 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout);
+ int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < numPings)
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index 6fb9d543a0..fd3bc3ff23 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -66,19 +66,22 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
/** Holds a property reader to extract the test parameters from. */
- protected ParsedProperties testParameters = new TestContextProperties(System.getProperties());
+ protected ParsedProperties testParameters =
+ TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
public PingTestPerf(String name)
{
super(name);
+ _logger.debug("testParameters = " + testParameters);
+
// Sets up the test parameters with defaults.
- testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
+ /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, PingPongProducer.MESSAGE_SIZE_DEAFULT);
testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
- PingPongProducer.PING_QUEUE_NAME_DEFAULT);
+ PingPongProducer.PING_QUEUE_NAME_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
- PingPongProducer.PERSISTENT_MODE_DEFAULT);
+ PingPongProducer.PERSISTENT_MODE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, PingPongProducer.TRANSACTED_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
@@ -90,20 +93,20 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, PingPongProducer.TIMEOUT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
- PingPongProducer.DESTINATION_COUNT_DEFAULT);
+ PingPongProducer.DESTINATION_COUNT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
- PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
- PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
- PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
+ PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
- PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, PingPongProducer.UNIQUE_DESTS_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, PingPongProducer.ACK_MODE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
- PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);
+ PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
}
/**
@@ -139,20 +142,18 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(
- PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(
- PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
// start the test
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
+ int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
// Fail the test if the timeout was exceeded.
if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
{
Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
- + numReplies);
+ + numReplies);
}
}
@@ -167,43 +168,13 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
{
PerThreadSetup perThreadSetup = new PerThreadSetup();
- // Extract the test set up paramaeters.
- String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
- String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
- String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
- String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
- String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
- boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
- boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
- String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
- boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
- int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
- int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
- boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
- boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
- boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
- boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
- boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
- int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.TX_BATCH_SIZE_PROPNAME);
- Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
- boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_DESTS_PROPNAME);
- int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
- int pausetime = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
-
- // Extract the test set up paramaeters.
- int destinationscount =
- Integer.parseInt(testParameters.getProperty(PingPongProducer.DESTINATION_COUNT_PROPNAME));
-
// This is synchronized because there is a race condition, which causes one connection to sleep if
// all threads try to create connection concurrently.
synchronized (this)
{
// Establish a client to ping a Destination and listen the reply back from same Destination
- perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
- selector, transacted, persistent, messageSize, verbose,
- failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
- failOnce, batchSize, destinationscount, rate, pubsub, unique,
- ackMode, pausetime);
+ perThreadSetup._pingClient = new PingClient(testParameters);
+ perThreadSetup._pingClient.establishConnection(true, true);
}
// Start the client connection
perThreadSetup._pingClient.getConnection().start();
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
index a75cbf7e19..a09324b568 100644
--- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
@@ -69,52 +69,55 @@ public class PingPongTestPerf extends AsymptoticTestCase
// the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
// of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
// private Properties testParameters = System.getProperties();
- private ParsedProperties testParameters = new TestContextProperties(System.getProperties());
+ private ParsedProperties testParameters =
+ TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
public PingPongTestPerf(String name)
{
super(name);
+ _logger.debug(testParameters);
+
// Sets up the test parameters with defaults.
- testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+ /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT));
+ Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT));
testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
- PingPongProducer.PING_QUEUE_NAME_DEFAULT);
+ PingPongProducer.PING_QUEUE_NAME_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
- Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT));
+ Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
- Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT));
+ Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME,
- Boolean.toString(PingPongProducer.VERBOSE_DEFAULT));
+ Boolean.toString(PingPongProducer.VERBOSE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME,
- Boolean.toString(PingPongProducer.PUBSUB_DEFAULT));
+ Boolean.toString(PingPongProducer.PUBSUB_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+ Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
- Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT));
+ Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
- PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
- PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
- PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
+ PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
- PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME,
- Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT));
+ Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
- Integer.toString(PingPongProducer.ACK_MODE_DEFAULT));
+ Integer.toString(PingPongProducer.ACK_MODE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
- PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);
+ PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
}
/**
@@ -147,14 +150,12 @@ public class PingPongTestPerf extends AsymptoticTestCase
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(
- PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(
- PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
// Send the message and wait for a reply.
int numReplies =
- perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT);
+ perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null);
// Fail the test if the timeout was exceeded.
if (numReplies != numPings)
@@ -182,37 +183,21 @@ public class PingPongTestPerf extends AsymptoticTestCase
boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
- int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
- int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
- boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
- boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
- boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
- boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
- int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.TX_BATCH_SIZE_PROPNAME);
- Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
- boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_DESTS_PROPNAME);
- int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
- long pause = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
synchronized (this)
{
// Establish a bounce back client on the ping queue to bounce back the pings.
- perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualPath,
- destinationName, persistent, transacted, selector,
- verbose, pubsub);
+ perThreadSetup._testPingBouncer =
+ new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent,
+ transacted, selector, verbose, pubsub);
// Start the connections for client and producer running.
perThreadSetup._testPingBouncer.getConnection().start();
- // Establish a ping-pong client on the ping queue to send the pings with.
-
- perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualPath,
- destinationName, selector, transacted, persistent,
- messageSize, verbose, failAfterCommit,
- failBeforeCommit, failAfterSend, failBeforeSend,
- failOnce, batchSize, 0, rate, pubsub, unique,
- ackMode, pause);
+ // Establish a ping-pong client on the ping queue to send the pings and receive replies with.
+ perThreadSetup._testPingProducer = new PingPongProducer(testParameters);
+ perThreadSetup._testPingProducer.establishConnection(true, true);
perThreadSetup._testPingProducer.getConnection().start();
}