diff options
18 files changed, 555 insertions, 178 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index f17a6fb60a..a418bb8f8a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -118,7 +118,7 @@ public class AMQQueue implements Managable, Comparable /** max allowed number of messages on a queue. */ @Configured(path = "maximumMessageCount", defaultValue = "0") - public int _maximumMessageCount; + public long _maximumMessageCount; /** max queue depth for the queue */ @Configured(path = "maximumQueueDepth", defaultValue = "0") @@ -350,12 +350,12 @@ public class AMQQueue implements Managable, Comparable return _totalMessagesReceived.get(); } - public int getMaximumMessageCount() + public long getMaximumMessageCount() { return _maximumMessageCount; } - public void setMaximumMessageCount(int value) + public void setMaximumMessageCount(long value) { _maximumMessageCount = value; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 056fb5fc01..7a32848c44 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -20,6 +20,8 @@ package org.apache.qpid.server.queue; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Date; +import java.text.SimpleDateFormat; import javax.management.JMException; import javax.management.MBeanException; @@ -44,6 +46,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.CommonContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.MBeanConstructor; @@ -58,8 +61,8 @@ import org.apache.qpid.server.store.StoreContext; @MBeanDescription("Management Interface for AMQQueue") public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { - private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class); + private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z"); /** * Since the MBean is not associated with a real channel we can safely create our own store context @@ -197,12 +200,12 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _queue.getReceivedMessageCount(); } - public Integer getMaximumMessageCount() + public Long getMaximumMessageCount() { return _queue.getMaximumMessageCount(); } - public void setMaximumMessageCount(Integer value) + public void setMaximumMessageCount(Long value) { _queue.setMaximumMessageCount(value); } @@ -370,8 +373,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que AMQMessage msg = list.get(i - 1); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list - CommonContentHeaderProperties headerProperties = (CommonContentHeaderProperties) headerBody.properties; - String[] headerAttributes = headerProperties.toString().split(","); + String[] headerAttributes = getMessageHeaderProperties(headerBody); Object[] itemValues = {msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered()}; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); _messageList.put(messageData); @@ -385,6 +387,35 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _messageList; } + private String[] getMessageHeaderProperties(ContentHeaderBody headerBody) + { + List<String> list = new ArrayList<String>(); + BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties; + list.add("reply-to = " + headerProperties.getReplyToAsString()); + list.add("propertyFlags = " + headerProperties.getPropertyFlags()); + list.add("ApplicationID = " + headerProperties.getAppIdAsString()); + list.add("ClusterID = " + headerProperties.getClusterIdAsString()); + list.add("UserId = " + headerProperties.getUserIdAsString()); + list.add("JMSMessageID = " + headerProperties.getMessageIdAsString()); + list.add("JMSCorrelationID = " + headerProperties.getCorrelationIdAsString()); + + int delMode = headerProperties.getDeliveryMode(); + list.add("JMSDeliveryMode = " + (delMode == 1 ? "Persistent" : "Non_Persistent")); + + list.add("JMSPriority = " + headerProperties.getPriority()); + list.add("JMSType = " + headerProperties.getType()); + + long longDate = headerProperties.getExpiration(); + String strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSExpiration = " + strDate); + + longDate = headerProperties.getTimestamp(); + strDate = (longDate != 0) ? _dateFormat.format(new Date(longDate)) : null; + list.add("JMSTimestamp = " + strDate); + + return list.toArray(new String[list.size()]); + } + /** * @see ManagedQueue#moveMessages * @param fromMessageId diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 879080e10c..cfa13c87fd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -401,7 +401,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _lock.lock(); AMQMessage message = _messages.poll(); - _totalMessageSize.addAndGet(-message.getSize()); + if (message != null) + { + _totalMessageSize.addAndGet(-message.getSize()); + } _lock.unlock(); } @@ -539,7 +542,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { subscriberHasPendingResend(false, sub, null); //better to use the above method as this keeps all the tracking in one location. -// _hasContent.remove(sub); + // _hasContent.remove(sub); } _extraMessages.decrementAndGet(); @@ -552,7 +555,10 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - _totalMessageSize.addAndGet(-message.getSize()); + if ((message != null) && (messageQueue == _messages)) + { + _totalMessageSize.addAndGet(-message.getSize()); + } } catch (AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java index 9b926be82d..061ab56024 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java @@ -156,7 +156,7 @@ public interface ManagedQueue * @return maximum muber of message allowed to be stored in the queue. * @throws IOException */ - Integer getMaximumMessageCount() throws IOException; + Long getMaximumMessageCount() throws IOException; /** * Sets the maximum number of messages allowed to be stored in the queue. @@ -164,7 +164,7 @@ public interface ManagedQueue * @throws IOException */ @MBeanAttribute(name="MaximumMessageCount", description="Threshold high value for number of undelivered messages in the queue") - void setMaximumMessageCount(Integer value) throws IOException; + void setMaximumMessageCount(Long value) throws IOException; /** * This is useful for setting notifications or taking required action if the size of messages diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index bc8e1232a7..00ccffdea1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -27,7 +27,7 @@ public enum NotificationCheck boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount = queue.getMessageCount();
- final int maximumMessageCount = queue.getMaximumMessageCount();
+ final long maximumMessageCount = queue.getMaximumMessageCount();
if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
{
listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 1eb3506720..236291968f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -42,7 +42,7 @@ import java.util.HashSet; /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */ public class AMQQueueAlertTest extends TestCase { - private final static int MAX_MESSAGE_COUNT = 50; + private final static long MAX_MESSAGE_COUNT = 50; private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB private final static long MAX_QUEUE_DEPTH = 10000; // 10 KB @@ -175,7 +175,7 @@ public class AMQQueueAlertTest extends TestCase new AMQShortString("consumer_tag"), true, null, false, false); _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); - _queueMBean.setMaximumMessageCount(9999); // Set a high value, because this is not being tested + _queueMBean.setMaximumMessageCount(9999l); // Set a high value, because this is not being tested _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH); // Send messages(no of message to be little more than what can cause a Queue_Depth alert) @@ -268,9 +268,9 @@ public class AMQQueueAlertTest extends TestCase _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); } - private void sendMessages(int messageCount, long size) throws AMQException + private void sendMessages(long messageCount, long size) throws AMQException { - AMQMessage[] messages = new AMQMessage[messageCount]; + AMQMessage[] messages = new AMQMessage[(int)messageCount]; for (int i = 0; i < messages.length; i++) { messages[i] = message(false, size); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 182c6a2d01..551eb8f0a0 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -116,7 +116,7 @@ public class AMQQueueMBeanTest extends TestCase public void testGeneralProperties() { long maxQueueDepth = 1000; // in bytes - _queueMBean.setMaximumMessageCount(50000); + _queueMBean.setMaximumMessageCount(50000l); _queueMBean.setMaximumMessageSize(2000l); _queueMBean.setMaximumQueueDepth(maxQueueDepth); diff --git a/java/distribution/src/main/assembly/bin.xml b/java/distribution/src/main/assembly/bin.xml index 27eb239d28..fc782b26bb 100644 --- a/java/distribution/src/main/assembly/bin.xml +++ b/java/distribution/src/main/assembly/bin.xml @@ -95,6 +95,12 @@ <fileMode>420</fileMode> </file> <file> + <source>../broker/etc/passwdVhost</source> + <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> + <destName>passwdVhost</destName> + <fileMode>420</fileMode> + </file> + <file> <source>../broker/etc/qpid-server.conf</source> <outputDirectory>qpid-${qpid.version}/etc</outputDirectory> <destName>qpid-server.conf</destName> diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java index 489455bb4e..91dec841cf 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/Constants.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.management.ui; +import static org.apache.qpid.management.ui.Constants.CONNECTION_PROTOCOLS; + /** * Contains constants for the application * @author Bhupendra Bhardwaj @@ -44,6 +46,11 @@ public class Constants public final static String RESULT = "Result"; public final static String VIRTUAL_HOST = "VirtualHost"; public final static String DEFAULT_VH = "Default"; + public final static String DEFAULT_USERNAME = "guest"; + public final static String DEFAULT_PASSWORD = "guest"; + + public final static String USERNAME = "Username"; + public final static String PASSWORD = "Password"; // Attributes and operations are used to customize the GUI for Qpid. If these are changes in the // Qpid server, then these should be updated accordingly @@ -65,9 +72,13 @@ public class Constants public final static String EXCHANGE_TYPE = "ExchangeType"; public final static String[] EXCHANGE_TYPE_VALUES = {"direct", "topic", "headers"}; public final static String[] BOOLEAN_TYPE_VALUES = {"false", "true"}; - public final static String[] ATTRIBUTE_TABLE_TITLES = {"Attribute Name", "Value"}; + public final static String[] ATTRIBUTE_TABLE_TITLES = {"Attribute Name", "Value"}; + public static final String[] CONNECTION_PROTOCOLS ={"RMI"}; + public static final String DEFAULT_PROTOCOL = CONNECTION_PROTOCOLS[0]; public final static String ACTION_ADDSERVER = "New Connection"; + public final static String ACTION_RECONNECT = "Reconnect"; + public final static String ACTION_LOGIN = "Login"; public final static String QUEUE_SORT_BY_NAME = "Queue Name"; public final static String QUEUE_SORT_BY_DEPTH = "Queue Depth"; @@ -105,4 +116,11 @@ public class Constants public final static int OPERATION_IMPACT_ACTION = 1; public final static int OPERATION_IMPACT_ACTIONINFO = 2; public final static int OPERATION_IMPACT_UNKNOWN = 3; + + public final static String ERROR_SERVER_CONNECTION = "Server could not be connected"; + public final static String INFO_PROTOCOL = "Please select the protocol"; + public final static String INFO_HOST_ADDRESS = "Please enter the host address"; + public final static String INFO_HOST_PORT = "Please enter the port number"; + public final static String INFO_USERNAME = "Please enter the " + USERNAME; + public final static String INFO_PASSWORD = "Please enter the " + PASSWORD; } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ManagedServer.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ManagedServer.java index e3699bb1ee..480fdb429a 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ManagedServer.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ManagedServer.java @@ -20,60 +20,84 @@ */ package org.apache.qpid.management.ui; +import static org.apache.qpid.management.ui.Constants.DEFAULT_PROTOCOL; /** * Class representing a server being managed eg. MBeanServer * @author Bhupendra Bhardwaj */ public class ManagedServer extends ManagedObject { - private String host; - private String port; - private String url; - private String domain; - - public ManagedServer(String host, String port, String domain) + private String _host; + private int _port; + private String _url; + private String _domain; + private String _user; + private String _password; + private String _protocol = DEFAULT_PROTOCOL; + + public ManagedServer(String host, int port, String domain) { - this.host = host; - this.port = port; - this.domain = domain; - setName(host + ":" + port); + this(host, port, domain, null, null); } - public ManagedServer(String url, String domain) + public ManagedServer(String host, int port, String domain, String user, String password) { - this.url = url; - this.domain = domain; + setName(host + ":" + port); + _host = host; + _port = port; + _domain = domain; + _url = getRMIURL(host, port); + _user = user; + _password = password; } - + public String getDomain() { - return domain; + return _domain; } public String getHost() { - return host; + return _host; } - public String getPort() + public int getPort() { - return port; + return _port; } public String getUrl() { - return url; + return _url; + } + + public String getProtocol() + { + return _protocol; } - public void setHostAndPort(String host, String port) + public String getPassword() { - this.host = host; - this.port = port; - setName(host + ":" + port); + return _password; + } + + public void setPassword(String password) + { + _password = password; + } + + public String getUser() + { + return _user; + } + + public void setUser(String user) + { + _user = user; } - public void setUrl(String url) + private String getRMIURL(String host, int port) { - this.url = url; + return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi"; } } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/actions/AddServer.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/actions/AddServer.java index 3724dfb33f..5a926e6474 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/actions/AddServer.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/actions/AddServer.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.management.ui.actions; +import static org.apache.qpid.management.ui.Constants.*; + import org.apache.qpid.management.ui.ApplicationRegistry; import org.apache.qpid.management.ui.ApplicationWorkbenchAdvisor; -import org.apache.qpid.management.ui.Constants; import org.apache.qpid.management.ui.exceptions.InfoRequiredException; import org.apache.qpid.management.ui.views.NavigationView; +import org.apache.qpid.management.ui.views.NumberVerifyListener; import org.apache.qpid.management.ui.views.ViewUtility; import org.eclipse.core.runtime.IStatus; import org.eclipse.core.runtime.Status; @@ -34,8 +36,6 @@ import org.eclipse.jface.viewers.ISelection; import org.eclipse.swt.SWT; import org.eclipse.swt.events.SelectionAdapter; import org.eclipse.swt.events.SelectionEvent; -import org.eclipse.swt.events.VerifyEvent; -import org.eclipse.swt.events.VerifyListener; import org.eclipse.swt.layout.GridData; import org.eclipse.swt.layout.GridLayout; import org.eclipse.swt.widgets.Button; @@ -51,8 +51,17 @@ import org.eclipse.ui.IWorkbenchWindowActionDelegate; public class AddServer/* extends Action*/ implements IWorkbenchWindowActionDelegate { private IWorkbenchWindow _window; - private static final String[] _connectionTypes ={"RMI"}; private static final String[] _domains ={"org.apache.qpid"}; + + private NavigationView _navigationView; + private String _transport = DEFAULT_PROTOCOL; + private String _host; + private String _port; + private String _domain; + private String _user; + private String _password; + + private boolean _addServer; public AddServer() { @@ -76,20 +85,38 @@ public class AddServer/* extends Action*/ implements IWorkbenchWindowActionDeleg { if(_window != null) { + reset(); + createAddServerPopup(); try { - // TODO - //_window.getActivePage().showView(NavigationView.ID, Integer.toString(0), IWorkbenchPage.VIEW_ACTIVATE); - //_window.getActivePage().showView(MBeanView.ID, Integer.toString(0), IWorkbenchPage.VIEW_ACTIVATE); + if (_addServer) + { + getNavigationView().addNewServer(_transport, _host, Integer.parseInt(_port), _domain, _user, _password); + } } - catch (Exception ex) + catch(InfoRequiredException ex) { - + ViewUtility.popupInfoMessage("New connection", ex.getMessage()); + } + catch(Exception ex) + { + IStatus status = new Status(IStatus.ERROR, ApplicationWorkbenchAdvisor.PERSPECTIVE_ID, + IStatus.OK, ex.getMessage(), ex.getCause()); + ErrorDialog.openError(_window.getShell(), "Error", ERROR_SERVER_CONNECTION, status); } - createWidgets(); } } + private void reset() + { + _addServer = false; + _host = null; + _port = null; + _domain = null; + _user = null; + _password = null; + } + /** * Selection in the workbench has been changed. We * can change the state of the 'real' action here @@ -117,6 +144,15 @@ public class AddServer/* extends Action*/ implements IWorkbenchWindowActionDeleg this._window = window; } + private NavigationView getNavigationView() + { + if (_navigationView == null) + { + _navigationView = (NavigationView)_window.getActivePage().findView(NavigationView.ID); + } + + return _navigationView; + } /* public void run() @@ -127,18 +163,49 @@ public class AddServer/* extends Action*/ implements IWorkbenchWindowActionDeleg } } */ - private void createWidgets() + + /** + * Creates the shell and then opens the popup where user can enter new connection details. + * Connects to the new server and adds the server in the navigation page. + * Pops up any error occured in connecting to the new server + */ + private void createAddServerPopup() { Display display = Display.getCurrent(); final Shell shell = new Shell(display, SWT.BORDER | SWT.CLOSE); - shell.setText(Constants.ACTION_ADDSERVER); - shell.setImage(ApplicationRegistry.getImage(Constants.CONSOLE_IMAGE)); + shell.setText(ACTION_ADDSERVER); + shell.setImage(ApplicationRegistry.getImage(CONSOLE_IMAGE)); shell.setLayout(new GridLayout()); int x = display.getBounds().width; int y = display.getBounds().height; - shell.setBounds(x/4, y/4, 425, 250); - + shell.setBounds(x/3, y/3, 425, 275); + + createWidgets(shell); + + shell.open(); + _window.getShell().setEnabled(false); + + while (!shell.isDisposed()) + { + if (!display.readAndDispatch()) + { + display.sleep(); + } + } + + //If you create it, you dispose it. + shell.dispose(); + + // enable the main shell + _window.getShell().setEnabled(true); + _window.getShell().open(); + } + + // Creates SWT widgets for the user to add server connection details. + // Adds listeners to the widgets to take appropriate action + private void createWidgets(final Shell shell) + { Composite composite = new Composite(shell, SWT.NONE); composite.setLayoutData(new GridData(SWT.FILL, SWT.FILL, true, true)); GridLayout layout = new GridLayout(2, false); @@ -148,15 +215,18 @@ public class AddServer/* extends Action*/ implements IWorkbenchWindowActionDeleg layout.marginWidth = 20; composite.setLayout(layout); + /* Commenting this, as there is only one protocol at the moment. + * This can be uncommented and enhanced, if more protocols are added in future Label name = new Label(composite, SWT.NONE); name.setText("Connection Type"); GridData layoutData = new GridData(SWT.TRAIL, SWT.TOP, false, false); name.setLayoutData(layoutData); final Combo comboTransport = new Combo(composite, SWT.READ_ONLY); - comboTransport.setItems(_connectionTypes); + comboTransport.setItems(CONNECTION_PROTOCOLS); comboTransport.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); comboTransport.select(0); + */ Label host = new Label(composite, SWT.NONE); host.setText("Host"); @@ -166,20 +236,6 @@ public class AddServer/* extends Action*/ implements IWorkbenchWindowActionDeleg textHost.setText(""); textHost.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); textHost.setFocus(); - /* - //Commented so that it lets users copy paste the host name from somewhere else - textHost.addVerifyListener(new VerifyListener(){ - public void verifyText(VerifyEvent event) - { - if (!(Character.isLetterOrDigit(event.character) || - (event.character == '.') || - (event.character == '\b') )) - { - event.doit = false; - } - } - }); - */ Label port = new Label(composite, SWT.NONE); port.setText("Port"); @@ -188,20 +244,9 @@ public class AddServer/* extends Action*/ implements IWorkbenchWindowActionDeleg final Text textPort = new Text(composite, SWT.BORDER); textPort.setText(""); textPort.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); - /* commented to allow copy paste on this window - textPort.addVerifyListener(new VerifyListener(){ - public void verifyText(VerifyEvent event) - { - if (textPort.getText().length() == 4) - event.doit = false; - else if (!(Character.isDigit(event.character) || - (event.character == '\b'))) - { - event.doit = false; - } - } - }); - */ + // Verify if the value entered is numeric + textPort.addVerifyListener(new NumberVerifyListener()); + Label domain = new Label(composite, SWT.NONE); domain.setText("Domain"); @@ -212,75 +257,96 @@ public class AddServer/* extends Action*/ implements IWorkbenchWindowActionDeleg comboDomain.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); comboDomain.select(0); + + Label user = new Label(composite, SWT.NONE); + user.setText(USERNAME); + user.setLayoutData(new GridData(SWT.TRAIL, SWT.TOP, false, false)); + + final Text textUser = new Text(composite, SWT.BORDER); + textUser.setText(DEFAULT_USERNAME); + textUser.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + + Label password = new Label(composite, SWT.NONE); + password.setText(PASSWORD); + password.setLayoutData(new GridData(SWT.TRAIL, SWT.TOP, false, false)); + + final Text textPwd = new Text(composite, SWT.BORDER | SWT.SINGLE | SWT.PASSWORD); + textPwd.setText(DEFAULT_PASSWORD); + //textPwd.setEchoChar('*'); + textPwd.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + Composite buttonsComposite = new Composite(composite, SWT.NONE); buttonsComposite.setLayoutData(new GridData(SWT.FILL, SWT.FILL, true, true, 2, 1)); buttonsComposite.setLayout(new GridLayout(2, true)); final Button connectButton = new Button(buttonsComposite, SWT.PUSH | SWT.CENTER); - connectButton.setText(Constants.BUTTON_CONNECT); + connectButton.setText(BUTTON_CONNECT); GridData gridData = new GridData (SWT.TRAIL, SWT.BOTTOM, true, true); gridData.widthHint = 100; connectButton.setLayoutData(gridData); - connectButton.setFont(ApplicationRegistry.getFont(Constants.FONT_BUTTON)); + connectButton.setFont(ApplicationRegistry.getFont(FONT_BUTTON)); connectButton.addSelectionListener(new SelectionAdapter(){ - public void widgetSelected(SelectionEvent event) + public void widgetSelected(SelectionEvent event) + { + _host = textHost.getText(); + if ((_host == null) || (_host.trim().length() == 0)) + { + ViewUtility.popupInfoMessage(ACTION_ADDSERVER, INFO_HOST_ADDRESS); + textHost.setText(""); + textHost.setFocus(); + return; + } + + _port = textPort.getText(); + if ((_port == null) || (_port.trim().length() == 0)) + { + ViewUtility.popupInfoMessage(ACTION_ADDSERVER, INFO_HOST_PORT); + textPort.setText(""); + textPort.setFocus(); + return; + } + + _user = textUser.getText(); + if ((_user == null) || (_user.trim().length() == 0)) + { + ViewUtility.popupInfoMessage(ACTION_ADDSERVER, INFO_USERNAME); + textUser.setText(""); + textUser.setFocus(); + return; + } + + _password = textPwd.getText(); + if (_password == null) { - String transport = comboTransport.getText(); - String host = textHost.getText(); - String port = textPort.getText(); - String domain = comboDomain.getText(); - - NavigationView view = (NavigationView)_window.getActivePage().findView(NavigationView.ID); - try - { - view.addNewServer(transport, host, port, domain); - - if (!connectButton.getShell().isDisposed()) - connectButton.getShell().dispose(); - } - catch(InfoRequiredException ex) - { - ViewUtility.popupInfoMessage("New connection", ex.getMessage()); - } - catch(Exception ex) - { - IStatus status = new Status(IStatus.ERROR, ApplicationWorkbenchAdvisor.PERSPECTIVE_ID, - IStatus.OK, ex.getMessage(), ex.getCause()); - ErrorDialog.openError(shell, "Error", "Server could not be added", status); - } + ViewUtility.popupInfoMessage(ACTION_ADDSERVER, INFO_PASSWORD); + textPwd.setText(""); + textPwd.setFocus(); + return; } - }); + + _domain = comboDomain.getText(); + _addServer = true; + + if (!connectButton.getShell().isDisposed()) + { + connectButton.getShell().dispose(); + } + } + }); final Button cancelButton = new Button(buttonsComposite, SWT.PUSH); - cancelButton.setText(Constants.BUTTON_CANCEL); + cancelButton.setText(BUTTON_CANCEL); gridData = new GridData (SWT.LEAD, SWT.BOTTOM, true, true); gridData.widthHint = 100; cancelButton.setLayoutData(gridData); - cancelButton.setFont(ApplicationRegistry.getFont(Constants.FONT_BUTTON)); + cancelButton.setFont(ApplicationRegistry.getFont(FONT_BUTTON)); cancelButton.addSelectionListener(new SelectionAdapter(){ - public void widgetSelected(SelectionEvent event) - { - shell.dispose(); - } - }); - - shell.open(); - _window.getShell().setEnabled(false); - while (!shell.isDisposed()) - { - if (!display.readAndDispatch()) + public void widgetSelected(SelectionEvent event) { - display.sleep(); + shell.dispose(); } - } - - //If you create it, you dispose it. - shell.dispose(); - - // enable the main shell - _window.getShell().setEnabled(true); - _window.getShell().open(); + }); } } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/actions/ReconnectServer.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/actions/ReconnectServer.java index 25337f3fbe..8fe08462cd 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/actions/ReconnectServer.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/actions/ReconnectServer.java @@ -20,21 +20,44 @@ */ package org.apache.qpid.management.ui.actions; +import static org.apache.qpid.management.ui.Constants.*; + +import org.apache.qpid.management.ui.ApplicationRegistry; import org.apache.qpid.management.ui.ApplicationWorkbenchAdvisor; +import org.apache.qpid.management.ui.Constants; import org.apache.qpid.management.ui.exceptions.InfoRequiredException; import org.apache.qpid.management.ui.views.NavigationView; +import org.apache.qpid.management.ui.views.TreeObject; import org.apache.qpid.management.ui.views.ViewUtility; import org.eclipse.core.runtime.IStatus; import org.eclipse.core.runtime.Status; import org.eclipse.jface.action.IAction; import org.eclipse.jface.dialogs.ErrorDialog; import org.eclipse.jface.viewers.ISelection; +import org.eclipse.swt.SWT; +import org.eclipse.swt.events.SelectionAdapter; +import org.eclipse.swt.events.SelectionEvent; +import org.eclipse.swt.layout.GridData; +import org.eclipse.swt.layout.GridLayout; +import org.eclipse.swt.widgets.Button; +import org.eclipse.swt.widgets.Combo; +import org.eclipse.swt.widgets.Composite; +import org.eclipse.swt.widgets.Display; +import org.eclipse.swt.widgets.Label; +import org.eclipse.swt.widgets.Shell; +import org.eclipse.swt.widgets.Text; import org.eclipse.ui.IWorkbenchWindow; import org.eclipse.ui.IWorkbenchWindowActionDelegate; public class ReconnectServer implements IWorkbenchWindowActionDelegate { private IWorkbenchWindow _window; + private NavigationView _navigationView; + private String _title; + private String _serverName; + private String _user; + private String _password; + private boolean _connect; /** * Selection in the workbench has been changed. We @@ -68,14 +91,36 @@ public class ReconnectServer implements IWorkbenchWindowActionDelegate this._window = window; } + private NavigationView getNavigationView() + { + if (_navigationView == null) + { + _navigationView = (NavigationView)_window.getActivePage().findView(NavigationView.ID); + } + + return _navigationView; + } + public void run(IAction action) { if(_window != null) { - NavigationView view = (NavigationView)_window.getActivePage().findView(NavigationView.ID); try { - view.reconnect(); + reset(); + // Check if a server node is selected to be reconnected. + TreeObject serverNode = getNavigationView().getSelectedServerNode(); + _serverName = serverNode.getName(); + _title = ACTION_LOGIN + " (" + _serverName + ")"; + + // Get the login details(username/password) + createLoginPopup(); + + if (_connect) + { + // Connect the server + getNavigationView().reconnect(_user, _password); + } } catch(InfoRequiredException ex) { @@ -85,8 +130,136 @@ public class ReconnectServer implements IWorkbenchWindowActionDelegate { IStatus status = new Status(IStatus.ERROR, ApplicationWorkbenchAdvisor.PERSPECTIVE_ID, IStatus.OK, ex.getMessage(), ex.getCause()); - ErrorDialog.openError(_window.getShell(), "Error", "Server could not be connected", status); + ErrorDialog.openError(_window.getShell(), "Error", ERROR_SERVER_CONNECTION, status); } } } + + private void reset() + { + _connect = false; + _user = null; + _password = null; + } + + // Create the login popup fot th user to enter usernaem and password + private void createLoginPopup() + { + Display display = Display.getCurrent(); + final Shell shell = new Shell(display, SWT.BORDER | SWT.CLOSE); + shell.setText(_title); + shell.setImage(ApplicationRegistry.getImage(CONSOLE_IMAGE)); + shell.setLayout(new GridLayout()); + + int x = display.getBounds().width; + int y = display.getBounds().height; + shell.setBounds(x/3, y/3, 350, 200); + + createWidgets(shell); + + shell.open(); + _window.getShell().setEnabled(false); + + while (!shell.isDisposed()) + { + if (!display.readAndDispatch()) + { + display.sleep(); + } + } + + //If you create it, you dispose it. + shell.dispose(); + + // enable the main shell + _window.getShell().setEnabled(true); + _window.getShell().open(); + } + + // Creates the SWT widgets in the popup shell, to enter username and password. + // Adds listeners to the widgets to take appropriate action + private void createWidgets(final Shell shell) + { + Composite composite = new Composite(shell, SWT.NONE); + composite.setLayoutData(new GridData(SWT.FILL, SWT.FILL, true, true)); + GridLayout layout = new GridLayout(2, false); + layout.horizontalSpacing = 10; + layout.verticalSpacing = 10; + layout.marginHeight = 20; + layout.marginWidth = 20; + composite.setLayout(layout); + + Label user = new Label(composite, SWT.NONE); + user.setText(USERNAME); + user.setLayoutData(new GridData(SWT.TRAIL, SWT.TOP, false, false)); + + final Text textUser = new Text(composite, SWT.BORDER); + textUser.setText(DEFAULT_USERNAME); + textUser.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + // Put cursor on this field + textUser.setFocus(); + + Label password = new Label(composite, SWT.NONE); + password.setText(PASSWORD); + password.setLayoutData(new GridData(SWT.TRAIL, SWT.TOP, false, false)); + + final Text textPwd = new Text(composite, SWT.BORDER | SWT.SINGLE | SWT.PASSWORD); + textPwd.setText(DEFAULT_PASSWORD); + textPwd.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); + + Composite buttonsComposite = new Composite(composite, SWT.NONE); + buttonsComposite.setLayoutData(new GridData(SWT.FILL, SWT.FILL, true, true, 2, 1)); + buttonsComposite.setLayout(new GridLayout(2, true)); + + + final Button connectButton = new Button(buttonsComposite, SWT.PUSH | SWT.CENTER); + connectButton.setText(Constants.BUTTON_CONNECT); + GridData gridData = new GridData (SWT.TRAIL, SWT.BOTTOM, true, true); + gridData.widthHint = 100; + connectButton.setLayoutData(gridData); + connectButton.setFont(ApplicationRegistry.getFont(Constants.FONT_BUTTON)); + connectButton.addSelectionListener(new SelectionAdapter(){ + public void widgetSelected(SelectionEvent event) + { + _user = textUser.getText(); + if ((_user == null) || (_user.trim().length() == 0)) + { + ViewUtility.popupInfoMessage(_title, INFO_USERNAME); + textUser.setText(""); + textUser.setFocus(); + return; + } + + _password = textPwd.getText(); + if (_password == null) + { + ViewUtility.popupInfoMessage(_title, INFO_PASSWORD); + textPwd.setText(""); + textPwd.setFocus(); + return; + } + + _connect = true; + + if (!connectButton.getShell().isDisposed()) + { + connectButton.getShell().dispose(); + } + } + }); + + final Button cancelButton = new Button(buttonsComposite, SWT.PUSH); + cancelButton.setText(Constants.BUTTON_CANCEL); + gridData = new GridData (SWT.LEAD, SWT.BOTTOM, true, true); + gridData.widthHint = 100; + cancelButton.setLayoutData(gridData); + cancelButton.setFont(ApplicationRegistry.getFont(Constants.FONT_BUTTON)); + cancelButton.addSelectionListener(new SelectionAdapter(){ + public void widgetSelected(SelectionEvent event) + { + shell.dispose(); + } + }); + } + } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java index c86c5f87ab..b0f9928c38 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/JMXServerRegistry.java @@ -83,7 +83,9 @@ public class JMXServerRegistry extends ServerRegistry { super(server); JMXServiceURL jmxUrl = new JMXServiceURL(server.getUrl()); - Map<String, Object> env = null; + Map<String, Object> env = new HashMap<String, Object>(); + String[] creds = {server.getUser(), server.getPassword()}; + env.put(JMXConnector.CREDENTIALS, creds); _jmxc = JMXConnectorFactory.connect(jmxUrl, env); _mbsc = _jmxc.getMBeanServerConnection(); diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java index 8b4fd3afb5..5ceeb879b4 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/jmx/MBeanUtility.java @@ -21,6 +21,7 @@ package org.apache.qpid.management.ui.jmx; import java.io.IOException; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -65,6 +66,9 @@ public class MBeanUtility String debug = System.getProperty("debug"); _debug = "true".equalsIgnoreCase(debug) ? true : false; } + + public static final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE); + public static final BigInteger MAX_INT = BigInteger.valueOf(Integer.MAX_VALUE); /** * Retrieves the MBeanInfo from MBeanServer and stores in the application registry * @param mbean managed bean @@ -333,10 +337,19 @@ public class MBeanUtility Object newValue = value; if (attribute.getDataType().equals(Long.class.getName())) { + if (MAX_LONG.compareTo(new BigInteger(value)) == -1) + { + throw new ManagementConsoleException("Entered value is too big for \"" + + ViewUtility.getDisplayText(attribute.getName()) + "\""); + } newValue = new Long(Long.parseLong(value)); } else if (attribute.getDataType().equals(Integer.class.getName())) { + if (MAX_INT.compareTo(new BigInteger(value)) == -1) + { + throw new ManagementConsoleException("Entered value is too big for " + attribute.getName()); + } newValue = new Integer(Integer.parseInt(value)); } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java index e3a7402c52..437afeeda1 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java @@ -488,7 +488,7 @@ public class AttributesTabControl extends TabControl GridData layoutData = new GridData(SWT.TRAIL, SWT.TOP, false, false); label.setLayoutData(layoutData); Text value = new Text(parent, SWT.BEGINNING | SWT.BORDER |SWT.READ_ONLY); - value.setText(attribute.getName()); + value.setText(ViewUtility.getDisplayText(attribute.getName())); value.setLayoutData(new GridData(SWT.FILL, SWT.TOP, true, false)); @@ -527,23 +527,7 @@ public class AttributesTabControl extends TabControl { style = SWT.BEGINNING | SWT.BORDER; value = new Text(parent, style); - value.addVerifyListener(new VerifyListener() - { - public void verifyText(VerifyEvent event) - { - String string = event.text; - char [] chars = new char [string.length ()]; - string.getChars (0, chars.length, chars, 0); - for (int i=0; i<chars.length; i++) - { - if (!('0' <= chars [i] && chars [i] <= '9')) - { - event.doit = false; - return; - } - } - } - }); + value.addVerifyListener(new NumberVerifyListener()); // set data to access in the listener parent.setData(attribute); diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java index 4dd06415c5..e9215a4876 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NavigationView.java @@ -215,11 +215,6 @@ public class NavigationView extends ViewPart } } - private String getRMIURL(String host) - { - return "service:jmx:rmi:///jndi/rmi://" + host + "/jmxrmi"; - } - /** * Adds a new server node in the navigation view if server connection is successful. * @param transportProtocol @@ -228,24 +223,30 @@ public class NavigationView extends ViewPart * @param domain * @throws Exception */ - public void addNewServer(String transportProtocol, String host, String port, String domain) - throws Exception + public void addNewServer(String transportProtocol, String host, int port, + String domain, String user, String pwd) throws Exception { String serverAddress = host + ":" + port; String url = null; - ManagedServer managedServer = null; + ManagedServer managedServer = new ManagedServer(host, port, domain, user, pwd); if ("RMI".equals(transportProtocol)) { - url = getRMIURL(serverAddress); + url = managedServer.getUrl(); List<TreeObject> list = _serversRootNode.getChildren(); for (TreeObject node : list) { if (url.equals(node.getUrl())) - throw new InfoRequiredException("Server " + serverAddress + " is already added"); + { + // Server is already in the list of added servers, so now connect it. + // Set the server node as selected and then connect it. + _treeViewer.setSelection(new StructuredSelection(node)); + reconnect(user, pwd); + return; + } } - managedServer = new ManagedServer(url, domain); + // The server is not in the list of already added servers, so now connect and add it. managedServer.setName(serverAddress); createRMIServerConnection(managedServer); } @@ -710,14 +711,16 @@ public class NavigationView extends ViewPart * Connects the selected server node * @throws Exception */ - public void reconnect() throws Exception + public void reconnect(String user, String password) throws Exception { TreeObject selectedNode = getSelectedServerNode(); ManagedServer managedServer = (ManagedServer)selectedNode.getManagedObject(); if(_managedServerMap.containsKey(managedServer)) { throw new InfoRequiredException("Server " + managedServer.getName() + " is already connected"); - } + } + managedServer.setUser(user); + managedServer.setPassword(password); createRMIServerConnection(managedServer); // put the server in the managed server map @@ -880,7 +883,7 @@ public class NavigationView extends ViewPart return list; } - private TreeObject getSelectedServerNode() throws Exception + public TreeObject getSelectedServerNode() throws Exception { IStructuredSelection ss = (IStructuredSelection)_treeViewer.getSelection(); TreeObject selectedNode = (TreeObject)ss.getFirstElement(); @@ -935,11 +938,9 @@ public class NavigationView extends ViewPart { for (String serverAddress : serversList) { - String url = getRMIURL(serverAddress); - ManagedServer managedServer = new ManagedServer(url, "org.apache.qpid"); - managedServer.setName(serverAddress); + String[] server = serverAddress.split(":"); + ManagedServer managedServer = new ManagedServer(server[0], Integer.parseInt(server[1]), "org.apache.qpid"); TreeObject serverNode = new TreeObject(serverAddress, NODE_TYPE_SERVER); - serverNode.setUrl(url); serverNode.setManagedObject(managedServer); _serversRootNode.addChild(serverNode); } diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NumberVerifyListener.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NumberVerifyListener.java new file mode 100644 index 0000000000..1774209dae --- /dev/null +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/NumberVerifyListener.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.management.ui.views; + +import org.eclipse.swt.events.VerifyEvent; +import org.eclipse.swt.events.VerifyListener; + +/** + * Implementation of VeryfyListener for numeric values + * @author Bhupendra Bhardwaj + */ +public class NumberVerifyListener implements VerifyListener +{ + public void verifyText(VerifyEvent event) + { + String string = event.text; + char [] chars = new char [string.length ()]; + string.getChars (0, chars.length, chars, 0); + for (int i=0; i<chars.length; i++) + { + if (!('0' <= chars [i] && chars [i] <= '9')) + { + event.doit = false; + return; + } + } + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 2a3aff4692..c6a69807a3 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -39,6 +39,7 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.jms.MessageProducer;
@@ -723,6 +724,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis if ((remainingCount % _txBatchSize) == 0)
{
commitTx(_consumerSession);
+ if (!_consumerSession.getTransacted() &&
+ _consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ // Acknowledge the messages when the session is not transacted but client_ack
+ ((AMQSession) _consumerSession).acknowledge();
+ }
}
// Forward the message and remaining count to any interested chained message listener.
|