summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java')
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java148
1 files changed, 82 insertions, 66 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index c055d1e840..6840c7344a 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -26,8 +26,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
@@ -36,13 +38,13 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.server.model.*;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
@@ -55,25 +57,34 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
+public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject
{
private static final Logger _logger = Logger.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+ private final SessionEndpoint _endpoint;
private VirtualHost _vhost;
private AutoCommitTransaction _transaction;
private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
new LinkedHashMap<Integer, ServerTransaction>();
+
+ private final CopyOnWriteArrayList<Action<? super Session_1_0>> _taskList =
+ new CopyOnWriteArrayList<Action<? super Session_1_0>>();
+
private final Connection_1_0 _connection;
private UUID _id = UUID.randomUUID();
+ private AtomicBoolean _closed = new AtomicBoolean();
- public Session_1_0(VirtualHost vhost, final Connection_1_0 connection)
+ public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint)
{
_vhost = vhost;
+ _endpoint = endpoint;
_transaction = new AutoCommitTransaction(vhost.getMessageStore());
_connection = connection;
@@ -333,64 +344,41 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
LifetimePolicy lifetimePolicy = properties == null
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
+ Map<String,Object> attributes = new HashMap<String,Object>();
+ attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()));
+ attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
+ attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false);
- final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
- queueName,
- false, // durable
- null, // owner
- false, // autodelete
- false, // exclusive
- false,
- properties);
-
-
-
- if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
+ if(lifetimePolicy instanceof DeleteOnNoLinks)
{
- final Action<Connection_1_0> deleteQueueTask =
- new Action<Connection_1_0>()
- {
- public void performAction(Connection_1_0 session)
- {
- if (_vhost.getQueue(queueName) == tempQueue)
- {
- try
- {
- _vhost.removeQueue(tempQueue);
- }
- catch (QpidSecurityException e)
- {
- //TODO
- _logger.error("Error removing queue from vhost", e);
- }
- }
- }
- };
-
- _connection.addConnectionCloseTask(deleteQueueTask);
-
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- _connection.removeConnectionCloseTask(deleteQueueTask);
- }
-
-
- });
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS);
+ }
+ else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
+ {
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
- else if(lifetimePolicy instanceof DeleteOnNoLinks)
+ else if(lifetimePolicy instanceof DeleteOnClose)
{
-
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
}
else if(lifetimePolicy instanceof DeleteOnNoMessages)
{
-
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
- else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
+ else
{
-
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
}
+
+
+ // TODO convert AMQP 1-0 node properties to queue attributes
+
+ final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes );
}
catch (QpidSecurityException e)
{
@@ -462,11 +450,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
- public void forceEnd()
- {
- }
-
-
@Override
public UUID getId()
{
@@ -474,9 +457,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
@Override
- public AMQConnectionModel getConnectionModel()
+ public Connection_1_0 getConnectionModel()
{
- return _connection.getModel();
+ return _connection;
}
@Override
@@ -489,14 +472,35 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
@Override
public void close()
{
- // TODO - required for AMQSessionModel / management initiated closing
+ performCloseTasks();
+ _endpoint.end();
+ }
+
+ protected void performCloseTasks()
+ {
+
+ if(_closed.compareAndSet(false, true))
+ {
+ List<Action<? super Session_1_0>> taskList = new ArrayList<Action<? super Session_1_0>>(_taskList);
+ _taskList.clear();
+ for(Action<? super Session_1_0> task : taskList)
+ {
+ task.performAction(this);
+ }
+ }
}
@Override
public void close(AMQConstant cause, String message)
{
- // TODO - required for AMQSessionModel
+ performCloseTasks();
+ final End end = new End();
+ final Error theError = new Error();
+ theError.setDescription(message);
+ theError.setCondition(ConnectionError.CONNECTION_FORCED);
+ end.setError(theError);
+ _endpoint.end(end);
}
@Override
@@ -586,8 +590,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
@Override
public int getChannelId()
{
- // TODO
- return 0;
+ return _endpoint.getSendingChannel();
}
@Override
@@ -609,13 +612,12 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
connectionId,
getClientID(),
remoteAddress,
- _vhost.getName(), // TODO - virtual host
- 0) // TODO - channel)
- + "] ";
+ _vhost.getName(),
+ _endpoint.getSendingChannel()) + "] ";
}
@Override
- public int compareTo(AMQSessionModel o)
+ public int compareTo(Session_1_0 o)
{
return getId().compareTo(o.getId());
}
@@ -625,4 +627,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
return _connection;
}
+ @Override
+ public void addDeleteTask(final Action<? super Session_1_0> task)
+ {
+ if(!_closed.get())
+ {
+ _taskList.add(task);
+ }
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super Session_1_0> task)
+ {
+ _taskList.remove(task);
+ }
}