diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java | 29 |
1 files changed, 17 insertions, 12 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index bda110c114..9016c2ac66 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfigur import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.actors.BrokerActor; import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -73,7 +74,6 @@ import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.replication.ReplicationGroupListener; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; @@ -203,7 +203,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { if(!_exchangeAdapters.containsKey(exchange)) { - _exchangeAdapters.put(exchange, new ExchangeAdapter(this,exchange)); + final ExchangeAdapter adapter = new ExchangeAdapter(this, exchange); + _exchangeAdapters.put(exchange, adapter); + childAdded(adapter); + } } } @@ -221,7 +224,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { if(!_queueAdapters.containsKey(queue)) { - _queueAdapters.put(queue, new QueueAdapter(this, queue)); + final QueueAdapter adapter = new QueueAdapter(this, queue); + _queueAdapters.put(queue, adapter); + childAdded(adapter); } } } @@ -403,9 +408,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { attributes = new HashMap<String, Object>(attributes); - if (attributes.containsKey(Queue.TYPE)) + if (attributes.containsKey(Queue.QUEUE_TYPE)) { - String typeAttribute = MapValueConverter.getStringAttribute(Queue.TYPE, attributes, null); + String typeAttribute = MapValueConverter.getStringAttribute(Queue.QUEUE_TYPE, attributes, null); QueueType queueType = null; try { @@ -791,11 +796,11 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual op.withinTransaction(new Transaction() { - public void dequeue(final QueueEntry entry) + public void dequeue(final MessageInstance entry) { if(entry.acquire()) { - txn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action() + txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action() { public void postCommit() { @@ -809,7 +814,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } } - public void copy(QueueEntry entry, Queue queue) + public void copy(MessageInstance entry, Queue queue) { final ServerMessage message = entry.getMessage(); final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue(); @@ -820,7 +825,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { try { - toQueue.enqueue(message); + toQueue.enqueue(message, null); } catch(AMQException e) { @@ -835,7 +840,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } - public void move(final QueueEntry entry, Queue queue) + public void move(final MessageInstance entry, Queue queue) { final ServerMessage message = entry.getMessage(); final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue(); @@ -849,7 +854,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { try { - toQueue.enqueue(message); + toQueue.enqueue(message, null); } catch (AMQException e) { @@ -862,7 +867,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual entry.release(); } }); - txn.dequeue(entry.getQueue(), message, + txn.dequeue(entry.getOwningResource(), message, new ServerTransaction.Action() { |