diff options
| author | Robert Gemmell <robbie@apache.org> | 2010-07-16 15:19:46 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2010-07-16 15:19:46 +0000 |
| commit | ee7232042d82b6ce63c6398d61fa518e7beec4e3 (patch) | |
| tree | a197cbfe30bf8a472ce7bedf851352cf8db13d5f /java/broker/src | |
| parent | 9f89d600074f5e9f800d202e948adbe131cee5e8 (diff) | |
| download | qpid-python-ee7232042d82b6ce63c6398d61fa518e7beec4e3.tar.gz | |
QPID-2731: enable getting/setting queue exclusivity via JMX
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@964825 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
10 files changed, 113 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 225fbec930..de9dc42de8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -280,4 +280,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer ConfigurationPlugin getConfiguration(); ManagedObject getManagedObject(); + + void setExclusive(boolean exclusive) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 806b7f3744..32b71a554b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -274,6 +274,23 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que return _queue.isOverfull(); } + public boolean isExclusive() + { + return _queue.isExclusive(); + } + + public void setExclusive(boolean exclusive) throws JMException + { + try + { + _queue.setExclusive(exclusive); + } + catch (AMQException e) + { + throw new JMException(e.toString()); + } + } + /** * Checks if there is any notification to be send to the listeners */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d47d229658..489a724254 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -53,6 +53,8 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; + +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -328,6 +330,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { return _exclusive; } + + public void setExclusive(boolean exclusive) throws AMQException + { + _exclusive = exclusive; + + if(isDurable()) + { + getVirtualHost().getDurableConfigurationStore().updateQueue(this); + } + } public Exchange getAlternateExchange() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 40f265e00f..627f059c53 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -94,6 +94,7 @@ public class DerbyMessageStore implements MessageStore private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME; private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; + private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?"; private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; private static final String SELECT_FROM_BINDINGS = "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name"; @@ -845,6 +846,50 @@ public class DerbyMessageStore implements MessageStore } } } + + /** + * Updates the specified queue in the persistent store, IF it is already present. If the queue + * is not present in the store, it will not be added. + * + * NOTE: Currently only updates the exclusivity. + * + * @param queue The queue to update the entry for. + * @throws org.apache.qpid.AMQException If the operation fails for any reason. + */ + public void updateQueue(final AMQQueue queue) throws AMQException + { + if (_state != State.RECOVERING) + { + try + { + Connection conn = newAutoCommitConnection(); + + PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); + stmt.setString(1, queue.getNameShortString().toString()); + + ResultSet rs = stmt.executeQuery(); + + if (rs.next()) + { + PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY); + + stmt2.setBoolean(1,queue.isExclusive()); + stmt2.setString(2, queue.getNameShortString().toString()); + + stmt2.execute(); + stmt2.close(); + } + + stmt.close(); + conn.close(); + } + catch (SQLException e) + { + throw new AMQException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e); + } + } + + } /** * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index a50e8e99b4..c169e3bcff 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -118,4 +118,13 @@ public interface DurableConfigurationStore * @throws org.apache.qpid.AMQException If the operation fails for any reason. */ void removeQueue(AMQQueue queue) throws AMQException; + + /** + * Updates the specified queue in the persistent store, IF it is already present. If the queue + * is not present in the store, it will not be added. + * + * @param queue The queue to update the entry for. + * @throws org.apache.qpid.AMQException If the operation fails for any reason. + */ + void updateQueue(AMQQueue queue) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 565afd2539..9d9312cd26 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -148,6 +148,11 @@ public class MemoryMessageStore implements MessageStore { // Not required to do anything } + + public void updateQueue(final AMQQueue queue) throws AMQException + { + // Not required to do anything + } public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler, diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index fcb06f56bf..c6055f35f6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -765,6 +765,10 @@ public class VirtualHostImpl implements VirtualHost arguments = args; } } + + public void updateQueue(AMQQueue queue) throws AMQException + { + } } @Override diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 6a98a255f8..3735ef123d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -197,7 +197,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertTrue(_queueMBean.getConsumerCount() == 3); } - public void testGeneralProperties() + public void testGeneralProperties() throws Exception { long maxQueueDepth = 1000; // in bytes _queueMBean.setMaximumMessageCount(50000l); @@ -211,7 +211,14 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase assertEquals("Queue Name does not match", new AMQShortString(getName()), _queueMBean.getName()); assertFalse("AutoDelete should not be set.",_queueMBean.isAutoDelete()); assertFalse("Queue should not be durable.",_queueMBean.isDurable()); - //TODO add isExclusive when supported + + //set+get exclusivity using the mbean, and also verify it is actually updated in the queue + _queueMBean.setExclusive(true); + assertTrue("Exclusive property should be true.",_queueMBean.isExclusive()); + assertTrue("Exclusive property should be true.",_queue.isExclusive()); + _queueMBean.setExclusive(false); + assertFalse("Exclusive property should be false.",_queueMBean.isExclusive()); + assertFalse("Exclusive property should be false.",_queue.isExclusive()); } public void testExceptions() throws Exception diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 51b049787c..9bbae6df01 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -599,4 +599,9 @@ public class MockAMQQueue implements AMQQueue { return 0; } + + public void setExclusive(boolean exclusive) + { + + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java index 9c12242a07..8cbc50ecbe 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -217,4 +217,9 @@ public class SkeletonMessageStore implements MessageStore }; } + public void updateQueue(AMQQueue queue) throws AMQException + { + + } + } |
