diff options
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java')
-rw-r--r-- | java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java | 148 |
1 files changed, 82 insertions, 66 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index c055d1e840..6840c7344a 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -26,8 +26,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.LifetimePolicy; import org.apache.qpid.amqp_1_0.type.messaging.*; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; @@ -36,13 +38,13 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.server.model.*; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; @@ -55,25 +57,34 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.QueueExistsException; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject +public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject { private static final Logger _logger = Logger.getLogger(Session_1_0.class); private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + private final SessionEndpoint _endpoint; private VirtualHost _vhost; private AutoCommitTransaction _transaction; private final LinkedHashMap<Integer, ServerTransaction> _openTransactions = new LinkedHashMap<Integer, ServerTransaction>(); + + private final CopyOnWriteArrayList<Action<? super Session_1_0>> _taskList = + new CopyOnWriteArrayList<Action<? super Session_1_0>>(); + private final Connection_1_0 _connection; private UUID _id = UUID.randomUUID(); + private AtomicBoolean _closed = new AtomicBoolean(); - public Session_1_0(VirtualHost vhost, final Connection_1_0 connection) + public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint) { _vhost = vhost; + _endpoint = endpoint; _transaction = new AutoCommitTransaction(vhost.getMessageStore()); _connection = connection; @@ -333,64 +344,41 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu LifetimePolicy lifetimePolicy = properties == null ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); + Map<String,Object> attributes = new HashMap<String,Object>(); + attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName())); + attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName); + attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false); - final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), - queueName, - false, // durable - null, // owner - false, // autodelete - false, // exclusive - false, - properties); - - - - if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) + if(lifetimePolicy instanceof DeleteOnNoLinks) { - final Action<Connection_1_0> deleteQueueTask = - new Action<Connection_1_0>() - { - public void performAction(Connection_1_0 session) - { - if (_vhost.getQueue(queueName) == tempQueue) - { - try - { - _vhost.removeQueue(tempQueue); - } - catch (QpidSecurityException e) - { - //TODO - _logger.error("Error removing queue from vhost", e); - } - } - } - }; - - _connection.addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - _connection.removeConnectionCloseTask(deleteQueueTask); - } - - - }); + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS); + } + else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) + { + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.IN_USE); } - else if(lifetimePolicy instanceof DeleteOnNoLinks) + else if(lifetimePolicy instanceof DeleteOnClose) { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } else if(lifetimePolicy instanceof DeleteOnNoMessages) { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.IN_USE); } - else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) + else { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } + + + // TODO convert AMQP 1-0 node properties to queue attributes + + final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes ); } catch (QpidSecurityException e) { @@ -462,11 +450,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } - public void forceEnd() - { - } - - @Override public UUID getId() { @@ -474,9 +457,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } @Override - public AMQConnectionModel getConnectionModel() + public Connection_1_0 getConnectionModel() { - return _connection.getModel(); + return _connection; } @Override @@ -489,14 +472,35 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu @Override public void close() { - // TODO - required for AMQSessionModel / management initiated closing + performCloseTasks(); + _endpoint.end(); + } + + protected void performCloseTasks() + { + + if(_closed.compareAndSet(false, true)) + { + List<Action<? super Session_1_0>> taskList = new ArrayList<Action<? super Session_1_0>>(_taskList); + _taskList.clear(); + for(Action<? super Session_1_0> task : taskList) + { + task.performAction(this); + } + } } @Override public void close(AMQConstant cause, String message) { - // TODO - required for AMQSessionModel + performCloseTasks(); + final End end = new End(); + final Error theError = new Error(); + theError.setDescription(message); + theError.setCondition(ConnectionError.CONNECTION_FORCED); + end.setError(theError); + _endpoint.end(end); } @Override @@ -586,8 +590,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu @Override public int getChannelId() { - // TODO - return 0; + return _endpoint.getSendingChannel(); } @Override @@ -609,13 +612,12 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu connectionId, getClientID(), remoteAddress, - _vhost.getName(), // TODO - virtual host - 0) // TODO - channel) - + "] "; + _vhost.getName(), + _endpoint.getSendingChannel()) + "] "; } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(Session_1_0 o) { return getId().compareTo(o.getId()); } @@ -625,4 +627,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu return _connection; } + @Override + public void addDeleteTask(final Action<? super Session_1_0> task) + { + if(!_closed.get()) + { + _taskList.add(task); + } + } + + @Override + public void removeDeleteTask(final Action<? super Session_1_0> task) + { + _taskList.remove(task); + } } |