diff options
author | Robert Greig <rgreig@apache.org> | 2007-04-05 11:47:50 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-04-05 11:47:50 +0000 |
commit | 4547988868ea17dd34064204de84e66206b16d5b (patch) | |
tree | 2ffa201455021216f7f76dd00ef9f9110df53da4 | |
parent | 67674e50665e7def7b90569e3b3d33c3f047db5b (diff) | |
download | qpid-python-4547988868ea17dd34064204de84e66206b16d5b.tar.gz |
Merged revisions 522994-523245 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r522994 | rgreig | 2007-03-27 17:48:23 +0100 (Tue, 27 Mar 2007) | 1 line
Test added for durability of messages under broker failure.
........
r523245 | rgreig | 2007-03-28 10:30:49 +0100 (Wed, 28 Mar 2007) | 1 line
Reversed accidental replacing of the word 'initialize' in comments to 'establishConnection' through a method refactoring.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525800 13f79535-47bb-0310-9956-ffa450edef68
9 files changed, 1282 insertions, 855 deletions
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java index cd8e0a80a1..bad49060ca 100644 --- a/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java +++ b/java/integrationtests/src/main/java/org/apache/qpid/util/ClasspathScanner.java @@ -61,10 +61,10 @@ public class ClasspathScanner * @return All the classes that match this collector.
*/
public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass, String matchingRegexp,
- boolean beanOnly)
+ boolean beanOnly)
{
log.debug("public static <T> Collection<Class<? extends T>> getMatches(Class<T> matchingClass = " + matchingClass
- + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called");
+ + ", String matchingRegexp = " + matchingRegexp + ", boolean beanOnly = " + beanOnly + "): called");
// Build a compiled regular expression from the pattern to match.
Pattern matchPattern = Pattern.compile(matchingRegexp);
@@ -95,11 +95,11 @@ public class ClasspathScanner * iteration.
*/
private static <T> void gatherFiles(File classRoot, String classFileName, Map<String, Class<? extends T>> result,
- Pattern matchPattern, Class<? extends T> matchClass)
+ Pattern matchPattern, Class<? extends T> matchClass)
{
log.debug("private static <T> void gatherFiles(File classRoot = " + classRoot + ", String classFileName = "
- + classFileName + ", Map<String, Class<? extends T>> result, Pattern matchPattern = " + matchPattern
- + ", Class<? extends T> matchClass = " + matchClass + "): called");
+ + classFileName + ", Map<String, Class<? extends T>> result, Pattern matchPattern = " + matchPattern
+ + ", Class<? extends T> matchClass = " + matchClass + "): called");
File thisRoot = new File(classRoot, classFileName);
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java index e9215a4876..a861405d30 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,8 +20,6 @@ */ package org.apache.qpid.management.ui.views; -import static org.apache.qpid.management.ui.Constants.*; - import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -29,12 +27,14 @@ import java.util.HashMap; import java.util.List; import org.apache.qpid.management.ui.ApplicationRegistry; +import static org.apache.qpid.management.ui.Constants.*; import org.apache.qpid.management.ui.ManagedBean; import org.apache.qpid.management.ui.ManagedServer; import org.apache.qpid.management.ui.ServerRegistry; import org.apache.qpid.management.ui.exceptions.InfoRequiredException; import org.apache.qpid.management.ui.jmx.JMXServerRegistry; import org.apache.qpid.management.ui.jmx.MBeanUtility; + import org.eclipse.jface.preference.PreferenceStore; import org.eclipse.jface.viewers.DoubleClickEvent; import org.eclipse.jface.viewers.IDoubleClickListener; @@ -48,6 +48,7 @@ import org.eclipse.jface.viewers.TreeExpansionEvent; import org.eclipse.jface.viewers.TreeViewer; import org.eclipse.jface.viewers.Viewer; import org.eclipse.jface.viewers.ViewerSorter; + import org.eclipse.swt.SWT; import org.eclipse.swt.graphics.Font; import org.eclipse.swt.graphics.Image; @@ -62,6 +63,7 @@ import org.eclipse.swt.widgets.MenuItem; import org.eclipse.swt.widgets.Shell; import org.eclipse.swt.widgets.Tree; import org.eclipse.swt.widgets.TreeItem; + import org.eclipse.ui.part.ViewPart; /** @@ -71,29 +73,29 @@ import org.eclipse.ui.part.ViewPart; */ public class NavigationView extends ViewPart { - public static final String ID = "org.apache.qpid.management.ui.navigationView"; + public static final String ID = "org.apache.qpid.management.ui.navigationView"; public static final String INI_FILENAME = System.getProperty("user.home") + File.separator + "qpidManagementConsole.ini"; - + private static final String INI_SERVERS = "Servers"; private static final String INI_QUEUES = QUEUE + "s"; private static final String INI_CONNECTIONS = CONNECTION + "s"; private static final String INI_EXCHANGES = EXCHANGE + "s"; - + private TreeViewer _treeViewer = null; private TreeObject _rootNode = null; private TreeObject _serversRootNode = null; - + private PreferenceStore _preferences; // Map of connected servers private HashMap<ManagedServer, TreeObject> _managedServerMap = new HashMap<ManagedServer, TreeObject>(); - + private void createTreeViewer(Composite parent) { _treeViewer = new TreeViewer(parent); _treeViewer.setContentProvider(new ContentProviderImpl()); - _treeViewer.setLabelProvider(new LabelProviderImpl()); + _treeViewer.setLabelProvider(new LabelProviderImpl()); _treeViewer.setSorter(new ViewerSorterImpl()); - + // layout the tree viewer below the label field, to cover the area GridData layoutData = new GridData(); layoutData = new GridData(); @@ -103,118 +105,124 @@ public class NavigationView extends ViewPart layoutData.verticalAlignment = GridData.FILL; _treeViewer.getControl().setLayoutData(layoutData); _treeViewer.setUseHashlookup(true); - + createListeners(); } - + /** * Creates listeners for the JFace treeviewer */ private void createListeners() { - _treeViewer.addDoubleClickListener(new IDoubleClickListener() { + _treeViewer.addDoubleClickListener(new IDoubleClickListener() + { public void doubleClick(DoubleClickEvent event) { - IStructuredSelection ss = (IStructuredSelection)event.getSelection(); - if (ss == null || ss.getFirstElement() == null) + IStructuredSelection ss = (IStructuredSelection) event.getSelection(); + if ((ss == null) || (ss.getFirstElement() == null)) { return; } + boolean state = _treeViewer.getExpandedState(ss.getFirstElement()); _treeViewer.setExpandedState(ss.getFirstElement(), !state); } }); - - _treeViewer.addTreeListener(new ITreeViewerListener() { - public void treeExpanded(TreeExpansionEvent event) - { - _treeViewer.setExpandedState(event.getElement(), true); - // Following will cause the selection event to be sent, so commented - //_treeViewer.setSelection(new StructuredSelection(event.getElement())); - _treeViewer.refresh(); - } - public void treeCollapsed(TreeExpansionEvent event) + _treeViewer.addTreeListener(new ITreeViewerListener() { - _treeViewer.setExpandedState(event.getElement(), false); - _treeViewer.refresh(); - } - }); - + public void treeExpanded(TreeExpansionEvent event) + { + _treeViewer.setExpandedState(event.getElement(), true); + // Following will cause the selection event to be sent, so commented + // _treeViewer.setSelection(new StructuredSelection(event.getElement())); + _treeViewer.refresh(); + } + + public void treeCollapsed(TreeExpansionEvent event) + { + _treeViewer.setExpandedState(event.getElement(), false); + _treeViewer.refresh(); + } + }); + // This listener is for popup menu, which pops up if a queue,exchange or connection is selected // with right click. - _treeViewer.getTree().addListener(SWT.MenuDetect, new Listener () { - Display display = getSite().getShell().getDisplay(); - final Shell shell = new Shell (display); - - public void handleEvent(Event event) + _treeViewer.getTree().addListener(SWT.MenuDetect, new Listener() { - Tree widget = (Tree)event.widget; - TreeItem[] items = widget.getSelection(); - if (items == null) return; - - // Get the selected node - final TreeObject selectedNode = (TreeObject)items[0].getData(); - final TreeObject parentNode = selectedNode.getParent(); - - // This popup is only for mbeans and only connection,exchange and queue types - if (parentNode == null || - !MBEAN.equals(selectedNode.getType()) || - !(CONNECTION.equals(parentNode.getName()) || - QUEUE.equals(parentNode.getName()) || - EXCHANGE.equals(parentNode.getName())) - ) + Display display = getSite().getShell().getDisplay(); + final Shell shell = new Shell(display); + + public void handleEvent(Event event) { - return; - } - - Menu menu = new Menu (shell, SWT.POP_UP); - MenuItem item = new MenuItem (menu, SWT.PUSH); - // Add the action item, which will remove the node from the tree if selected - item.setText(ACTION_REMOVE_MBEANNODE); - item.addListener (SWT.Selection, new Listener () { - public void handleEvent (Event e) + Tree widget = (Tree) event.widget; + TreeItem[] items = widget.getSelection(); + if (items == null) { - removeManagedObject(parentNode, (ManagedBean)selectedNode.getManagedObject()); - _treeViewer.refresh(); - // set the selection to the parent node - _treeViewer.setSelection(new StructuredSelection(parentNode)); + return; } - }); - menu.setLocation (event.x, event.y); - menu.setVisible (true); - while (!menu.isDisposed () && menu.isVisible ()) - { - if (!display.readAndDispatch ()) + + // Get the selected node + final TreeObject selectedNode = (TreeObject) items[0].getData(); + final TreeObject parentNode = selectedNode.getParent(); + + // This popup is only for mbeans and only connection,exchange and queue types + if ((parentNode == null) || !MBEAN.equals(selectedNode.getType()) + || !(CONNECTION.equals(parentNode.getName()) || QUEUE.equals(parentNode.getName()) + || EXCHANGE.equals(parentNode.getName()))) { - display.sleep (); + return; } + + Menu menu = new Menu(shell, SWT.POP_UP); + MenuItem item = new MenuItem(menu, SWT.PUSH); + // Add the action item, which will remove the node from the tree if selected + item.setText(ACTION_REMOVE_MBEANNODE); + item.addListener(SWT.Selection, new Listener() + { + public void handleEvent(Event e) + { + removeManagedObject(parentNode, (ManagedBean) selectedNode.getManagedObject()); + _treeViewer.refresh(); + // set the selection to the parent node + _treeViewer.setSelection(new StructuredSelection(parentNode)); + } + }); + menu.setLocation(event.x, event.y); + menu.setVisible(true); + while (!menu.isDisposed() && menu.isVisible()) + { + if (!display.readAndDispatch()) + { + display.sleep(); + } + } + + menu.dispose(); } - menu.dispose (); - } - }); - } - + }); + } + /** * Creates Qpid Server connection using JMX RMI protocol * @param server * @throws Exception */ private void createRMIServerConnection(ManagedServer server) throws Exception - { + { try { // Currently Qpid Management Console only supports JMX MBeanServer - ServerRegistry serverRegistry = new JMXServerRegistry(server); - ApplicationRegistry.addServer(server, serverRegistry); + ServerRegistry serverRegistry = new JMXServerRegistry(server); + ApplicationRegistry.addServer(server, serverRegistry); } - catch(Exception ex) + catch (Exception ex) { ex.printStackTrace(); throw new Exception("Error in connecting to Qpid broker at " + server.getUrl(), ex); } } - + /** * Adds a new server node in the navigation view if server connection is successful. * @param transportProtocol @@ -223,15 +231,15 @@ public class NavigationView extends ViewPart * @param domain * @throws Exception */ - public void addNewServer(String transportProtocol, String host, int port, - String domain, String user, String pwd) throws Exception + public void addNewServer(String transportProtocol, String host, int port, String domain, String user, String pwd) + throws Exception { String serverAddress = host + ":" + port; String url = null; ManagedServer managedServer = new ManagedServer(host, port, domain, user, pwd); - + if ("RMI".equals(transportProtocol)) - { + { url = managedServer.getUrl(); List<TreeObject> list = _serversRootNode.getChildren(); for (TreeObject node : list) @@ -242,10 +250,11 @@ public class NavigationView extends ViewPart // Set the server node as selected and then connect it. _treeViewer.setSelection(new StructuredSelection(node)); reconnect(user, pwd); + return; } } - + // The server is not in the list of already added servers, so now connect and add it. managedServer.setName(serverAddress); createRMIServerConnection(managedServer); @@ -254,28 +263,28 @@ public class NavigationView extends ViewPart { throw new InfoRequiredException(transportProtocol + " transport is not supported"); } - + // Server connection is successful. Now add the server in the tree TreeObject serverNode = new TreeObject(serverAddress, NODE_TYPE_SERVER); serverNode.setUrl(url); serverNode.setManagedObject(managedServer); _serversRootNode.addChild(serverNode); - + // Add server in the connected server map _managedServerMap.put(managedServer, serverNode); - + // populate the server tree - populateServer(serverNode); - + populateServer(serverNode); + // Add the Queue/Exchanges/Connections from config file into the navigation tree addConfiguredItems(managedServer); - + _treeViewer.refresh(); - - // save server address in file + + // save server address in file addServerInConfigFile(serverAddress); } - + /** * Create the config file, if it doesn't already exist. * Exits the application if the file could not be created. @@ -290,29 +299,29 @@ public class NavigationView extends ViewPart file.createNewFile(); } } - catch(IOException ex) + catch (IOException ex) { System.out.println("Could not write to the file " + INI_FILENAME); System.out.println(ex); System.exit(1); } } - + /** * Server addresses are stored in a file. When user launches the application again, the * server addresses are picked up from the file and shown in the navigfation view. This method - * adds the server address in a file, when a new server is added in the navigation view. + * adds the server address in a file, when a new server is added in the navigation view. * @param serverAddress */ private void addServerInConfigFile(String serverAddress) { // Check if the address already exists List<String> list = getServerListFromFile(); - if (list != null && list.contains(serverAddress)) + if ((list != null) && list.contains(serverAddress)) { return; } - + // Get the existing server list and add to that String servers = _preferences.getString(INI_SERVERS); String value = (servers.length() != 0) ? (servers + "," + serverAddress) : serverAddress; @@ -321,12 +330,12 @@ public class NavigationView extends ViewPart { _preferences.save(); } - catch(IOException ex) + catch (IOException ex) { System.err.println("Could not add " + serverAddress + " in " + INI_SERVERS + " (" + INI_FILENAME + ")"); System.out.println(ex); } - } + } /** * Adds the item (Queue/Exchange/Connection) to the config file @@ -337,20 +346,20 @@ public class NavigationView extends ViewPart */ private void addItemInConfigFile(TreeObject node) { - ManagedBean mbean = (ManagedBean)node.getManagedObject(); + ManagedBean mbean = (ManagedBean) node.getManagedObject(); String server = mbean.getServer().getName(); String virtualhost = mbean.getVirtualHostName(); String type = node.getParent().getName() + "s"; String name = node.getName(); String itemKey = server + "." + virtualhost + "." + type; - + // Check if the item already exists in the config file List<String> list = getConfiguredItemsFromFile(itemKey); - if (list != null && list.contains(name)) + if ((list != null) && list.contains(name)) { return; } - + // Add this item to the existing list of items String items = _preferences.getString(itemKey); String value = (items.length() != 0) ? (items + "," + name) : name; @@ -359,21 +368,21 @@ public class NavigationView extends ViewPart { _preferences.save(); } - catch(IOException ex) + catch (IOException ex) { System.err.println("Could not add " + name + " in " + itemKey + " (" + INI_FILENAME + ")"); System.out.println(ex); } } - + private void removeItemFromConfigFile(TreeObject node) { - ManagedBean mbean = (ManagedBean)node.getManagedObject(); + ManagedBean mbean = (ManagedBean) node.getManagedObject(); String server = mbean.getServer().getName(); String vHost = mbean.getVirtualHostName(); String type = node.getParent().getName() + "s"; String itemKey = server + "." + vHost + "." + type; - + List<String> list = getConfiguredItemsFromFile(itemKey); if (list.contains(node.getName())) { @@ -383,29 +392,30 @@ public class NavigationView extends ViewPart { value += item + ","; } + value = (value.lastIndexOf(",") != -1) ? value.substring(0, value.lastIndexOf(",")) : value; - + _preferences.putValue(itemKey, value); try { _preferences.save(); } - catch(IOException ex) + catch (IOException ex) { - System.err.println("Error in updating the config file "+ INI_FILENAME); + System.err.println("Error in updating the config file " + INI_FILENAME); System.out.println(ex); } } } /** - * Queries the qpid server for MBeans and populates the navigation view with all MBeans for + * Queries the qpid server for MBeans and populates the navigation view with all MBeans for * the given server node. * @param serverNode */ private void populateServer(TreeObject serverNode) { - ManagedServer server = (ManagedServer)serverNode.getManagedObject(); + ManagedServer server = (ManagedServer) serverNode.getManagedObject(); String domain = server.getDomain(); try { @@ -414,29 +424,30 @@ public class NavigationView extends ViewPart TreeObject domainNode = new TreeObject(domain, NODE_TYPE_DOMAIN); domainNode.setParent(serverNode); - populateDomain(domainNode); + populateDomain(domainNode); } else { List<TreeObject> domainList = new ArrayList<TreeObject>(); - List<String> domains = MBeanUtility.getAllDomains(server);; + List<String> domains = MBeanUtility.getAllDomains(server); + ; for (String domainName : domains) - { + { TreeObject domainNode = new TreeObject(domainName, NODE_TYPE_DOMAIN); domainNode.setParent(serverNode); domainList.add(domainNode); - populateDomain(domainNode); + populateDomain(domainNode); } } } - catch(Exception ex) + catch (Exception ex) { System.out.println("\nError in connecting to Qpid broker "); ex.printStackTrace(); } } - + /** * Queries the Qpid Server and populates the given domain node with all MBeans undser that domain. * @param domain @@ -446,19 +457,19 @@ public class NavigationView extends ViewPart @SuppressWarnings("unchecked") private void populateDomain(TreeObject domain) throws IOException, Exception { - ManagedServer server = (ManagedServer)domain.getParent().getManagedObject(); - + ManagedServer server = (ManagedServer) domain.getParent().getManagedObject(); + // Now populate the mbenas under those types List<ManagedBean> mbeans = MBeanUtility.getManagedObjectsForDomain(server, domain.getName()); for (ManagedBean mbean : mbeans) { mbean.setServer(server); ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server); - serverRegistry.addManagedObject(mbean); - + serverRegistry.addManagedObject(mbean); + // Add all mbeans other than Connections, Exchanges and Queues. Because these will be added - // manually by selecting from MBeanView - if (!(mbean.isConnection() || mbean.isExchange() || mbean.isQueue()) ) + // manually by selecting from MBeanView + if (!(mbean.isConnection() || mbean.isExchange() || mbean.isQueue())) { addManagedBean(domain, mbean); } @@ -471,10 +482,11 @@ public class NavigationView extends ViewPart { addDefaultNodes(domain); } + break; } } - + /** * Add these three types - Connection, Exchange, Queue * By adding these, these will always be available, even if there are no mbeans under thse types @@ -493,7 +505,7 @@ public class NavigationView extends ViewPart typeChild.setParent(parent); typeChild.setVirtualHost(parent.getVirtualHost()); } - + /** * Checks if a particular mbeantype is already there in the navigation view for a domain. * This is used while populating domain with mbeans. @@ -506,24 +518,30 @@ public class NavigationView extends ViewPart List<TreeObject> childNodes = parent.getChildren(); for (TreeObject child : childNodes) { - if ((NODE_TYPE_MBEANTYPE.equals(child.getType()) || NODE_TYPE_TYPEINSTANCE.equals(child.getType())) && - typeName.equals(child.getName())) + if ((NODE_TYPE_MBEANTYPE.equals(child.getType()) || NODE_TYPE_TYPEINSTANCE.equals(child.getType())) + && typeName.equals(child.getName())) + { return child; + } } + return null; } - + private boolean doesMBeanNodeAlreadyExist(TreeObject typeNode, String mbeanName) { List<TreeObject> childNodes = typeNode.getChildren(); for (TreeObject child : childNodes) { if (MBEAN.equals(child.getType()) && mbeanName.equals(child.getName())) + { return true; + } } + return false; } - + /** * Adds the given MBean to the given domain node. Creates Notification node for the MBean. * sample ObjectNames - @@ -533,18 +551,18 @@ public class NavigationView extends ViewPart * @param mbean * @throws Exception */ - private void addManagedBean(TreeObject domain, ManagedBean mbean)// throws Exception + private void addManagedBean(TreeObject domain, ManagedBean mbean) // throws Exception { String name = mbean.getName(); // Split the mbean type into array of Strings, to create hierarchy // eg. type=VirtualHost.VirtualHostManager,VirtualHost=localhost will be: - // localhost->VirtualHostManager + // localhost->VirtualHostManager // eg. type=org.apache.qpid:type=VirtualHost.Queue,VirtualHost=test,name=ping will be: - // test->Queue->ping + // test->Queue->ping String[] types = mbean.getType().split("\\."); TreeObject typeNode = null; TreeObject parentNode = domain; - + // Run this loop till all nodes(hierarchy) for this mbean are created. This loop only creates // all the required parent nodes for the mbean for (int i = 0; i < types.length; i++) @@ -554,34 +572,34 @@ public class NavigationView extends ViewPart // If value is not null, then there will be a parent node for this mbean // eg. for type=VirtualHost the value is "test" typeNode = getMBeanTypeNode(parentNode, type); - + // create the type node if not already created if (typeNode == null) { // If the ObjectName doesn't have name property, that means there will be only one instance // of this mbean for given "type". So there will be no type node created for this mbean. - if (name == null && (i == types.length -1)) + if ((name == null) && (i == (types.length - 1))) { break; } - + // create a node for "type" typeNode = createTypeNode(parentNode, type); typeNode.setVirtualHost(mbean.getVirtualHostName()); } - + // now type node create becomes the parent node for next node in hierarchy parentNode = typeNode; - - /* + + /* * Now create instances node for this type if value exists. */ - if (valueOftype == null) + if (valueOftype == null) { - // No instance node will be created when value is null (eg type=Queue) + // No instance node will be created when value is null (eg type=Queue) break; - } - + } + // For different virtual hosts, the nodes with given value will be created. // eg type=VirtualHost, value=test typeNode = getMBeanTypeNode(parentNode, valueOftype); @@ -589,55 +607,60 @@ public class NavigationView extends ViewPart { typeNode = createTypeInstanceNode(parentNode, valueOftype); typeNode.setVirtualHost(mbean.getVirtualHostName()); - + // Create default nodes for VHost instances if (type.equals(VIRTUAL_HOST)) { addDefaultNodes(typeNode); } } + parentNode = typeNode; } - + if (typeNode == null) { typeNode = parentNode; } - + // Check if an MBean is already added if (doesMBeanNodeAlreadyExist(typeNode, name)) + { return; - + } + // Add the mbean node now TreeObject mbeanNode = new TreeObject(mbean); mbeanNode.setParent(typeNode); - + // Add the mbean to the config file if (mbean.isQueue() || mbean.isExchange() || mbean.isConnection()) { addItemInConfigFile(mbeanNode); } - + // Add notification node // TODO: show this only if the mbean sends any notification TreeObject notificationNode = new TreeObject(NOTIFICATION, NOTIFICATION); notificationNode.setParent(mbeanNode); } - + private TreeObject createTypeNode(TreeObject parent, String name) { TreeObject typeNode = new TreeObject(name, NODE_TYPE_MBEANTYPE); typeNode.setParent(parent); + return typeNode; } - + private TreeObject createTypeInstanceNode(TreeObject parent, String name) { TreeObject typeNode = new TreeObject(name, NODE_TYPE_TYPEINSTANCE); typeNode.setParent(parent); + return typeNode; } - + /** * Removes all the child nodes of the given parent node. Used when closing a server. * @param parent @@ -649,7 +672,7 @@ public class NavigationView extends ViewPart { removeManagedObject(child); } - + list.clear(); } @@ -666,10 +689,11 @@ public class NavigationView extends ViewPart { if (MBEAN.equals(child.getType())) { - String name = mbean.getName() != null ? mbean.getName() : mbean.getType(); + String name = (mbean.getName() != null) ? mbean.getName() : mbean.getType(); if (child.getName().equals(name)) { objectToRemove = child; + break; } } @@ -678,35 +702,39 @@ public class NavigationView extends ViewPart removeManagedObject(child, mbean); } } - + if (objectToRemove != null) { list.remove(objectToRemove); removeItemFromConfigFile(objectToRemove); } - + } - + /** * Closes the Qpid server connection */ public void disconnect() throws Exception { - TreeObject selectedNode = getSelectedServerNode(); - ManagedServer managedServer = (ManagedServer)selectedNode.getManagedObject(); + TreeObject selectedNode = getSelectedServerNode(); + ManagedServer managedServer = (ManagedServer) selectedNode.getManagedObject(); if (!_managedServerMap.containsKey(managedServer)) + { return; + } // Close server connection ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(managedServer); - if (serverRegistry == null) // server connection is already closed + if (serverRegistry == null) // server connection is already closed + { return; - + } + serverRegistry.closeServerConnection(); // Add server to the closed server list and the worker thread will remove the server from required places. ApplicationRegistry.serverConnectionClosed(managedServer); } - + /** * Connects the selected server node * @throws Exception @@ -714,27 +742,28 @@ public class NavigationView extends ViewPart public void reconnect(String user, String password) throws Exception { TreeObject selectedNode = getSelectedServerNode(); - ManagedServer managedServer = (ManagedServer)selectedNode.getManagedObject(); - if(_managedServerMap.containsKey(managedServer)) + ManagedServer managedServer = (ManagedServer) selectedNode.getManagedObject(); + if (_managedServerMap.containsKey(managedServer)) { throw new InfoRequiredException("Server " + managedServer.getName() + " is already connected"); - } + } + managedServer.setUser(user); managedServer.setPassword(password); createRMIServerConnection(managedServer); - + // put the server in the managed server map _managedServerMap.put(managedServer, selectedNode); - + // populate the server tree now populateServer(selectedNode); - + // Add the Queue/Exchanges/Connections from config file into the navigation tree addConfiguredItems(managedServer); - - _treeViewer.refresh(); + + _treeViewer.refresh(); } - + /** * Adds the items(queues/exchanges/connectins) from config file to the server tree * @param server @@ -750,13 +779,13 @@ public class NavigationView extends ViewPart List<String> items = getConfiguredItemsFromFile(itemKey); List<ManagedBean> mbeans = serverRegistry.getQueues(virtualHost); addConfiguredItems(items, mbeans); - + // Add Exchanges itemKey = server.getName() + "." + virtualHost + "." + INI_EXCHANGES; items = getConfiguredItemsFromFile(itemKey); mbeans = serverRegistry.getExchanges(virtualHost); addConfiguredItems(items, mbeans); - + // Add Connections itemKey = server.getName() + "." + virtualHost + "." + INI_CONNECTIONS; items = getConfiguredItemsFromFile(itemKey); @@ -764,7 +793,7 @@ public class NavigationView extends ViewPart addConfiguredItems(items, mbeans); } } - + /** * Gets the mbeans corresponding to the items and adds those to the navigation tree * @param items @@ -772,11 +801,11 @@ public class NavigationView extends ViewPart */ private void addConfiguredItems(List<String> items, List<ManagedBean> mbeans) { - if ((items == null) || items.isEmpty() | (mbeans == null) || mbeans.isEmpty()) + if ((items == null) || (items.isEmpty() | (mbeans == null)) || mbeans.isEmpty()) { return; } - + for (String item : items) { for (ManagedBean mbean : mbeans) @@ -784,12 +813,13 @@ public class NavigationView extends ViewPart if (item.equals(mbean.getName())) { addManagedBean(mbean); + break; } } } } - + /** * Closes the Qpid server connection if not already closed and removes the server node from the navigation view and * also from the ini file stored in the system. @@ -798,9 +828,9 @@ public class NavigationView extends ViewPart public void removeServer() throws Exception { disconnect(); - + // Remove from the Tree - String serverNodeName = getSelectedServerNode().getName(); + String serverNodeName = getSelectedServerNode().getName(); List<TreeObject> list = _serversRootNode.getChildren(); TreeObject objectToRemove = null; for (TreeObject child : list) @@ -808,46 +838,48 @@ public class NavigationView extends ViewPart if (child.getName().equals(serverNodeName)) { objectToRemove = child; + break; } } - + if (objectToRemove != null) { list.remove(objectToRemove); } - + _treeViewer.refresh(); - + // Remove from the ini file removeServerFromConfigFile(serverNodeName); } - + private void removeServerFromConfigFile(String serverNodeName) { List<String> serversList = getServerListFromFile(); serversList.remove(serverNodeName); - + String value = ""; for (String item : serversList) { value += item + ","; } + value = (value.lastIndexOf(",") != -1) ? value.substring(0, value.lastIndexOf(",")) : value; - + _preferences.putValue(INI_SERVERS, value); - + try { _preferences.save(); } - catch(IOException ex) + catch (IOException ex) { - System.err.println("Error in updating the config file "+ INI_FILENAME); + System.err.println("Error in updating the config file " + INI_FILENAME); System.out.println(ex); } } - + /** * @return the server addresses from the ini file * @throws Exception @@ -856,7 +888,7 @@ public class NavigationView extends ViewPart { return getConfiguredItemsFromFile(INI_SERVERS); } - + /** * Returns the list of items from the config file. * sample ini file: @@ -879,59 +911,60 @@ public class NavigationView extends ViewPart list.add(item); } } - + return list; } - + public TreeObject getSelectedServerNode() throws Exception { - IStructuredSelection ss = (IStructuredSelection)_treeViewer.getSelection(); - TreeObject selectedNode = (TreeObject)ss.getFirstElement(); - if (ss.isEmpty() || selectedNode == null || (!selectedNode.getType().equals(NODE_TYPE_SERVER))) + IStructuredSelection ss = (IStructuredSelection) _treeViewer.getSelection(); + TreeObject selectedNode = (TreeObject) ss.getFirstElement(); + if (ss.isEmpty() || (selectedNode == null) || (!selectedNode.getType().equals(NODE_TYPE_SERVER))) { throw new InfoRequiredException("Please select the server"); } return selectedNode; } - /** + + /** * This is a callback that will allow us to create the viewer and initialize * it. */ - public void createPartControl(Composite parent) + public void createPartControl(Composite parent) { Composite composite = new Composite(parent, SWT.NONE); GridLayout gridLayout = new GridLayout(); gridLayout.marginHeight = 2; gridLayout.marginWidth = 2; - gridLayout.horizontalSpacing = 0; - gridLayout.verticalSpacing = 2; + gridLayout.horizontalSpacing = 0; + gridLayout.verticalSpacing = 2; composite.setLayout(gridLayout); - + createTreeViewer(composite); _rootNode = new TreeObject("ROOT", "ROOT"); _serversRootNode = new TreeObject(NAVIGATION_ROOT, "ROOT"); _serversRootNode.setParent(_rootNode); - + _treeViewer.setInput(_rootNode); // set viewer as selection event provider for MBeanView - getSite().setSelectionProvider(_treeViewer); - + getSite().setSelectionProvider(_treeViewer); + // Start worker thread to refresh tree for added or removed objects - (new Thread(new Worker())).start(); - + (new Thread(new Worker())).start(); + createConfigFile(); _preferences = new PreferenceStore(INI_FILENAME); - + try { _preferences.load(); } - catch(IOException ex) + catch (IOException ex) { System.out.println(ex); } - + // load the list of servers already added from file List<String> serversList = getServerListFromFile(); if (serversList != null) @@ -945,23 +978,22 @@ public class NavigationView extends ViewPart _serversRootNode.addChild(serverNode); } } + _treeViewer.refresh(); - - } - /** - * Passing the focus request to the viewer's control. - */ - public void setFocus() - { + } + + /** + * Passing the focus request to the viewer's control. + */ + public void setFocus() + { } - } - public void refresh() { _treeViewer.refresh(); } - + /** * Content provider class for the tree viewer */ @@ -971,36 +1003,39 @@ public class NavigationView extends ViewPart { return getChildren(parent); } - + public Object[] getChildren(final Object parentElement) { - final TreeObject node = (TreeObject)parentElement; + final TreeObject node = (TreeObject) parentElement; + return node.getChildren().toArray(new TreeObject[0]); } - + public Object getParent(final Object element) { - final TreeObject node = (TreeObject)element; + final TreeObject node = (TreeObject) element; + return node.getParent(); } - + public boolean hasChildren(final Object element) { final TreeObject node = (TreeObject) element; + return !node.getChildren().isEmpty(); } - + public void inputChanged(final Viewer viewer, final Object oldInput, final Object newInput) { // Do nothing } - + public void dispose() { // Do nothing } } - + /** * Label provider class for the tree viewer */ @@ -1008,28 +1043,32 @@ public class NavigationView extends ViewPart { public Image getImage(Object element) { - TreeObject node = (TreeObject)element; + TreeObject node = (TreeObject) element; if (node.getType().equals(NOTIFICATION)) { return ApplicationRegistry.getImage(NOTIFICATION_IMAGE); } else if (!node.getType().equals(MBEAN)) { - if (_treeViewer.getExpandedState(node)) - return ApplicationRegistry.getImage(OPEN_FOLDER_IMAGE); - else - return ApplicationRegistry.getImage(CLOSED_FOLDER_IMAGE); - + if (_treeViewer.getExpandedState(node)) + { + return ApplicationRegistry.getImage(OPEN_FOLDER_IMAGE); + } + else + { + return ApplicationRegistry.getImage(CLOSED_FOLDER_IMAGE); + } + } else { return ApplicationRegistry.getImage(MBEAN_IMAGE); } } - + public String getText(Object element) { - TreeObject node = (TreeObject)element; + TreeObject node = (TreeObject) element; if (node.getType().equals(NODE_TYPE_MBEANTYPE)) { return node.getName() + "s"; @@ -1039,35 +1078,42 @@ public class NavigationView extends ViewPart return node.getName(); } } - + public Font getFont(Object element) { - TreeObject node = (TreeObject)element; + TreeObject node = (TreeObject) element; if (node.getType().equals(NODE_TYPE_SERVER)) { if (node.getChildren().isEmpty()) + { return ApplicationRegistry.getFont(FONT_NORMAL); + } else + { return ApplicationRegistry.getFont(FONT_BOLD); + } } + return ApplicationRegistry.getFont(FONT_NORMAL); } } // End of LabelProviderImpl - - + private class ViewerSorterImpl extends ViewerSorter { public int category(Object element) { - TreeObject node = (TreeObject)element; + TreeObject node = (TreeObject) element; if (node.getType().equals(MBEAN)) + { return 1; + } + return 2; } } - + /** - * Worker thread, which keeps looking for new ManagedObjects to be added and + * Worker thread, which keeps looking for new ManagedObjects to be added and * unregistered objects to be removed from the tree. * @author Bhupendra Bhardwaj */ @@ -1075,33 +1121,31 @@ public class NavigationView extends ViewPart { public void run() { - while(true) + while (true) { if (!_managedServerMap.isEmpty()) { - refreshRemovedObjects(); + refreshRemovedObjects(); refreshClosedServerConnections(); } - + try { Thread.sleep(3000); } - catch(Exception ex) - { + catch (Exception ex) + { } + + } // end of while loop + } // end of run method. + } // end of Worker class - } - - }// end of while loop - }// end of run method. - }// end of Worker class - /** * Adds the mbean to the navigation tree * @param mbean * @throws Exception */ - public void addManagedBean(ManagedBean mbean)// throws Exception + public void addManagedBean(ManagedBean mbean) // throws Exception { TreeObject treeServerObject = _managedServerMap.get(mbean.getServer()); List<TreeObject> domains = treeServerObject.getChildren(); @@ -1111,22 +1155,25 @@ public class NavigationView extends ViewPart if (child.getName().equals(mbean.getDomain())) { domain = child; + break; } } - + addManagedBean(domain, mbean); _treeViewer.refresh(); } - + private void refreshRemovedObjects() { for (ManagedServer server : _managedServerMap.keySet()) { final ServerRegistry serverRegistry = ApplicationRegistry.getServerRegistry(server); - if (serverRegistry == null) // server connection is closed + if (serverRegistry == null) // server connection is closed + { continue; - + } + final List<ManagedBean> removalList = serverRegistry.getObjectsToBeRemoved(); if (removalList != null) { @@ -1145,19 +1192,22 @@ public class NavigationView extends ViewPart if (child.getName().equals(mbean.getDomain())) { domain = child; + break; } } + removeManagedObject(domain, mbean); - //serverRegistry.removeManagedObject(mbean); + // serverRegistry.removeManagedObject(mbean); } + _treeViewer.refresh(); } }); } } } - + /** * Gets the list of closed server connection from the ApplicationRegistry and then removes * the closed server nodes from the navigation view @@ -1169,20 +1219,20 @@ public class NavigationView extends ViewPart { Display display = getSite().getShell().getDisplay(); display.syncExec(new Runnable() - { - public void run() { - for (ManagedServer server : closedServers) + public void run() { - removeManagedObject(_managedServerMap.get(server)); - _managedServerMap.remove(server); - ApplicationRegistry.removeServer(server); + for (ManagedServer server : closedServers) + { + removeManagedObject(_managedServerMap.get(server)); + _managedServerMap.remove(server); + ApplicationRegistry.removeServer(server); + } + + _treeViewer.refresh(); } - - _treeViewer.refresh(); - } - }); + }); } } - -}
\ No newline at end of file + +} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index 013bda5927..e3b0249ed3 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -21,6 +21,7 @@ package org.apache.qpid.ping;
import java.util.List;
+import java.util.Properties;
import javax.jms.Destination;
@@ -31,54 +32,36 @@ import org.apache.qpid.requestreply.PingPongProducer; * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues.
* It is an all in one ping client, that produces and consumes its own pings.
*
+ * <p/>The constructor increments a count of the number of ping clients created. It is assumed that where many
+ * are created they will all be run in parallel and be active in sending and consuming pings at the same time.
+ * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear
+ * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of
+ * active ping clients. The {@link #getConsumersPerTopic()} method is used to supply this multiplier under these
+ * conditions.
+ *
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create a ping pong producer that listens to its own pings <td> {@link PingPongProducer}
+ * <tr><td> Create a ping producer that listens to its own pings <td> {@link PingPongProducer}
+ * <tr><td> Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings.
* </table>
*/
public class PingClient extends PingPongProducer
{
+ /** Used to count the number of ping clients created. */
private static int _pingClientCount;
/**
- * Creates a ping producer with the specified parameters, of which there are many. See their individual comments
- * for details. This constructor creates ping pong producer but de-registers its reply-to destination message
- * listener, and replaces it by listening to all of its ping destinations.
+ * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
+ * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates
+ * producer and consumer sessions on it, to send and recieve its pings and replies on.
*
- * @param brokerDetails The URL of the broker to send pings to.
- * @param username The username to log onto the broker with.
- * @param password The password to log onto the broker with.
- * @param virtualpath The virtual host name to use on the broker.
- * @param destinationName The name (or root where multiple destinations are used) of the desitination to send
- * pings to.
- * @param selector The selector to filter replies with.
- * @param transacted Indicates whether or not pings are sent and received in transactions.
- * @param persistent Indicates whether pings are sent using peristent delivery.
- * @param messageSize Specifies the size of ping messages to send.
- * @param verbose Indicates that information should be printed to the console on every ping.
- * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test failover.
- * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test failover.
- * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover.
- * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test failover.
- * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not all.
- * @param txBatchSize Specifies the number of pings to send in each transaction.
- * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
- * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
- * possible, with no rate restriction.
- * @param pubsub True to ping topics, false to ping queues.
- * @param unique True to use unique destinations for each ping pong producer, false to share.
+ * @param overrides Properties containing any desired overrides to the defaults.
*
* @throws Exception Any exceptions are allowed to fall through.
*/
- public PingClient(String brokerDetails, String username, String password, String virtualpath, String destinationName,
- String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
- boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
- int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique,
- int ackMode, long pausetime) throws Exception
+ public PingClient(Properties overrides) throws Exception
{
- super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
- verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
- pubsub, unique, ackMode, pausetime);
+ super(overrides);
_pingClientCount++;
}
@@ -94,6 +77,11 @@ public class PingClient extends PingPongProducer return _pingDestinations;
}
+ /**
+ * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging.
+ *
+ * @return The scaling up of the number of expected pub/sub pings.
+ */
public int getConsumersPerTopic()
{
if (_isUnique)
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java new file mode 100644 index 0000000000..77526141d6 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -0,0 +1,389 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.util.MathUtils;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and
+ * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop
+ * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the
+ * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with
+ * failure conditions when using durable messaging.
+ *
+ * <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to
+ * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases
+ * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings
+ * with.
+ *
+ * <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console.
+ *
+ * <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and
+ * additionally accepts the following parameters:
+ *
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter <th> Default <th> Comments
+ * <tr><td> numMessages <th> 100 <th> The total number of messages to send.
+ * <tr><td> duration <th> 30S <th> The length of time to ping for. (Format dDhHmMsS, for d days, h hours,
+ * m minutes and s seconds).
+ * </table>
+ *
+ * <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up
+ * when no parameters are specified.
+ *
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter <th> Default <th> Comments
+ * <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped.
+ * <tr><td> transacted <td> true <td> Only makes sense to test with transactions.
+ * <tr><td> persistent <td> true <td> Only makes sense to test persistent.
+ * <tr><td> commitBatchSize <td> 10
+ * <tr><td> rate <td> 20 <td> Total default test time is 5 seconds.
+ * </table>
+ *
+ * <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits
+ * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will
+ * wait for the second signal before receiving its pings.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Send and receive pings.
+ * <tr><td> Accept user input to signal stop sending.
+ * <tr><td> Accept user input to signal start receiving.
+ * <tr><td> Provide feedback on pings sent versus pings received.
+ * </table>
+ */
+public class PingDurableClient extends PingPongProducer implements ExceptionListener
+{
+ private static final Logger log = Logger.getLogger(PingDurableClient.class);
+
+ public static final String NUM_MESSAGES_PROPNAME = "numMessages";
+ public static final String NUM_MESSAGES_DEFAULT = "100";
+ public static final String DURATION_PROPNAME = "duration";
+ public static final String DURATION_DEFAULT = "30S";
+
+ /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */
+ private static final long TIME_OUT = 3000;
+
+ static
+ {
+ defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT);
+ defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT);
+ defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false");
+ defaults.setProperty(TRANSACTED_PROPNAME, "true");
+ defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
+ defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
+ defaults.setProperty(RATE_PROPNAME, "20");
+ }
+
+ /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
+ private int numMessages;
+
+ /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */
+ private long duration;
+
+ /** Used to indciate that this application should terminate. Set by the shutdown hook. */
+ private boolean terminate = false;
+
+ /**
+ * @throws Exception Any exceptions are allowed to fall through.
+ */
+ public PingDurableClient(Properties overrides) throws Exception
+ {
+ super(overrides);
+ log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called");
+
+ // Extract the additional configuration parameters.
+ ParsedProperties properties = new ParsedProperties(defaults);
+ properties.putAll(overrides);
+
+ numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME);
+ String durationSpec = properties.getProperty(DURATION_PROPNAME);
+
+ if (durationSpec != null)
+ {
+ duration = MathUtils.parseDuration(durationSpec) * 1000000;
+ }
+ }
+
+ /**
+ * Starts the ping/wait/receive process.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ try
+ {
+ // Create a ping producer overriding its defaults with all options passed on the command line.
+ Properties options = processCommandLine(args);
+ PingDurableClient pingProducer = new PingDurableClient(options);
+
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ // pingProducer.getConnection().setExceptionListener(pingProducer);
+
+ // Run the test procedure.
+ int sent = pingProducer.send();
+ pingProducer.waitForUser("Press return to begin receiving the pings.");
+ pingProducer.receive(sent);
+
+ System.exit(0);
+ }
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Performs the main test procedure implemented by this ping client. See the class level comment for details.
+ */
+ public int send() throws Exception
+ {
+ log.debug("public void sendWaitReceive(): called");
+
+ log.debug("duration = " + duration);
+ log.debug("numMessages = " + numMessages);
+
+ if (duration > 0)
+ {
+ System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds.");
+ }
+
+ if (_rate > 0)
+ {
+ System.out.println("Sending at " + _rate + " messages per second.");
+ }
+
+ if (numMessages > 0)
+ {
+ System.out.println("Sending up to " + numMessages + " messages.");
+ }
+
+ // Establish the connection and the message producer.
+ establishConnection(true, false);
+ getConnection().start();
+
+ Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
+
+ // Send pings until a terminating condition is received.
+ boolean endCondition = false;
+ int messagesSent = 0;
+ int messagesCommitted = 0;
+ int messagesNotCommitted = 0;
+ long start = System.nanoTime();
+
+ // Clear console in.
+ clearConsole();
+
+ while (!endCondition)
+ {
+ boolean committed = false;
+
+ try
+ {
+ committed = sendMessage(messagesSent, message) && _transacted;
+
+ messagesSent++;
+ messagesNotCommitted++;
+
+ // Keep count of the number of messsages currently committed and pending commit.
+ if (committed)
+ {
+ log.debug("Adding " + messagesNotCommitted + " messages to the committed count.");
+ messagesCommitted += messagesNotCommitted;
+ messagesNotCommitted = 0;
+
+ System.out.println("Commited: " + messagesCommitted);
+ }
+ }
+ catch (JMSException e)
+ {
+ log.debug("Got JMSException whilst sending.");
+ _publish = false;
+ }
+
+ // Determine if the end condition has been met, based on the number of messages, time passed, errors on
+ // the connection or user input.
+ long now = System.nanoTime();
+
+ if ((duration != 0) && ((now - start) > duration))
+ {
+ System.out.println("Send halted because duration expired.");
+ endCondition = true;
+ }
+ else if ((numMessages != 0) && (messagesSent >= numMessages))
+ {
+ System.out.println("Send halted because # messages completed.");
+ endCondition = true;
+ }
+ else if (System.in.available() > 0)
+ {
+ System.out.println("Send halted by user input.");
+ endCondition = true;
+
+ clearConsole();
+ }
+ else if (!_publish)
+ {
+ System.out.println("Send halted by error on the connection.");
+ endCondition = true;
+ }
+ }
+
+ log.debug("messagesSent = " + messagesSent);
+ log.debug("messagesCommitted = " + messagesCommitted);
+ log.debug("messagesNotCommitted = " + messagesNotCommitted);
+
+ System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted
+ + ", Messages not Committed = " + messagesNotCommitted);
+
+ // Clean up the connection.
+ try
+ {
+ close();
+ }
+ catch (JMSException e)
+ {
+ // Ignore as did best could manage to clean up.
+ }
+
+ return messagesSent;
+ }
+
+ private void receive(int messagesSent) throws Exception
+ {
+ // Re-establish the connection and the message consumer.
+ _queueJVMSequenceID = new AtomicInteger();
+ _queueSharedID = new AtomicInteger();
+
+ establishConnection(false, true);
+ _consumer.setMessageListener(null);
+ _connection.start();
+
+ // Try to receive all of the pings that were successfully sent.
+ int messagesReceived = 0;
+ boolean endCondition = false;
+
+ while (!endCondition)
+ {
+ // Message received = _consumer.receiveNoWait();
+ Message received = _consumer.receive(TIME_OUT);
+ log.debug("received = " + received);
+
+ if (received != null)
+ {
+ messagesReceived++;
+ }
+
+ // Determine if the end condition has been met, based on the number of messages and time passed since last
+ // receiving a message.
+ if (received == null)
+ {
+ System.out.println("Timed out.");
+ endCondition = true;
+ }
+ else if (messagesReceived >= messagesSent)
+ {
+ System.out.println("Got all messages.");
+ endCondition = true;
+ }
+ }
+
+ log.debug("messagesReceived = " + messagesReceived);
+
+ System.out.println("Messages received: " + messagesReceived);
+
+ // Clean up the connection.
+ close();
+ }
+
+ /**
+ * Clears any pending input from the console.
+ */
+ private void clearConsole()
+ {
+ try
+ {
+ BufferedReader bis = new BufferedReader(new InputStreamReader(System.in));
+
+ // System.in.skip(System.in.available());
+ while (bis.ready())
+ {
+ bis.readLine();
+ }
+ }
+ catch (IOException e)
+ { }
+ }
+
+ /**
+ * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the
+ * effect of making this pinger listen to its own pings.
+ *
+ * @return The ping destinations.
+ */
+ public List<Destination> getReplyDestinations()
+ {
+ return _pingDestinations;
+ }
+
+ /**
+ * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with
+ * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the
+ * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving
+ * message should stop, not that the application should termiante.
+ *
+ * @return A shutdown hook for the ping loop.
+ */
+ public Thread getShutdownHook()
+ {
+ return new Thread(new Runnable()
+ {
+ public void run()
+ {
+ stop();
+ terminate = true;
+ }
+ });
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index c6a69807a3..44f7083bb5 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -34,20 +34,22 @@ import javax.jms.*; import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.topic.Config;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
import uk.co.thebadgerset.junit.extensions.Throttle;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
/**
* PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
@@ -65,31 +67,37 @@ import uk.co.thebadgerset.junit.extensions.Throttle; * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover
* testing. A complete list of accepted parameters, default values and comments on their usage is provided here:
*
- * <p/><table><caption>CRC Card</caption>
- * <tr><th> Parameter <th> Default <th> Comments
- * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
- * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
- * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used.
- * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter <th> Default <th> Comments
+ * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
+ * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
+ * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used.
+ * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
* <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to.
- * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over.
- * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
- * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
- * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
- * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch.
- * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch.
- * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send.
- * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send.
- * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once.
- * <tr><td> username <td> guest <td> The username to access the broker with.
- * <tr><td> password <td> guest <td> The password to access the broker with.
- * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
- * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
- * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
- * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
- * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
- * <tr><td> ackMode <td> NO_ACK <td> The message acknowledgement mode.
- * <tr><td> pauseBatch <td> 0 <td> In milliseconds. A pause to insert between transaction batches.
+ * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over.
+ * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
+ * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
+ * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
+ * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch.
+ * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch.
+ * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send.
+ * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send.
+ * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once.
+ * <tr><td> username <td> guest <td> The username to access the broker with.
+ * <tr><td> password <td> guest <td> The password to access the broker with.
+ * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
+ * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
+ * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
+ * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
+ * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
+ * 0 - SESSION_TRANSACTED
+ * 1 - AUTO_ACKNOWLEDGE
+ * 2 - CLIENT_ACKNOWLEDGE
+ * 3 - DUPS_OK_ACKNOWLEDGE
+ * 257 - NO_ACKNOWLEDGE
+ * 258 - PRE_ACKNOWLEDGE
+ * <tr><td> pauseBatch <td> 0 <td> In milliseconds. A pause to insert between transaction batches.
* </table>
*
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
@@ -121,7 +129,7 @@ import uk.co.thebadgerset.junit.extensions.Throttle; */
public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
{
- private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
+ private static final Logger log = Logger.getLogger(PingPongProducer.class);
/** Holds the name of the property to get the test message size from. */
public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
@@ -181,31 +189,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit";
/** Holds the default failover after commit test flag. */
- public static final String FAIL_AFTER_COMMIT_DEFAULT = "false";
+ public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false;
/** Holds the name of the proeprty to get the fail before commit flag from. */
public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit";
/** Holds the default failover before commit test flag. */
- public static final String FAIL_BEFORE_COMMIT_DEFAULT = "false";
+ public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false;
/** Holds the name of the proeprty to get the fail after send flag from. */
public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend";
/** Holds the default failover after send test flag. */
- public static final String FAIL_AFTER_SEND_DEFAULT = "false";
+ public static final boolean FAIL_AFTER_SEND_DEFAULT = false;
/** Holds the name of the property to get the fail before send flag from. */
public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend";
/** Holds the default failover before send test flag. */
- public static final String FAIL_BEFORE_SEND_DEFAULT = "false";
+ public static final boolean FAIL_BEFORE_SEND_DEFAULT = false;
/** Holds the name of the property to get the fail once flag from. */
public static final String FAIL_ONCE_PROPNAME = "failOnce";
/** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */
- public static final String FAIL_ONCE_DEFAULT = "true";
+ public static final boolean FAIL_ONCE_DEFAULT = true;
/** Holds the name of the property to get the broker access username from. */
public static final String USERNAME_PROPNAME = "username";
@@ -223,7 +231,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String SELECTOR_PROPNAME = "selector";
/** Holds the default message selector. */
- public static final String SELECTOR_DEFAULT = null;
+ public static final String SELECTOR_DEFAULT = "";
/** Holds the name of the proeprty to get the destination count from. */
public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
@@ -253,7 +261,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String ACK_MODE_PROPNAME = "ackMode";
/** Defines the default message acknowledgement mode. */
- public static final int ACK_MODE_DEFAULT = Session.NO_ACKNOWLEDGE;
+ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
/** Holds the name of the property to get the pause between batches property from. */
public static final String PAUSE_AFTER_BATCH_PROPNAME = "pauseBatch";
@@ -273,96 +281,140 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
- /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
- private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
-
- /**
- * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple
- * ping producers on the same JVM.
- */
- private static Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ /** Holds the default configuration properties. */
+ public static ParsedProperties defaults = new ParsedProperties();
- /** A convenient formatter to use when time stamping output. */
- protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
+ static
+ {
+ defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+ defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
+ defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
+ defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
+ defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
+ defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+ defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
+ defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
+ defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
+ defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
+ defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT);
+ defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);
+ defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
+ defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
+ defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
+ defaults.setPropertyIfNull(PAUSE_AFTER_BATCH_PROPNAME, PAUSE_AFTER_BATCH_DEFAULT);
+ defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
+ }
- /**
- * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
- * creating multiple ping producers in the same JVM.
- */
- protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
-
- /** Holds the destination where the response messages will arrive. */
- private Destination _replyDestination;
+ protected String _brokerDetails;
+ protected String _username;
+ protected String _password;
+ protected String _virtualpath;
+ protected String _destinationName;
+ protected String _selector;
+ protected boolean _transacted;
/** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
/** Holds the acknowledgement mode used for sending and receiving messages. */
- private int _ackMode = Session.NO_ACKNOWLEDGE;
+ private int _ackMode;
/** Determines what size of messages this producer sends. */
protected int _messageSize;
/** Used to indicate that the ping loop should print out whenever it pings. */
- protected boolean _verbose = VERBOSE_DEFAULT;
+ protected boolean _verbose;
- /** Holds the session on which ping replies are received. */
- protected Session _consumerSession;
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+ protected boolean _isPubSub;
- /** Used to restrict the sending rate to a specified limit. */
- private Throttle _rateLimiter = null;
+ /** Flag used to indicate if the destinations should be unique client. */
+ protected boolean _isUnique;
- /** Holds a message listener that this message listener chains all its messages to. */
- private ChainedMessageListener _chainedMessageListener = null;
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
+ protected boolean _failBeforeCommit;
- /** Flag used to indicate if this is a point to point or pub/sub ping client. */
- protected boolean _isPubSub = PUBSUB_DEFAULT;
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
+ protected boolean _failAfterCommit;
- /** Flag used to indicate if the destinations should be unique client. */
- protected static boolean _isUnique = UNIQUE_DESTS_DEFAULT;
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
+ protected boolean _failBeforeSend;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
+ protected boolean _failAfterSend;
+
+ /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
+ protected boolean _failOnce;
+
+ /** Holds the number of sends that should be performed in every transaction when using transactions. */
+ protected int _txBatchSize;
+
+ protected int _noOfDestinations;
+ protected int _rate;
+
+ /** Holds the wait time to insert between every batch of messages committed. */
+ private long _pauseBatch;
+
+ /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
+ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
/**
- * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
- * on the same JVM using this id generator will allow them to ping on the same queues.
+ * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple
+ * ping producers on the same JVM.
*/
- protected AtomicInteger _queueSharedId = new AtomicInteger();
+ private static Map<String, PerCorrelationId> perCorrelationIds =
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
- /** Used to tell the ping loop when to terminate, it only runs while this is true. */
- protected boolean _publish = true;
+ /** A convenient formatter to use when time stamping output. */
+ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
/** Holds the connection to the broker. */
- private Connection _connection;
+ protected Connection _connection;
+
+ /** Holds the session on which ping replies are received. */
+ protected Session _consumerSession;
/** Holds the producer session, needed to create ping messages. */
- private Session _producerSession;
+ protected Session _producerSession;
- /** Holds the set of destinations that this ping producer pings. */
- protected List<Destination> _pingDestinations = new ArrayList<Destination>();
+ /** Holds the destination where the response messages will arrive. */
+ protected Destination _replyDestination;
- /** Holds the message producer to send the pings through. */
- protected MessageProducer _producer;
+ /** Holds the set of destinations that this ping producer pings. */
+ protected List<Destination> _pingDestinations;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
- protected boolean _failBeforeCommit = false;
+ /** Used to restrict the sending rate to a specified limit. */
+ protected Throttle _rateLimiter;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
- protected boolean _failAfterCommit = false;
+ /** Holds a message listener that this message listener chains all its messages to. */
+ protected ChainedMessageListener _chainedMessageListener = null;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
- protected boolean _failBeforeSend = false;
+ /**
+ * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
+ * creating multiple ping producers in the same JVM.
+ */
+ protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
- protected boolean _failAfterSend = false;
+ /**
+ * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
+ * on the same JVM using this id generator will allow them to ping on the same queues.
+ */
+ protected AtomicInteger _queueSharedID = new AtomicInteger();
- /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
- protected boolean _failOnce = true;
+ /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+ protected boolean _publish = true;
- /** Holds the number of sends that should be performed in every transaction when using transactions. */
- protected int _txBatchSize = TX_BATCH_SIZE_DEFAULT;
+ /** Holds the message producer to send the pings through. */
+ protected MessageProducer _producer;
- /** Holds the wait time to insert between every batch of messages committed. */
- private static long _pauseBatch = PAUSE_AFTER_BATCH_DEFAULT;
+ /** Holds the message consumer to receive the ping replies through. */
+ protected MessageConsumer _consumer;
/**
* Holds the number of consumers that will be attached to each topic. Each pings will result in a reply from each of the
@@ -370,202 +422,195 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
static int _consumersPerTopic = 1;
+ /** The prompt to display when asking the user to kill the broker for failover testing. */
+ private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+
/**
- * Creates a ping producer with the specified parameters, of which there are many. See their individual comments for
- * details. This constructor creates a connection to the broker and creates producer and consumer sessions on it, to send
- * and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
+ * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
+ * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
+ * it, to send and recieve its pings and replies on.
*
- * @param brokerDetails The URL of the broker to send pings to.
- * @param username The username to log onto the broker with.
- * @param password The password to log onto the broker with.
- * @param virtualpath The virtual host name to use on the broker.
- * @param destinationName The name (or root where multiple destinations are used) of the desitination to send pings to.
- * @param selector The selector to filter replies with.
- * @param transacted Indicates whether or not pings are sent and received in transactions.
- * @param persistent Indicates whether pings are sent using peristent delivery.
- * @param messageSize Specifies the size of ping messages to send.
- * @param verbose Indicates that information should be printed to the console on every ping.
- * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test
- * failover.
- * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test
- * failover.
- * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover.
- * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test
- * failover.
- * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not
- * all.
- * @param txBatchSize Specifies the number of pings to send in each transaction.
- * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
- * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
- * possible, with no rate restriction.
- * @param pubsub True to ping topics, false to ping queues.
- * @param unique True to use unique destinations for each ping pong producer, false to share.
+ * @param overrides Properties containing any desired overrides to the defaults.
*
* @throws Exception Any exceptions are allowed to fall through.
*/
- public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
- String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
- boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
- boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
- boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
+ public PingPongProducer(Properties overrides) throws Exception
{
- _logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
- + ", String password = " + password + ", String virtualpath = " + virtualpath
- + ", String destinationName = " + destinationName + ", String selector = " + selector
- + ", boolean transacted = " + transacted + ", boolean persistent = " + persistent
- + ", int messageSize = " + messageSize + ", boolean verbose = " + verbose + ", boolean afterCommit = "
- + afterCommit + ", boolean beforeCommit = " + beforeCommit + ", boolean afterSend = " + afterSend
- + ", boolean beforeSend = " + beforeSend + ", boolean failOnce = " + failOnce + ", int txBatchSize = "
- + txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
- + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + ", ackMode = " + ackMode
- + "): called");
-
- // Keep all the relevant options.
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
- _failAfterCommit = afterCommit;
- _failBeforeCommit = beforeCommit;
- _failAfterSend = afterSend;
- _failBeforeSend = beforeSend;
- _failOnce = failOnce;
- _txBatchSize = txBatchSize;
- _isPubSub = pubsub;
- _isUnique = unique;
- _pauseBatch = pause;
-
- if (ackMode != 0)
- {
- _ackMode = ackMode;
- }
+ log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
+
+ // Create a set of parsed properties from the defaults overriden by the passed in values.
+ ParsedProperties properties = new ParsedProperties(defaults);
+ properties.putAll(overrides);
+
+ // Extract the configuration properties to set the pinger up with.
+ _brokerDetails = properties.getProperty(BROKER_PROPNAME);
+ _username = properties.getProperty(USERNAME_PROPNAME);
+ _password = properties.getProperty(PASSWORD_PROPNAME);
+ _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
+ _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
+ _selector = properties.getProperty(SELECTOR_PROPNAME);
+ _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
+ _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
+ _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
+ _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME);
+ _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME);
+ _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME);
+ _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME);
+ _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
+ _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
+ _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
+ _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
+ _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
+ _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
+ _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
// Check that one or more destinations were specified.
- if (noOfDestinations < 1)
+ if (_noOfDestinations < 1)
{
throw new IllegalArgumentException("There must be at least one destination.");
}
- // Create a connection to the broker.
+ // Set up a throttle to control the send rate, if a rate > 0 is specified.
+ if (_rate > 0)
+ {
+ _rateLimiter = new BatchedThrottle();
+ _rateLimiter.setRate(_rate);
+ }
+
+ // Create the connection and message producers/consumers.
+ // establishConnection(true, true);
+ }
+
+ /**
+ * Establishes a connection to the broker and creates message consumers and producers based on the parameters
+ * that this ping client was created with.
+ *
+ * @param producer Flag to indicate whether or not the producer should be set up.
+ * @param consumer Flag to indicate whether or not the consumers should be set up.
+ *
+ * @throws Exception Any exceptions are allowed to fall through.
+ */
+ public void establishConnection(boolean producer, boolean consumer) throws Exception
+ {
+ log.debug("public void establishConnection(): called");
+
+ // Generate a unique identifying name for this client, based on it ip address and the current time.
InetAddress address = InetAddress.getLocalHost();
String clientID = address.getHostName() + System.currentTimeMillis();
- _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
+ // Create a connection to the broker.
+ createConnection(clientID);
// Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = (Session) getConnection().createSession(transacted, _ackMode);
- _consumerSession = (Session) getConnection().createSession(transacted, _ackMode);
+ _producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
+ _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode);
- // Set up a throttle to control the send rate, if a rate > 0 is specified.
- if (rate > 0)
+ // Create the destinations to send pings to and receive replies from.
+ _replyDestination = _consumerSession.createTemporaryQueue();
+ createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique);
+
+ // Create the message producer only if instructed to.
+ if (producer)
{
- _rateLimiter = new BatchedThrottle();
- _rateLimiter.setRate(rate);
+ createProducer();
}
- // Create the temporary queue for replies.
- _replyDestination = _consumerSession.createTemporaryQueue();
+ // Create the message consumer only if instructed to.
+ if (consumer)
+ {
+ createReplyConsumers(getReplyDestinations(), _selector);
+ }
+ }
- // Create the producer and the consumers for all reply destinations.
- createProducer();
- createPingDestinations(noOfDestinations, selector, destinationName, unique);
- createReplyConsumers(getReplyDestinations(), selector);
+ /**
+ * Establishes a connection to the broker, based on the configuration parameters that this ping client was
+ * created with.
+ *
+ * @param clientID The clients identifier.
+ *
+ * @throws AMQException Any underlying exceptions are allowed to fall through.
+ * @throws URLSyntaxException Any underlying exceptions are allowed to fall through.
+ */
+ protected void createConnection(String clientID) throws AMQException, URLSyntaxException
+ {
+ _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
}
/**
- * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs to be
- * started to bounce the pings back again.
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
+ * to be started to bounce the pings back again.
*
* @param args The command line arguments.
- *
- * @throws Exception When something went wrong with the test
*/
- public static void main(String[] args) throws Exception
+ public static void main(String[] args)
{
- // Extract the command line.
- Config config = new Config();
- config.setOptions(args);
- if (args.length == 0)
+ try
{
- _logger.info("Running test with default values...");
- // usage();
- // System.exit(0);
- }
+ Properties options = processCommandLine(args);
- String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = VIRTUAL_HOST_DEFAULT;
- String selector = (config.getSelector() == null) ? SELECTOR_DEFAULT : config.getSelector();
- boolean verbose = true;
- boolean transacted = config.isTransacted();
- boolean persistent = config.usePersistentMessages();
- int messageSize = (config.getPayload() != 0) ? config.getPayload() : MESSAGE_SIZE_DEAFULT;
- // int messageCount = config.getMessages();
- int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DESTINATION_COUNT_DEFAULT;
- int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : TX_BATCH_SIZE_DEFAULT;
- int rate = (config.getRate() != 0) ? config.getRate() : RATE_DEFAULT;
- boolean pubsub = config.isPubSub();
-
- String destName = config.getDestination();
- if (destName == null)
- {
- destName = PING_QUEUE_NAME_DEFAULT;
- }
+ // Create a ping producer overriding its defaults with all options passed on the command line.
+ PingPongProducer pingProducer = new PingPongProducer(options);
+ pingProducer.establishConnection(true, true);
- boolean afterCommit = false;
- boolean beforeCommit = false;
- boolean afterSend = false;
- boolean beforeSend = false;
- boolean failOnce = false;
+ // Start the ping producers dispatch thread running.
+ pingProducer.getConnection().start();
- for (String arg : args)
- {
- if (arg.startsWith("failover:"))
- {
- // failover:<before|after>:<send:commit> | failover:once
- String[] parts = arg.split(":");
- if (parts.length == 3)
- {
- if (parts[2].equals("commit"))
- {
- afterCommit = parts[1].equals("after");
- beforeCommit = parts[1].equals("before");
- }
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
- if (parts[2].equals("send"))
- {
- afterSend = parts[1].equals("after");
- beforeSend = parts[1].equals("before");
- }
+ // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
+ pingProducer.getConnection().setExceptionListener(pingProducer);
- if (parts[1].equals("once"))
- {
- failOnce = true;
- }
- }
- else
- {
- System.out.println("Unrecognized failover request:" + arg);
- }
- }
+ // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
+ Thread pingThread = new Thread(pingProducer);
+ pingThread.run();
+ pingThread.join();
}
+ catch (Exception e)
+ {
+ System.err.println(e.getMessage());
+ log.error("Top level handler caught execption.", e);
+ System.exit(1);
+ }
+ }
- // Create a ping producer to handle the request/wait/reply cycle.
- PingPongProducer pingProducer =
- new PingPongProducer(brokerDetails, USERNAME_DEFAULT, PASSWORD_DEFAULT, virtualpath, destName, selector,
- transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
+ /**
+ * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
+ * a map of properties containing them.
+ *
+ * @param args The command line.
+ *
+ * @return A set of properties containing all name=value pairs from the command line.
+ *
+ * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the
+ * CommandLineParser class.
+ */
+ protected static Properties processCommandLine(String[] args)
+ {
+ // Use the command line parser to evaluate the command line.
+ CommandLineParser commandLine = new CommandLineParser(new String[][] {});
- pingProducer.getConnection().start();
+ // Capture the command line arguments or display errors and correct usage and then exit.
+ Properties options = null;
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
+ try
+ {
+ options = commandLine.parseCommandLine(args);
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- pingProducer.getConnection().setExceptionListener(pingProducer);
+ // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
+ // overridden values from there.
+ commandLine.addCommandLineToSysProperties();
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(commandLine.getErrors());
+ System.out.println(commandLine.getUsage());
+ System.exit(1);
+ }
- // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
- Thread pingThread = new Thread(pingProducer);
- pingThread.run();
- pingThread.join();
+ return options;
}
/**
@@ -582,9 +627,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- {
- // ignore
- }
+ { }
}
}
@@ -596,27 +639,30 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
public List<Destination> getReplyDestinations()
{
- _logger.debug("public List<Destination> getReplyDestinations(): called");
+ log.debug("public List<Destination> getReplyDestinations(): called");
List<Destination> replyDestinations = new ArrayList<Destination>();
replyDestinations.add(_replyDestination);
+ log.debug("replyDestinations = " + replyDestinations);
+
return replyDestinations;
}
/**
- * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery flag
- * is set accoring the ping producer creation options.
+ * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
+ * flag is set accoring the ping producer creation options.
*
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createProducer() throws JMSException
{
- _logger.debug("public void createProducer(): called");
+ log.debug("public void createProducer(): called");
_producer = (MessageProducer) _producerSession.createProducer(null);
- // _producer.setDisableMessageTimestamp(true);
_producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
}
/**
@@ -632,13 +678,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
- throws JMSException
+ throws JMSException
{
- _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
- + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
- + unique + "): called");
+ log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+ + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called");
+
+ _pingDestinations = new ArrayList<Destination>();
// Create the desired number of ping destinations and consumers for them.
+ log.debug("Creating " + noOfDestinations + " destinations to ping.");
+
for (int i = 0; i < noOfDestinations; i++)
{
AMQDestination destination;
@@ -648,26 +697,26 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
- _logger.debug("Creating unique destinations.");
+ log.debug("Creating unique destinations.");
id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
}
else
{
- _logger.debug("Creating shared destinations.");
- id = "_" + _queueSharedId.incrementAndGet();
+ log.debug("Creating shared destinations.");
+ id = "_" + _queueSharedID.incrementAndGet();
}
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- _logger.debug("Created topic " + destination);
+ log.debug("Created topic " + destination);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
- _logger.debug("Created queue " + destination);
+ log.debug("Created queue " + destination);
}
// Keep the destination.
@@ -676,6 +725,29 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
/**
+ * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
+ *
+ * @param destinations The destinations to listen to.
+ * @param selector A selector to filter the messages with.
+ *
+ * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
+ {
+ log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ + ", String selector = " + selector + "): called");
+
+ for (Destination destination : destinations)
+ {
+ // Create a consumer for the destination and set this pinger to listen to its messages.
+ _consumer =
+ _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+ selector);
+ _consumer.setMessageListener(this);
+ }
+ }
+
+ /**
* Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating
* reply may be waiting on. This is only done if the reply has a correlation id that is expected in the replies map.
*
@@ -683,13 +755,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
public void onMessage(Message message)
{
- _logger.debug("public void onMessage(Message message): called");
+ log.debug("public void onMessage(Message message): called");
try
{
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
- _logger.debug("correlationID = " + correlationID);
+ log.debug("correlationID = " + correlationID);
// Countdown on the traffic light if there is one for the matching correlation id.
PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
@@ -701,7 +773,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Restart the timeout timer on every message.
perCorrelationId.timeOutStart = System.nanoTime();
- _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+ log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
// method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
@@ -716,20 +788,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis trueCount = trafficLight.getCount();
remainingCount = trueCount - 1;
- _logger.debug("remainingCount = " + remainingCount);
- _logger.debug("trueCount = " + trueCount);
+ log.debug("remainingCount = " + remainingCount);
+ log.debug("trueCount = " + trueCount);
// Commit on transaction batch size boundaries. At this point in time the waiting producer remains
// blocked, even on the last message.
if ((remainingCount % _txBatchSize) == 0)
{
commitTx(_consumerSession);
- if (!_consumerSession.getTransacted() &&
- _consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- // Acknowledge the messages when the session is not transacted but client_ack
- ((AMQSession) _consumerSession).acknowledge();
- }
}
// Forward the message and remaining count to any interested chained message listener.
@@ -748,7 +814,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
else
{
- _logger.warn("Got unexpected message with correlationId: " + correlationID);
+ log.warn("Got unexpected message with correlationId: " + correlationID);
}
// Print out ping times for every message in verbose mode only.
@@ -759,48 +825,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis if (timestamp != null)
{
long diff = System.nanoTime() - timestamp;
- _logger.trace("Time for round trip (nanos): " + diff);
+ log.trace("Time for round trip (nanos): " + diff);
}
}
}
catch (JMSException e)
{
- _logger.warn("There was a JMSException: " + e.getMessage(), e);
+ log.warn("There was a JMSException: " + e.getMessage(), e);
}
- _logger.debug("public void onMessage(Message message): ending");
- }
-
- /**
- * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out before a
- * reply arrives, then a null reply is returned from this method. This method generates a new unqiue correlation id for
- * the messages.
- *
- * @param message The message to send.
- * @param numPings The number of ping messages to send.
- * @param timeout The timeout in milliseconds.
- *
- * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
- * all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- * @throws InterruptedException When interrupted by a timeout.
- */
- public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
- {
- _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + "): called");
-
- // Create a unique correlation id to put on the messages before sending them.
- String messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
-
- return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
- }
-
- public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
- {
- return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);
+ log.debug("public void onMessage(Message message): ending");
}
/**
@@ -808,10 +842,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * reply arrives, then a null reply is returned from this method. This method allows the caller to specify the
* correlation id.
*
- * @param message The message to send.
+ * @param message The message to send. If this is null, one is generated.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
- * @param messageCorrelationId The message correlation id.
+ * @param messageCorrelationId The message correlation id. If this is null, one is generated.
*
* @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
* all prematurely.
@@ -820,10 +854,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @throws InterruptedException When interrupted by a timeout
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
+ throws JMSException, InterruptedException
{
- _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+
+ // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
+ if (messageCorrelationId == null)
+ {
+ messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
+ }
try
{
@@ -858,31 +898,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis allMessagesReceived = numReplies == getExpectedNumPings(numPings);
- _logger.debug("numReplies = " + numReplies);
- _logger.debug("allMessagesReceived = " + allMessagesReceived);
+ log.debug("numReplies = " + numReplies);
+ log.debug("allMessagesReceived = " + allMessagesReceived);
// Recheck the timeout condition.
long now = System.nanoTime();
long lastMessageReceievedAt = perCorrelationId.timeOutStart;
timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
- _logger.debug("now = " + now);
- _logger.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
+ log.debug("now = " + now);
+ log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
}
while (!timedOut && !allMessagesReceived);
if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
{
- _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
+ log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
else if (_verbose)
{
- _logger.info("Got all replies on id, " + messageCorrelationId);
+ log.info("Got all replies on id, " + messageCorrelationId);
}
commitTx(_consumerSession);
- _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+ log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
return numReplies;
}
@@ -905,8 +945,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
- _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
- + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");
if (message == null)
{
@@ -923,40 +963,19 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Send all of the ping messages.
for (int i = 0; i < numPings; i++)
{
- // Reset the committed flag to indicate that there are uncommitted messages.
+ // Reset the committed flag to indicate that there may be uncommitted messages.
committed = false;
// Re-timestamp the message.
message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
- // Round robin the destinations as the messages are sent.
- // return _destinationCount;
- sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message);
-
- // Apply message rate throttling if a rate limit has been set up.
- if (_rateLimiter != null)
- {
- _rateLimiter.throttle();
- }
-
- // Call commit every time the commit batch size is reached.
- if ((i % _txBatchSize) == 0)
- {
- commitTx(_producerSession);
- committed = true;
-
- /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
- Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
- the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
- */
- pause(_pauseBatch);
- }
+ // Send the message, passing in the message count.
+ committed = sendMessage(i, message);
// Spew out per message timings on every message sonly in verbose mode.
if (_verbose)
{
- _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, "
- + messageCorrelationId);
+ log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
}
}
@@ -968,7 +987,70 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
/**
- * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
+ * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of
+ * messages sent so far must be specified and is used to round robin the ping destinations (where there are more
+ * than one), and to determine if the transaction batch size has been reached and the sent messages should be
+ * committed.
+ *
+ * @param i The count of messages sent so far in a loop of multiple calls to this send method.
+ * @param message The message to send.
+ *
+ * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException All underlyiung JMSExceptions are allowed to fall through.
+ */
+ protected boolean sendMessage(int i, Message message) throws JMSException
+ {
+ // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
+ // log.debug("_txBatchSize = " + _txBatchSize);
+
+ // Round robin the destinations as the messages are sent.
+ Destination destination = _pingDestinations.get(i % _pingDestinations.size());
+
+ // Prompt the user to kill the broker when doing failover testing.
+ if (_failBeforeSend)
+ {
+ if (_failOnce)
+ {
+ _failBeforeSend = false;
+ }
+
+ log.trace("Failing Before Send");
+ waitForUser(KILL_BROKER_PROMPT);
+ }
+
+ // Send the message either to its round robin destination, or its default destination.
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
+ {
+ _producer.send(destination, message);
+ }
+
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
+ {
+ _rateLimiter.throttle();
+ }
+
+ // Call commit every time the commit batch size is reached.
+ boolean committed = false;
+
+ // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
+ if (((i + 1) % _txBatchSize) == 0)
+ {
+ committed = commitTx(_producerSession);
+ }
+
+ return committed;
+ }
+
+ /**
+ * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction
+ * batch size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared,
+ * which will terminate the pinger.
*/
public void pingLoop()
{
@@ -979,25 +1061,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
// Send the message and wait for a reply.
- pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT);
+ pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
}
catch (JMSException e)
{
_publish = false;
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ log.debug("There was a JMSException: " + e.getMessage(), e);
}
catch (InterruptedException e)
{
_publish = false;
- _logger.debug("There was an interruption: " + e.getMessage(), e);
+ log.debug("There was an interruption: " + e.getMessage(), e);
}
}
- public Destination getReplyDestination()
- {
- return getReplyDestinations().get(0);
- }
-
/**
* Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set here.
*
@@ -1038,8 +1115,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
/**
- * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag has
- * been cleared.
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+ * flag has been cleared.
*/
public void stop()
{
@@ -1066,8 +1143,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
public void onException(JMSException e)
{
+ log.debug("public void onException(JMSException e = " + e + "): called", e);
_publish = false;
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
}
/**
@@ -1079,12 +1156,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public Thread getShutdownHook()
{
return new Thread(new Runnable()
- {
- public void run()
{
- stop();
- }
- });
+ public void run()
+ {
+ stop();
+ }
+ });
}
/**
@@ -1098,40 +1175,30 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
/**
- * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
- *
- * @param destinations The destinations to listen to.
- * @param selector A selector to filter the messages with.
- *
- * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
- */
- public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
- {
- _logger.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
- + ", String selector = " + selector + "): called");
-
- for (Destination destination : destinations)
- {
- // Create a consumer for the destination and set this pinger to listen to its messages.
- MessageConsumer consumer =
- _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
- selector);
- consumer.setMessageListener(this);
- }
- }
-
- /**
* Closes the pingers connection.
*
* @throws JMSException All JMSException are allowed to fall through.
*/
public void close() throws JMSException
{
- _logger.debug("public void close(): called");
+ log.debug("public void close(): called");
- if (_connection != null)
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ finally
{
- _connection.close();
+ _connection = null;
+ _producerSession = null;
+ _consumerSession = null;
+ _producer = null;
+ _consumer = null;
+ _pingDestinations = null;
+ _replyDestination = null;
}
}
@@ -1150,25 +1217,29 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis *
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
*
+ * @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
+ *
* @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
* method, because commits only apply to transactional pingers, but fail after send applied to transactional and
* non-transactional alike.
*/
- protected void commitTx(Session session) throws JMSException
+ protected boolean commitTx(Session session) throws JMSException
{
- _logger.debug("protected void commitTx(Session session): called");
+ log.debug("protected void commitTx(Session session): called");
+
+ boolean committed = false;
- _logger.trace("Batch time reached");
+ log.trace("Batch time reached");
if (_failAfterSend)
{
- _logger.trace("Batch size reached");
+ log.trace("Batch size reached");
if (_failOnce)
{
_failAfterSend = false;
}
- _logger.trace("Failing After Send");
- doFailover();
+ log.trace("Failing After Send");
+ waitForUser(KILL_BROKER_PROMPT);
}
if (session.getTransacted())
@@ -1182,13 +1253,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failBeforeCommit = false;
}
- _logger.trace("Failing Before Commit");
- doFailover();
+ log.trace("Failing Before Commit");
+ waitForUser(KILL_BROKER_PROMPT);
}
- long l = System.currentTimeMillis();
+ long l = System.nanoTime();
session.commit();
- _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms");
+ committed = true;
+ log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
if (_failAfterCommit)
{
@@ -1197,84 +1269,56 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failAfterCommit = false;
}
- _logger.trace("Failing After Commit");
- doFailover();
+ log.trace("Failing After Commit");
+ waitForUser(KILL_BROKER_PROMPT);
}
- _logger.trace("Session Commited.");
+ log.trace("Session Commited.");
}
catch (JMSException e)
{
- _logger.trace("JMSException on commit:" + e.getMessage(), e);
+ log.trace("JMSException on commit:" + e.getMessage(), e);
// Warn that the bounce back client is not available.
if (e.getLinkedException() instanceof AMQNoConsumersException)
{
- _logger.debug("No consumers on queue.");
+ log.debug("No consumers on queue.");
}
try
{
session.rollback();
- _logger.trace("Message rolled back.");
+ log.trace("Message rolled back.");
}
catch (JMSException jmse)
{
- _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+ log.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
// Both commit and rollback failed. Throw the rollback exception.
throw jmse;
}
}
}
+
+ return committed;
}
/**
- * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination of
- * the ping producer. If an explicit destination is set, this overrides the default.
+ * Outputs a prompt to the console and waits for the user to press return.
*
- * @param destination The destination to send to.
- * @param message The message to send.
- *
- * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+ * @param prompt The prompt to display on the console.
*/
- protected void sendMessage(Destination destination, Message message) throws JMSException
+ protected void waitForUser(String prompt)
{
- if (_failBeforeSend)
- {
- if (_failOnce)
- {
- _failBeforeSend = false;
- }
-
- _logger.trace("Failing Before Send");
- doFailover();
- }
-
- if (destination == null)
- {
- _producer.send(message);
- }
- else
- {
- _producer.send(destination, message);
- }
- }
+ System.out.println(prompt);
- /**
- * Prompts the user to terminate the broker, in order to test failover functionality. This method will block until the
- * user supplied some input on the terminal.
- */
- protected void doFailover()
- {
- System.out.println("Kill Broker now then press return");
try
{
System.in.read();
}
catch (IOException e)
{
- // ignore
+ // Ignored.
}
System.out.println("Continuing.");
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index a782058fd4..3b8e670f8f 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -86,7 +86,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Sets up the test parameters with defaults. testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); + Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); } /** @@ -159,7 +159,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(numPings, timeout, perThreadSetup._correlationId); + int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); // Check that all the replies were received and log a fail if they were not. if (numReplies < perCorrelationId._expectedCount) @@ -247,8 +247,8 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA String correlationId = message.getJMSCorrelationID(); _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount - + "): called on batch boundary for message id: " + correlationId + " with thread id: " - + Thread.currentThread().getId()); + + "): called on batch boundary for message id: " + correlationId + " with thread id: " + + Thread.currentThread().getId()); // Get the details for the correlation id and check that they are not null. They can become null // if a test times out. diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java index 620ddd13f7..b303e16d2c 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -175,7 +175,7 @@ public class PingLatencyTestPerf extends PingTestPerf implements TimingControlle // Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout);
+ int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < numPings)
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index 6fb9d543a0..fd3bc3ff23 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -66,19 +66,22 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
/** Holds a property reader to extract the test parameters from. */
- protected ParsedProperties testParameters = new TestContextProperties(System.getProperties());
+ protected ParsedProperties testParameters =
+ TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
public PingTestPerf(String name)
{
super(name);
+ _logger.debug("testParameters = " + testParameters);
+
// Sets up the test parameters with defaults.
- testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
+ /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, PingPongProducer.MESSAGE_SIZE_DEAFULT);
testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
- PingPongProducer.PING_QUEUE_NAME_DEFAULT);
+ PingPongProducer.PING_QUEUE_NAME_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
- PingPongProducer.PERSISTENT_MODE_DEFAULT);
+ PingPongProducer.PERSISTENT_MODE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, PingPongProducer.TRANSACTED_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
@@ -90,20 +93,20 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, PingPongProducer.TIMEOUT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
- PingPongProducer.DESTINATION_COUNT_DEFAULT);
+ PingPongProducer.DESTINATION_COUNT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
- PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
- PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
- PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
+ PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
- PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, PingPongProducer.UNIQUE_DESTS_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, PingPongProducer.ACK_MODE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
- PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);
+ PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
}
/**
@@ -139,20 +142,18 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(
- PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(
- PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
// start the test
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
+ int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
// Fail the test if the timeout was exceeded.
if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
{
Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
- + numReplies);
+ + numReplies);
}
}
@@ -167,43 +168,13 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware {
PerThreadSetup perThreadSetup = new PerThreadSetup();
- // Extract the test set up paramaeters.
- String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
- String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
- String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
- String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
- String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
- boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
- boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
- String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
- boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
- int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
- int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
- boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
- boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
- boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
- boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
- boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
- int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.TX_BATCH_SIZE_PROPNAME);
- Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
- boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_DESTS_PROPNAME);
- int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
- int pausetime = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
-
- // Extract the test set up paramaeters.
- int destinationscount =
- Integer.parseInt(testParameters.getProperty(PingPongProducer.DESTINATION_COUNT_PROPNAME));
-
// This is synchronized because there is a race condition, which causes one connection to sleep if
// all threads try to create connection concurrently.
synchronized (this)
{
// Establish a client to ping a Destination and listen the reply back from same Destination
- perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
- selector, transacted, persistent, messageSize, verbose,
- failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
- failOnce, batchSize, destinationscount, rate, pubsub, unique,
- ackMode, pausetime);
+ perThreadSetup._pingClient = new PingClient(testParameters);
+ perThreadSetup._pingClient.establishConnection(true, true);
}
// Start the client connection
perThreadSetup._pingClient.getConnection().start();
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java index a75cbf7e19..a09324b568 100644 --- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -69,52 +69,55 @@ public class PingPongTestPerf extends AsymptoticTestCase // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
// of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
// private Properties testParameters = System.getProperties();
- private ParsedProperties testParameters = new TestContextProperties(System.getProperties());
+ private ParsedProperties testParameters =
+ TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
public PingPongTestPerf(String name)
{
super(name);
+ _logger.debug(testParameters);
+
// Sets up the test parameters with defaults.
- testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+ /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT));
+ Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT));
testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
- PingPongProducer.PING_QUEUE_NAME_DEFAULT);
+ PingPongProducer.PING_QUEUE_NAME_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
- Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT));
+ Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
- Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT));
+ Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME,
- Boolean.toString(PingPongProducer.VERBOSE_DEFAULT));
+ Boolean.toString(PingPongProducer.VERBOSE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME,
- Boolean.toString(PingPongProducer.PUBSUB_DEFAULT));
+ Boolean.toString(PingPongProducer.PUBSUB_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+ Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
- Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT));
+ Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
- PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
- PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
- PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
+ PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
- PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
+ PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME,
- Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT));
+ Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
- Integer.toString(PingPongProducer.ACK_MODE_DEFAULT));
+ Integer.toString(PingPongProducer.ACK_MODE_DEFAULT));
testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
- PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);
+ PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
}
/**
@@ -147,14 +150,12 @@ public class PingPongTestPerf extends AsymptoticTestCase // Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(
- PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(
- PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
// Send the message and wait for a reply.
int numReplies =
- perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT);
+ perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null);
// Fail the test if the timeout was exceeded.
if (numReplies != numPings)
@@ -182,37 +183,21 @@ public class PingPongTestPerf extends AsymptoticTestCase boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
- int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
- int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
- boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
- boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
- boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
- boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
- int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.TX_BATCH_SIZE_PROPNAME);
- Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
- boolean unique = testParameters.getPropertyAsBoolean(PingPongProducer.UNIQUE_DESTS_PROPNAME);
- int ackMode = testParameters.getPropertyAsInteger(PingPongProducer.ACK_MODE_PROPNAME);
- long pause = testParameters.getPropertyAsInteger(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME);
synchronized (this)
{
// Establish a bounce back client on the ping queue to bounce back the pings.
- perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualPath,
- destinationName, persistent, transacted, selector,
- verbose, pubsub);
+ perThreadSetup._testPingBouncer =
+ new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent,
+ transacted, selector, verbose, pubsub);
// Start the connections for client and producer running.
perThreadSetup._testPingBouncer.getConnection().start();
- // Establish a ping-pong client on the ping queue to send the pings with.
-
- perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualPath,
- destinationName, selector, transacted, persistent,
- messageSize, verbose, failAfterCommit,
- failBeforeCommit, failAfterSend, failBeforeSend,
- failOnce, batchSize, 0, rate, pubsub, unique,
- ackMode, pause);
+ // Establish a ping-pong client on the ping queue to send the pings and receive replies with.
+ perThreadSetup._testPingProducer = new PingPongProducer(testParameters);
+ perThreadSetup._testPingProducer.establishConnection(true, true);
perThreadSetup._testPingProducer.getConnection().start();
}
|