summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2010-07-16 15:19:46 +0000
committerRobert Gemmell <robbie@apache.org>2010-07-16 15:19:46 +0000
commitee7232042d82b6ce63c6398d61fa518e7beec4e3 (patch)
treea197cbfe30bf8a472ce7bedf851352cf8db13d5f /java/broker/src
parent9f89d600074f5e9f800d202e948adbe131cee5e8 (diff)
downloadqpid-python-ee7232042d82b6ce63c6398d61fa518e7beec4e3.tar.gz
QPID-2731: enable getting/setting queue exclusivity via JMX
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@964825 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java45
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java11
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java5
10 files changed, 113 insertions, 2 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 225fbec930..de9dc42de8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -280,4 +280,6 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
ConfigurationPlugin getConfiguration();
ManagedObject getManagedObject();
+
+ void setExclusive(boolean exclusive) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 806b7f3744..32b71a554b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -274,6 +274,23 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
return _queue.isOverfull();
}
+ public boolean isExclusive()
+ {
+ return _queue.isExclusive();
+ }
+
+ public void setExclusive(boolean exclusive) throws JMException
+ {
+ try
+ {
+ _queue.setExclusive(exclusive);
+ }
+ catch (AMQException e)
+ {
+ throw new JMException(e.toString());
+ }
+ }
+
/**
* Checks if there is any notification to be send to the listeners
*/
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d47d229658..489a724254 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -53,6 +53,8 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -328,6 +330,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
return _exclusive;
}
+
+ public void setExclusive(boolean exclusive) throws AMQException
+ {
+ _exclusive = exclusive;
+
+ if(isDurable())
+ {
+ getVirtualHost().getDurableConfigurationStore().updateQueue(this);
+ }
+ }
public Exchange getAlternateExchange()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 40f265e00f..627f059c53 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -94,6 +94,7 @@ public class DerbyMessageStore implements MessageStore
private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME;
private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
+ private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?";
private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
private static final String SELECT_FROM_BINDINGS =
"SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name";
@@ -845,6 +846,50 @@ public class DerbyMessageStore implements MessageStore
}
}
}
+
+ /**
+ * Updates the specified queue in the persistent store, IF it is already present. If the queue
+ * is not present in the store, it will not be added.
+ *
+ * NOTE: Currently only updates the exclusivity.
+ *
+ * @param queue The queue to update the entry for.
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ public void updateQueue(final AMQQueue queue) throws AMQException
+ {
+ if (_state != State.RECOVERING)
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+
+ PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
+ stmt.setString(1, queue.getNameShortString().toString());
+
+ ResultSet rs = stmt.executeQuery();
+
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY);
+
+ stmt2.setBoolean(1,queue.isExclusive());
+ stmt2.setString(2, queue.getNameShortString().toString());
+
+ stmt2.execute();
+ stmt2.close();
+ }
+
+ stmt.close();
+ conn.close();
+ }
+ catch (SQLException e)
+ {
+ throw new AMQException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e, e);
+ }
+ }
+
+ }
/**
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index a50e8e99b4..c169e3bcff 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -118,4 +118,13 @@ public interface DurableConfigurationStore
* @throws org.apache.qpid.AMQException If the operation fails for any reason.
*/
void removeQueue(AMQQueue queue) throws AMQException;
+
+ /**
+ * Updates the specified queue in the persistent store, IF it is already present. If the queue
+ * is not present in the store, it will not be added.
+ *
+ * @param queue The queue to update the entry for.
+ * @throws org.apache.qpid.AMQException If the operation fails for any reason.
+ */
+ void updateQueue(AMQQueue queue) throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 565afd2539..9d9312cd26 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -148,6 +148,11 @@ public class MemoryMessageStore implements MessageStore
{
// Not required to do anything
}
+
+ public void updateQueue(final AMQQueue queue) throws AMQException
+ {
+ // Not required to do anything
+ }
public void configureTransactionLog(String name,
TransactionLogRecoveryHandler recoveryHandler,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index fcb06f56bf..c6055f35f6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -765,6 +765,10 @@ public class VirtualHostImpl implements VirtualHost
arguments = args;
}
}
+
+ public void updateQueue(AMQQueue queue) throws AMQException
+ {
+ }
}
@Override
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 6a98a255f8..3735ef123d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -197,7 +197,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertTrue(_queueMBean.getConsumerCount() == 3);
}
- public void testGeneralProperties()
+ public void testGeneralProperties() throws Exception
{
long maxQueueDepth = 1000; // in bytes
_queueMBean.setMaximumMessageCount(50000l);
@@ -211,7 +211,14 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
assertEquals("Queue Name does not match", new AMQShortString(getName()), _queueMBean.getName());
assertFalse("AutoDelete should not be set.",_queueMBean.isAutoDelete());
assertFalse("Queue should not be durable.",_queueMBean.isDurable());
- //TODO add isExclusive when supported
+
+ //set+get exclusivity using the mbean, and also verify it is actually updated in the queue
+ _queueMBean.setExclusive(true);
+ assertTrue("Exclusive property should be true.",_queueMBean.isExclusive());
+ assertTrue("Exclusive property should be true.",_queue.isExclusive());
+ _queueMBean.setExclusive(false);
+ assertFalse("Exclusive property should be false.",_queueMBean.isExclusive());
+ assertFalse("Exclusive property should be false.",_queue.isExclusive());
}
public void testExceptions() throws Exception
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 51b049787c..9bbae6df01 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -599,4 +599,9 @@ public class MockAMQQueue implements AMQQueue
{
return 0;
}
+
+ public void setExclusive(boolean exclusive)
+ {
+
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index 9c12242a07..8cbc50ecbe 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -217,4 +217,9 @@ public class SkeletonMessageStore implements MessageStore
};
}
+ public void updateQueue(AMQQueue queue) throws AMQException
+ {
+
+ }
+
}