summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol')
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java102
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java63
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java148
3 files changed, 149 insertions, 164 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index cae61f9d80..7f8237cc85 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.security.Principal;
import java.text.MessageFormat;
import java.util.Collection;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
@@ -42,7 +43,7 @@ import java.util.List;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
-public class Connection_1_0 implements ConnectionEventListener
+public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0>
{
private final Port _port;
@@ -53,8 +54,33 @@ public class Connection_1_0 implements ConnectionEventListener
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
- private List<Action<Connection_1_0>> _closeTasks =
- Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>());
+
+ private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+ private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+ private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+ private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+
+ private final LogSubject _logSubject = new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return "[" +
+ MessageFormat.format(CONNECTION_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getRemoteAddressString(),
+ _vhost.getName())
+ + "] ";
+
+ }
+ };
+
+ private volatile boolean _stopped;
+
+
+ private List<Action<? super Connection_1_0>> _closeTasks =
+ Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>());
@@ -69,7 +95,7 @@ public class Connection_1_0 implements ConnectionEventListener
_transport = transport;
_conn = conn;
_connectionId = connectionId;
- _vhost.getConnectionRegistry().registerConnection(_model);
+ _vhost.getConnectionRegistry().registerConnection(this);
}
@@ -80,7 +106,7 @@ public class Connection_1_0 implements ConnectionEventListener
public void remoteSessionCreation(SessionEndpoint endpoint)
{
- Session_1_0 session = new Session_1_0(_vhost, this);
+ Session_1_0 session = new Session_1_0(_vhost, this, endpoint);
_sessions.add(session);
endpoint.setSessionEventListener(session);
}
@@ -90,24 +116,24 @@ public class Connection_1_0 implements ConnectionEventListener
_sessions.remove(session);
}
- void removeConnectionCloseTask(final Action<Connection_1_0> task)
+ public void removeDeleteTask(final Action<? super Connection_1_0> task)
{
_closeTasks.remove( task );
}
- void addConnectionCloseTask(final Action<Connection_1_0> task)
+ public void addDeleteTask(final Action<? super Connection_1_0> task)
{
_closeTasks.add( task );
}
public void closeReceived()
{
- List<Action<Connection_1_0>> taskCopy;
+ List<Action<? super Connection_1_0>> taskCopy;
synchronized (_closeTasks)
{
- taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks);
+ taskCopy = new ArrayList<Action<? super Connection_1_0>>(_closeTasks);
}
- for(Action<Connection_1_0> task : taskCopy)
+ for(Action<? super Connection_1_0> task : taskCopy)
{
task.performAction(this);
}
@@ -115,7 +141,7 @@ public class Connection_1_0 implements ConnectionEventListener
{
_closeTasks.clear();
}
- _vhost.getConnectionRegistry().deregisterConnection(_model);
+ _vhost.getConnectionRegistry().deregisterConnection(this);
}
@@ -125,30 +151,6 @@ public class Connection_1_0 implements ConnectionEventListener
closeReceived();
}
- private final AMQConnectionModel _model = new AMQConnectionModel()
- {
- private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
- private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
- private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
- private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
-
- private final LogSubject _logSubject = new LogSubject()
- {
- @Override
- public String toLogString()
- {
- return "[" +
- MessageFormat.format(CONNECTION_FORMAT,
- getConnectionId(),
- getClientId(),
- getRemoteAddressString(),
- _vhost.getName())
- + "] ";
-
- }
- };
-
- private volatile boolean _stopped;
@Override
public void close(AMQConstant cause, String message)
@@ -169,9 +171,9 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+ public void closeSession(Session_1_0 session, AMQConstant cause, String message)
{
- // TODO
+ session.close(cause, message);
}
@Override
@@ -181,9 +183,9 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
- public List<AMQSessionModel> getSessionModels()
+ public List<Session_1_0> getSessionModels()
{
- return new ArrayList<AMQSessionModel>(_sessions);
+ return new ArrayList<Session_1_0>(_sessions);
}
@Override
@@ -193,12 +195,6 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
- public String getUserName()
- {
- return getPrincipalAsString();
- }
-
- @Override
public boolean isSessionNameUnique(byte[] name)
{
return true; // TODO
@@ -216,7 +212,13 @@ public class Connection_1_0 implements ConnectionEventListener
return _conn.getRemoteContainerId();
}
- @Override
+ @Override
+ public String getRemoteContainerName()
+ {
+ return _conn.getRemoteContainerId();
+ }
+
+ @Override
public String getClientVersion()
{
return ""; //TODO
@@ -228,10 +230,9 @@ public class Connection_1_0 implements ConnectionEventListener
return ""; //TODO
}
- @Override
- public String getPrincipalAsString()
+ public Principal getAuthorizedPrincipal()
{
- return String.valueOf(_conn.getUser());
+ return _conn.getUser();
}
@Override
@@ -337,11 +338,10 @@ public class Connection_1_0 implements ConnectionEventListener
}
- };
AMQConnectionModel getModel()
{
- return _model;
+ return this;
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 546cc79f9e..f7e2d2df50 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -30,6 +30,9 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
@@ -207,15 +210,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(queue == null)
{
- queue = _vhost.createQueue(
- UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
- name,
- isDurable,
- null,
- true,
- true,
- true,
- Collections.EMPTY_MAP);
+ Map<String,Object> attributes = new HashMap<String,Object>();
+ attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName()));
+ attributes.put(Queue.NAME, name);
+ attributes.put(Queue.DURABLE, isDurable);
+ attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
+ attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK);
+
+ queue = _vhost.createQueue(getSession(), attributes);
}
else
{
@@ -308,44 +310,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
exchange.addBinding(binding, queue,null);
source.setDistributionMode(StdDistMode.COPY);
- if(!isDurable)
- {
- final String queueName = name;
- final AMQQueue tempQueue = queue;
-
- final Action<Connection_1_0> deleteQueueTask =
- new Action<Connection_1_0>()
- {
- public void performAction(Connection_1_0 session)
- {
- if (_vhost.getQueue(queueName) == tempQueue)
- {
- try
- {
- _vhost.removeQueue(tempQueue);
- }
- catch (QpidSecurityException e)
- {
- //TODO
- _logger.error("Error removing queue", e);
- }
- }
- }
- };
-
- getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
-
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
- }
-
-
- });
- }
-
qd = new QueueDestination(queue);
}
catch (QpidSecurityException e)
@@ -409,6 +373,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
_logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer");
throw new ConnectionScopedRuntimeException(e);
}
+ catch (MessageSource.ConsumerAccessRefused e)
+ {
+ _logger.info("Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy");
+ throw new ConnectionScopedRuntimeException(e);
+ }
}
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index c055d1e840..6840c7344a 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -26,8 +26,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
@@ -36,13 +38,13 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.server.model.*;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
@@ -55,25 +57,34 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
+public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject
{
private static final Logger _logger = Logger.getLogger(Session_1_0.class);
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+ private final SessionEndpoint _endpoint;
private VirtualHost _vhost;
private AutoCommitTransaction _transaction;
private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
new LinkedHashMap<Integer, ServerTransaction>();
+
+ private final CopyOnWriteArrayList<Action<? super Session_1_0>> _taskList =
+ new CopyOnWriteArrayList<Action<? super Session_1_0>>();
+
private final Connection_1_0 _connection;
private UUID _id = UUID.randomUUID();
+ private AtomicBoolean _closed = new AtomicBoolean();
- public Session_1_0(VirtualHost vhost, final Connection_1_0 connection)
+ public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint)
{
_vhost = vhost;
+ _endpoint = endpoint;
_transaction = new AutoCommitTransaction(vhost.getMessageStore());
_connection = connection;
@@ -333,64 +344,41 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
LifetimePolicy lifetimePolicy = properties == null
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
+ Map<String,Object> attributes = new HashMap<String,Object>();
+ attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()));
+ attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
+ attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false);
- final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
- queueName,
- false, // durable
- null, // owner
- false, // autodelete
- false, // exclusive
- false,
- properties);
-
-
-
- if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
+ if(lifetimePolicy instanceof DeleteOnNoLinks)
{
- final Action<Connection_1_0> deleteQueueTask =
- new Action<Connection_1_0>()
- {
- public void performAction(Connection_1_0 session)
- {
- if (_vhost.getQueue(queueName) == tempQueue)
- {
- try
- {
- _vhost.removeQueue(tempQueue);
- }
- catch (QpidSecurityException e)
- {
- //TODO
- _logger.error("Error removing queue from vhost", e);
- }
- }
- }
- };
-
- _connection.addConnectionCloseTask(deleteQueueTask);
-
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- _connection.removeConnectionCloseTask(deleteQueueTask);
- }
-
-
- });
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS);
+ }
+ else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
+ {
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
- else if(lifetimePolicy instanceof DeleteOnNoLinks)
+ else if(lifetimePolicy instanceof DeleteOnClose)
{
-
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
}
else if(lifetimePolicy instanceof DeleteOnNoMessages)
{
-
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.IN_USE);
}
- else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
+ else
{
-
+ attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY,
+ org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE);
}
+
+
+ // TODO convert AMQP 1-0 node properties to queue attributes
+
+ final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes );
}
catch (QpidSecurityException e)
{
@@ -462,11 +450,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
- public void forceEnd()
- {
- }
-
-
@Override
public UUID getId()
{
@@ -474,9 +457,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
@Override
- public AMQConnectionModel getConnectionModel()
+ public Connection_1_0 getConnectionModel()
{
- return _connection.getModel();
+ return _connection;
}
@Override
@@ -489,14 +472,35 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
@Override
public void close()
{
- // TODO - required for AMQSessionModel / management initiated closing
+ performCloseTasks();
+ _endpoint.end();
+ }
+
+ protected void performCloseTasks()
+ {
+
+ if(_closed.compareAndSet(false, true))
+ {
+ List<Action<? super Session_1_0>> taskList = new ArrayList<Action<? super Session_1_0>>(_taskList);
+ _taskList.clear();
+ for(Action<? super Session_1_0> task : taskList)
+ {
+ task.performAction(this);
+ }
+ }
}
@Override
public void close(AMQConstant cause, String message)
{
- // TODO - required for AMQSessionModel
+ performCloseTasks();
+ final End end = new End();
+ final Error theError = new Error();
+ theError.setDescription(message);
+ theError.setCondition(ConnectionError.CONNECTION_FORCED);
+ end.setError(theError);
+ _endpoint.end(end);
}
@Override
@@ -586,8 +590,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
@Override
public int getChannelId()
{
- // TODO
- return 0;
+ return _endpoint.getSendingChannel();
}
@Override
@@ -609,13 +612,12 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
connectionId,
getClientID(),
remoteAddress,
- _vhost.getName(), // TODO - virtual host
- 0) // TODO - channel)
- + "] ";
+ _vhost.getName(),
+ _endpoint.getSendingChannel()) + "] ";
}
@Override
- public int compareTo(AMQSessionModel o)
+ public int compareTo(Session_1_0 o)
{
return getId().compareTo(o.getId());
}
@@ -625,4 +627,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
return _connection;
}
+ @Override
+ public void addDeleteTask(final Action<? super Session_1_0> task)
+ {
+ if(!_closed.get())
+ {
+ _taskList.add(task);
+ }
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super Session_1_0> task)
+ {
+ _taskList.remove(task);
+ }
}