diff options
16 files changed, 155 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 225fbec930..de9dc42de8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -280,4 +280,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer ConfigurationPlugin getConfiguration(); ManagedObject getManagedObject(); + + void setExclusive(boolean exclusive) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 806b7f3744..32b71a554b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -274,6 +274,23 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _queue.isOverfull(); } + public boolean isExclusive() + { + return _queue.isExclusive(); + } + + public void setExclusive(boolean exclusive) throws JMException + { + try + { + _queue.setExclusive(exclusive); + } + catch (AMQException e) + { + throw new JMException(e.toString()); + } + } + /** * Checks if there is any notification to be send to the listeners */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d47d229658..489a724254 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -53,6 +53,8 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; + +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -328,6 +330,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _exclusive; } + + public void setExclusive(boolean exclusive) throws AMQException + { + _exclusive = exclusive; + + if(isDurable()) + { + getVirtualHost().getDurableConfigurationStore().updateQueue(this); + } + } public Exchange getAlternateExchange() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 40f265e00f..627f059c53 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -94,6 +94,7 @@ public class DerbyMessageStore implements MessageStore private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME; private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; + private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?"; private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; private static final String SELECT_FROM_BINDINGS = "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name"; @@ -845,6 +846,50 @@ public class DerbyMessageStore implements MessageStore } } } + + /** + * Updates the specified queue in the persistent store, IF it is already present. If the queue + * is not present in the store, it will not be added. + * + * NOTE: Currently only updates the exclusivity. + * + * @param queue The queue to update the entry for. + * @throws org.apache.qpid.AMQException If the operation fails for any reason. + */ + public void updateQueue(final AMQQueue queue) throws AMQException + { + if (_state != State.RECOVERING) + { + try + { + Connection conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); + stmt.setString(1, queue.getNameShortString().toString()); + + ResultSet rs = stmt.executeQuery(); + + if (rs.next()) + { + PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY); + + stmt2.setBoolean(1,queue.isExclusive()); + stmt2.setString(2, queue.getNameShortString().toString()); + + stmt2.execute(); + stmt2.close(); + } + + stmt.close(); + conn.close(); + } + catch (SQLException e) + { + throw new AMQException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e); + } + } + + } /** * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index a50e8e99b4..c169e3bcff 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -118,4 +118,13 @@ public interface DurableConfigurationStore * @throws org.apache.qpid.AMQException If the operation fails for any reason. */ void removeQueue(AMQQueue queue) throws AMQException; + + /** + * Updates the specified queue in the persistent store, IF it is already present. If the queue + * is not present in the store, it will not be added. + * + * @param queue The queue to update the entry for. + * @throws org.apache.qpid.AMQException If the operation fails for any reason. + */ + void updateQueue(AMQQueue queue) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 565afd2539..9d9312cd26 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -148,6 +148,11 @@ public class MemoryMessageStore implements MessageStore { // Not required to do anything } + + public void updateQueue(final AMQQueue queue) throws AMQException + { + // Not required to do anything + } public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index fcb06f56bf..c6055f35f6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -765,6 +765,10 @@ public class VirtualHostImpl implements VirtualHost arguments = args; } } + + public void updateQueue(AMQQueue queue) throws AMQException + { + } } @Override diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 6a98a255f8..3735ef123d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -197,7 +197,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertTrue(_queueMBean.getConsumerCount() == 3); } - public void testGeneralProperties() + public void testGeneralProperties() throws Exception { long maxQueueDepth = 1000; // in bytes _queueMBean.setMaximumMessageCount(50000l); @@ -211,7 +211,14 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertEquals("Queue Name does not match", new AMQShortString(getName()), _queueMBean.getName()); assertFalse("AutoDelete should not be set.",_queueMBean.isAutoDelete()); assertFalse("Queue should not be durable.",_queueMBean.isDurable()); - //TODO add isExclusive when supported + + //set+get exclusivity using the mbean, and also verify it is actually updated in the queue + _queueMBean.setExclusive(true); + assertTrue("Exclusive property should be true.",_queueMBean.isExclusive()); + assertTrue("Exclusive property should be true.",_queue.isExclusive()); + _queueMBean.setExclusive(false); + assertFalse("Exclusive property should be false.",_queueMBean.isExclusive()); + assertFalse("Exclusive property should be false.",_queue.isExclusive()); } public void testExceptions() throws Exception diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 51b049787c..9bbae6df01 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -599,4 +599,9 @@ public class MockAMQQueue implements AMQQueue { return 0; } + + public void setExclusive(boolean exclusive) + { + + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 9c12242a07..8cbc50ecbe 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -217,4 +217,9 @@ public class SkeletonMessageStore implements MessageStore }; } + public void updateQueue(AMQQueue queue) throws AMQException + { + + } + } diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java index 50acc264e6..e5ca69eabb 100644 --- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java +++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java @@ -112,7 +112,7 @@ public interface ManagedExchange * @param routingKey the routing key * @throws IOException * @throws JMException - * @since 1.8 + * @since Qpid JMX API 1.8 */ @MBeanOperation(name="removeBinding", description="Removes an exchange binding from the Queue", diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java index 282fc86ced..20e97adf8c 100644 --- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java +++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java @@ -70,6 +70,7 @@ public interface ManagedQueue String ATTR_CAPACITY = "Capacity"; String ATTR_FLOW_OVERFULL = "FlowOverfull"; String ATTR_FLOW_RESUME_CAPACITY = "FlowResumeCapacity"; + String ATTR_EXCLUSIVE = "Exclusive"; //All attribute names constant String[] QUEUE_ATTRIBUTES = new String[]{ @@ -88,7 +89,8 @@ public interface ManagedQueue ATTR_RCVD_MSG_COUNT, ATTR_CAPACITY, ATTR_FLOW_OVERFULL, - ATTR_FLOW_RESUME_CAPACITY + ATTR_FLOW_RESUME_CAPACITY, + ATTR_EXCLUSIVE }; /** @@ -285,6 +287,26 @@ public interface ManagedQueue */ @MBeanAttribute(name="FlowOverfull", description="true if the queue is considered overfull by the Flow Control system") boolean isFlowOverfull() throws IOException; + + /** + * Returns whether the queue is exclusive or not. + * + * @since Qpid JMX API 2.0 + * @return whether the queue is exclusive. + * @throws IOException + */ + boolean isExclusive() throws IOException; + + /** + * Sets whether the queue is exclusive or not. + * + * @since Qpid JMX API 2.0 + * @param exclusive the capacity in bytes + * @throws IOException + * @throws JMException + */ + @MBeanAttribute(name="Exclusive", description="Whether the queue is Exclusive or not") + void setExclusive(boolean exclusive) throws IOException, JMException; //********** Operations *****************// diff --git a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java index f61c41dea9..618403fdca 100644 --- a/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java +++ b/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ServerInformation.java @@ -41,8 +41,8 @@ public interface ServerInformation * indicates that version. If it is not present then a null value will be returned upon inspection and * Qpid JMX API 1.1 can be assumed. */ - int QPID_JMX_API_MAJOR_VERSION = 1; - int QPID_JMX_API_MINOR_VERSION = 8; + int QPID_JMX_API_MAJOR_VERSION = 2; + int QPID_JMX_API_MINOR_VERSION = 0; /** diff --git a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java index 0d9f2ff678..20cfec3758 100644 --- a/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java +++ b/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/ApplicationRegistry.java @@ -46,8 +46,8 @@ public abstract class ApplicationRegistry public static final long timeout = Long.parseLong(System.getProperty("timeout", "15000")); //max supported broker management interface supported by this release of the management console - public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 1; - public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 8; + public static final int SUPPORTED_QPID_JMX_API_MAJOR_VERSION = 2; + public static final int SUPPORTED_QPID_JMX_API_MINOR_VERSION = 0; public static final String DATA_DIR = System.getProperty("user.home") + File.separator + ".qpidmc"; diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java index b41aa661ea..8131e09b49 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java @@ -309,5 +309,12 @@ public class SlowMessageStore implements MessageStore } } + public void updateQueue(AMQQueue queue) throws AMQException + { + doPreDelay("updateQueue"); + _realStore.updateQueue(queue); + doPostDelay("updateQueue"); + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java index a9dafe7ae8..ff80c91fac 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java +++ b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java @@ -38,6 +38,7 @@ import org.apache.qpid.management.common.mbeans.ManagedBroker; import org.apache.qpid.management.common.mbeans.ManagedExchange; import org.apache.qpid.management.common.mbeans.LoggingManagement; import org.apache.qpid.management.common.mbeans.ConfigurationManagement; +import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.management.common.mbeans.UserManagement; /** @@ -330,6 +331,12 @@ public class JMXTestUtils ObjectName objectName = getExchangeObjectName("test", exchangeName); return MBeanServerInvocationHandler.newProxyInstance(_mbsc, objectName, ManagedExchange.class, false); } + + public ManagedQueue getManagedQueue(String queueName) + { + ObjectName objectName = getQueueObjectName("test", queueName); + return getManagedObject(ManagedQueue.class, objectName); + } public LoggingManagement getLoggingManagement() throws MalformedObjectNameException { |