summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java45
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java11
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java5
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java2
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java24
-rw-r--r--java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java4
-rw-r--r--java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java4
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java7
16 files changed, 155 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 225fbec930..de9dc42de8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -280,4 +280,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
ConfigurationPlugin getConfiguration();
ManagedObject getManagedObject();
+
+ void setExclusive(boolean exclusive) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 806b7f3744..32b71a554b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -274,6 +274,23 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
return _queue.isOverfull();
}
+ public boolean isExclusive()
+ {
+ return _queue.isExclusive();
+ }
+
+ public void setExclusive(boolean exclusive) throws JMException
+ {
+ try
+ {
+ _queue.setExclusive(exclusive);
+ }
+ catch (AMQException e)
+ {
+ throw new JMException(e.toString());
+ }
+ }
+
/**
* Checks if there is any notification to be send to the listeners
*/
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d47d229658..489a724254 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -53,6 +53,8 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -328,6 +330,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _exclusive;
}
+
+ public void setExclusive(boolean exclusive) throws AMQException
+ {
+ _exclusive = exclusive;
+
+ if(isDurable())
+ {
+ getVirtualHost().getDurableConfigurationStore().updateQueue(this);
+ }
+ }
public Exchange getAlternateExchange()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 40f265e00f..627f059c53 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -94,6 +94,7 @@ public class DerbyMessageStore implements MessageStore
private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME;
private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
+ private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?";
private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
private static final String SELECT_FROM_BINDINGS =
"SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name";
@@ -845,6 +846,50 @@ public class DerbyMessageStore implements MessageStore
}
}
}
+
+ /**
+ * Updates the specified queue in the persistent store, IF it is already present. If the queue
+ * is not present in the store, it will not be added.
+ *
+ * NOTE: Currently only updates the exclusivity.
+ *
+ * @param queue The queue to update the entry for.
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ public void updateQueue(final AMQQueue queue) throws AMQException
+ {
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
+ stmt.setString(1, queue.getNameShortString().toString());
+
+ ResultSet rs = stmt.executeQuery();
+
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY);
+
+ stmt2.setBoolean(1,queue.isExclusive());
+ stmt2.setString(2, queue.getNameShortString().toString());
+
+ stmt2.execute();
+ stmt2.close();
+ }
+
+ stmt.close();
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e);
+ }
+ }
+
+ }
/**
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index a50e8e99b4..c169e3bcff 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -118,4 +118,13 @@ public interface DurableConfigurationStore
* @throws org.apache.qpid.AMQException If the operation fails for any reason.
*/
void removeQueue(AMQQueue queue) throws AMQException;
+
+ /**
+ * Updates the specified queue in the persistent store, IF it is already present. If the queue
+ * is not present in the store, it will not be added.
+ *
+ * @param queue The queue to update the entry for.
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ void updateQueue(AMQQueue queue) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 565afd2539..9d9312cd26 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -148,6 +148,11 @@ public class MemoryMessageStore implements MessageStore
{
// Not required to do anything
}
+
+ public void updateQueue(final AMQQueue queue) throws AMQException
+ {
+ // Not required to do anything
+ }
public void configureTransactionLog(String name,
TransactionLogRecoveryHandler recoveryHandler,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index fcb06f56bf..c6055f35f6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -765,6 +765,10 @@ public class VirtualHostImpl implements VirtualHost
arguments = args;
}
}
+
+ public void updateQueue(AMQQueue queue) throws AMQException
+ {
+ }
}
@Override
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 6a98a255f8..3735ef123d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -197,7 +197,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertTrue(_queueMBean.getConsumerCount() == 3);
}
- public void testGeneralProperties()
+ public void testGeneralProperties() throws Exception
{
long maxQueueDepth = 1000; // in bytes
_queueMBean.setMaximumMessageCount(50000l);
@@ -211,7 +211,14 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertEquals("Queue Name does not match", new AMQShortString(getName()), _queueMBean.getName());
assertFalse("AutoDelete should not be set.",_queueMBean.isAutoDelete());
assertFalse("Queue should not be durable.",_queueMBean.isDurable());
- //TODO add isExclusive when supported
+
+ //set+get exclusivity using the mbean, and also verify it is actually updated in the queue
+ _queueMBean.setExclusive(true);
+ assertTrue("Exclusive property should be true.",_queueMBean.isExclusive());
+ assertTrue("Exclusive property should be true.",_queue.isExclusive());
+ _queueMBean.setExclusive(false);
+ assertFalse("Exclusive property should be false.",_queueMBean.isExclusive());
+ assertFalse("Exclusive property should be false.",_queue.isExclusive());
}
public void testExceptions() throws Exception
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 51b049787c..9bbae6df01 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -599,4 +599,9 @@ public class MockAMQQueue implements AMQQueue
{
return 0;
}
+
+ public void setExclusive(boolean exclusive)
+ {
+
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 9c12242a07..8cbc50ecbe 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -217,4 +217,9 @@ public class SkeletonMessageStore implements MessageStore
};
}
+ public void updateQueue(AMQQueue queue) throws AMQException
+ {
+
+ }
+
}
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
index 50acc264e6..e5ca69eabb 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
@@ -112,7 +112,7 @@ public interface ManagedExchange
* @param routingKey the routing key
* @throws IOException
* @throws JMException
- * @since 1.8
+ * @since Qpid JMX API 1.8
*/
@MBeanOperation(name="removeBinding",
description="Removes an exchange binding from the Queue",
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
index 282fc86ced..20e97adf8c 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
@@ -70,6 +70,7 @@ public interface ManagedQueue
String ATTR_CAPACITY = "Capacity";
String ATTR_FLOW_OVERFULL = "FlowOverfull";
String ATTR_FLOW_RESUME_CAPACITY = "FlowResumeCapacity";
+ String ATTR_EXCLUSIVE = "Exclusive";
//All attribute names constant
String[] QUEUE_ATTRIBUTES = new String[]{
@@ -88,7 +89,8 @@ public interface ManagedQueue
ATTR_RCVD_MSG_COUNT,
ATTR_CAPACITY,
ATTR_FLOW_OVERFULL,
- ATTR_FLOW_RESUME_CAPACITY
+ ATTR_FLOW_RESUME_CAPACITY,
+ ATTR_EXCLUSIVE
};
/**
@@ -285,6 +287,26 @@ public interface ManagedQueue
*/
@MBeanAttribute(name="FlowOverfull", description="true if the queue is considered overfull by the Flow Control system")
boolean isFlowOverfull() throws IOException;
+
+ /**
+ * Returns whether the queue is exclusive or not.
+ *
+ * @since Qpid JMX API 2.0
+ * @return whether the queue is exclusive.
+ * @throws IOException
+ */
+ boolean isExclusive() throws IOException;
+
+ /**
+ * Sets whether the queue is exclusive or not.
+ *
+ * @since Qpid JMX API 2.0
+ * @param exclusive the capacity in bytes
+ * @throws IOException
+ * @throws JMException
+ */
+ @MBeanAttribute(name="Exclusive", description="Whether the queue is Exclusive or not")
+ void setExclusive(boolean exclusive) throws IOException, JMException;
//********** Operations *****************//
diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
index f61c41dea9..618403fdca 100644
--- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
+++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java
@@ -41,8 +41,8 @@ public interface ServerInformation
* indicates that version. If it is not present then a null value will be returned upon inspection and
* Qpid JMX API 1.1 can be assumed.
*/
- int QPID_JMX_API_MAJOR_VERSION = 1;
- int QPID_JMX_API_MINOR_VERSION = 8;
+ int QPID_JMX_API_MAJOR_VERSION = 2;
+ int QPID_JMX_API_MINOR_VERSION = 0;
/**
diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
index 0d9f2ff678..20cfec3758 100644
--- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
+++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java
@@ -46,8 +46,8 @@ public abstract class ApplicationRegistry
public static final long timeout = Long.parseLong(System.getProperty("timeout", "15000"));
//max supported broker management interface supported by this release of the management console
- public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 1;
- public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 8;
+ public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 2;
+ public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 0;
public static final String DATA_DIR = System.getProperty("user.home") + File.separator + ".qpidmc";
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index b41aa661ea..8131e09b49 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -309,5 +309,12 @@ public class SlowMessageStore implements MessageStore
}
}
+ public void updateQueue(AMQQueue queue) throws AMQException
+ {
+ doPreDelay("updateQueue");
+ _realStore.updateQueue(queue);
+ doPostDelay("updateQueue");
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
index a9dafe7ae8..ff80c91fac 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
@@ -38,6 +38,7 @@ import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.management.common.mbeans.LoggingManagement;
import org.apache.qpid.management.common.mbeans.ConfigurationManagement;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.UserManagement;
/**
@@ -330,6 +331,12 @@ public class JMXTestUtils
ObjectName objectName = getExchangeObjectName("test", exchangeName);
return MBeanServerInvocationHandler.newProxyInstance(_mbsc, objectName, ManagedExchange.class, false);
}
+
+ public ManagedQueue getManagedQueue(String queueName)
+ {
+ ObjectName objectName = getQueueObjectName("test", queueName);
+ return getManagedObject(ManagedQueue.class, objectName);
+ }
public LoggingManagement getLoggingManagement() throws MalformedObjectNameException
{