diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java | 25 |
1 files changed, 14 insertions, 11 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 823e4cb16d..beed6be84b 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -48,6 +50,7 @@ import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.*; @@ -108,11 +111,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu source.setAddress(tempQueue.getName()); } String addr = source.getAddress(); - AMQQueue queue = _vhost.getQueue(addr); + MessageSource queue = _vhost.getMessageSource(addr); if(queue != null) { - destination = new QueueDestination(queue); + destination = new MessageSourceDestination(queue); @@ -249,11 +252,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } String addr = target.getAddress(); - Exchange exchg = _vhost.getExchange(addr); - if(exchg != null) + MessageDestination messageDestination = _vhost.getMessageDestination(addr); + if(messageDestination != null) { - destination = new ExchangeDestination(exchg, target.getDurable(), - target.getExpiryPolicy()); + destination = new NodeReceivingDestination(messageDestination, target.getDurable(), + target.getExpiryPolicy()); } else { @@ -343,10 +346,10 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) { - final Connection_1_0.Task deleteQueueTask = - new Connection_1_0.Task() + final Action<Connection_1_0> deleteQueueTask = + new Action<Connection_1_0>() { - public void doTask(Connection_1_0 session) + public void performAction(Connection_1_0 session) { if (_vhost.getQueue(queueName) == tempQueue) { @@ -365,9 +368,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu _connection.addConnectionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) + public void performAction(AMQQueue queue) { _connection.removeConnectionCloseTask(deleteQueueTask); } |