diff options
Diffstat (limited to 'java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol')
3 files changed, 149 insertions, 164 deletions
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index cae61f9d80..7f8237cc85 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.security.Principal; import java.text.MessageFormat; import java.util.Collection; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; @@ -42,7 +43,7 @@ import java.util.List; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; -public class Connection_1_0 implements ConnectionEventListener +public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0> { private final Port _port; @@ -53,8 +54,33 @@ public class Connection_1_0 implements ConnectionEventListener private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); private final Object _reference = new Object(); - private List<Action<Connection_1_0>> _closeTasks = - Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>()); + + private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); + private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); + private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter(); + private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter(); + + private final LogSubject _logSubject = new LogSubject() + { + @Override + public String toLogString() + { + return "[" + + MessageFormat.format(CONNECTION_FORMAT, + getConnectionId(), + getClientId(), + getRemoteAddressString(), + _vhost.getName()) + + "] "; + + } + }; + + private volatile boolean _stopped; + + + private List<Action<? super Connection_1_0>> _closeTasks = + Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>()); @@ -69,7 +95,7 @@ public class Connection_1_0 implements ConnectionEventListener _transport = transport; _conn = conn; _connectionId = connectionId; - _vhost.getConnectionRegistry().registerConnection(_model); + _vhost.getConnectionRegistry().registerConnection(this); } @@ -80,7 +106,7 @@ public class Connection_1_0 implements ConnectionEventListener public void remoteSessionCreation(SessionEndpoint endpoint) { - Session_1_0 session = new Session_1_0(_vhost, this); + Session_1_0 session = new Session_1_0(_vhost, this, endpoint); _sessions.add(session); endpoint.setSessionEventListener(session); } @@ -90,24 +116,24 @@ public class Connection_1_0 implements ConnectionEventListener _sessions.remove(session); } - void removeConnectionCloseTask(final Action<Connection_1_0> task) + public void removeDeleteTask(final Action<? super Connection_1_0> task) { _closeTasks.remove( task ); } - void addConnectionCloseTask(final Action<Connection_1_0> task) + public void addDeleteTask(final Action<? super Connection_1_0> task) { _closeTasks.add( task ); } public void closeReceived() { - List<Action<Connection_1_0>> taskCopy; + List<Action<? super Connection_1_0>> taskCopy; synchronized (_closeTasks) { - taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks); + taskCopy = new ArrayList<Action<? super Connection_1_0>>(_closeTasks); } - for(Action<Connection_1_0> task : taskCopy) + for(Action<? super Connection_1_0> task : taskCopy) { task.performAction(this); } @@ -115,7 +141,7 @@ public class Connection_1_0 implements ConnectionEventListener { _closeTasks.clear(); } - _vhost.getConnectionRegistry().deregisterConnection(_model); + _vhost.getConnectionRegistry().deregisterConnection(this); } @@ -125,30 +151,6 @@ public class Connection_1_0 implements ConnectionEventListener closeReceived(); } - private final AMQConnectionModel _model = new AMQConnectionModel() - { - private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); - private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); - private StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter(); - private StatisticsCounter _dataReceiptStatistics = new StatisticsCounter(); - - private final LogSubject _logSubject = new LogSubject() - { - @Override - public String toLogString() - { - return "[" + - MessageFormat.format(CONNECTION_FORMAT, - getConnectionId(), - getClientId(), - getRemoteAddressString(), - _vhost.getName()) - + "] "; - - } - }; - - private volatile boolean _stopped; @Override public void close(AMQConstant cause, String message) @@ -169,9 +171,9 @@ public class Connection_1_0 implements ConnectionEventListener } @Override - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) + public void closeSession(Session_1_0 session, AMQConstant cause, String message) { - // TODO + session.close(cause, message); } @Override @@ -181,9 +183,9 @@ public class Connection_1_0 implements ConnectionEventListener } @Override - public List<AMQSessionModel> getSessionModels() + public List<Session_1_0> getSessionModels() { - return new ArrayList<AMQSessionModel>(_sessions); + return new ArrayList<Session_1_0>(_sessions); } @Override @@ -193,12 +195,6 @@ public class Connection_1_0 implements ConnectionEventListener } @Override - public String getUserName() - { - return getPrincipalAsString(); - } - - @Override public boolean isSessionNameUnique(byte[] name) { return true; // TODO @@ -216,7 +212,13 @@ public class Connection_1_0 implements ConnectionEventListener return _conn.getRemoteContainerId(); } - @Override + @Override + public String getRemoteContainerName() + { + return _conn.getRemoteContainerId(); + } + + @Override public String getClientVersion() { return ""; //TODO @@ -228,10 +230,9 @@ public class Connection_1_0 implements ConnectionEventListener return ""; //TODO } - @Override - public String getPrincipalAsString() + public Principal getAuthorizedPrincipal() { - return String.valueOf(_conn.getUser()); + return _conn.getUser(); } @Override @@ -337,11 +338,10 @@ public class Connection_1_0 implements ConnectionEventListener } - }; AMQConnectionModel getModel() { - return _model; + return this; } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 546cc79f9e..f7e2d2df50 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -30,6 +30,9 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -207,15 +210,14 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { - queue = _vhost.createQueue( - UUIDGenerator.generateQueueUUID(name, _vhost.getName()), - name, - isDurable, - null, - true, - true, - true, - Collections.EMPTY_MAP); + Map<String,Object> attributes = new HashMap<String,Object>(); + attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName())); + attributes.put(Queue.NAME, name); + attributes.put(Queue.DURABLE, isDurable); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS); + attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.LINK); + + queue = _vhost.createQueue(getSession(), attributes); } else { @@ -308,44 +310,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS exchange.addBinding(binding, queue,null); source.setDistributionMode(StdDistMode.COPY); - if(!isDurable) - { - final String queueName = name; - final AMQQueue tempQueue = queue; - - final Action<Connection_1_0> deleteQueueTask = - new Action<Connection_1_0>() - { - public void performAction(Connection_1_0 session) - { - if (_vhost.getQueue(queueName) == tempQueue) - { - try - { - _vhost.removeQueue(tempQueue); - } - catch (QpidSecurityException e) - { - //TODO - _logger.error("Error removing queue", e); - } - } - } - }; - - getSession().getConnection().addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - getSession().getConnection().removeConnectionCloseTask(deleteQueueTask); - } - - - }); - } - qd = new QueueDestination(queue); } catch (QpidSecurityException e) @@ -409,6 +373,11 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer"); throw new ConnectionScopedRuntimeException(e); } + catch (MessageSource.ConsumerAccessRefused e) + { + _logger.info("Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy"); + throw new ConnectionScopedRuntimeException(e); + } } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index c055d1e840..6840c7344a 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -26,8 +26,10 @@ import org.apache.log4j.Logger; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; import org.apache.qpid.amqp_1_0.transport.SessionEventListener; import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.LifetimePolicy; import org.apache.qpid.amqp_1_0.type.messaging.*; import org.apache.qpid.amqp_1_0.type.messaging.Source; import org.apache.qpid.amqp_1_0.type.messaging.Target; @@ -36,13 +38,13 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; +import org.apache.qpid.server.model.*; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; @@ -55,25 +57,34 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.QueueExistsException; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject +public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject { private static final Logger _logger = Logger.getLogger(Session_1_0.class); private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); + private final SessionEndpoint _endpoint; private VirtualHost _vhost; private AutoCommitTransaction _transaction; private final LinkedHashMap<Integer, ServerTransaction> _openTransactions = new LinkedHashMap<Integer, ServerTransaction>(); + + private final CopyOnWriteArrayList<Action<? super Session_1_0>> _taskList = + new CopyOnWriteArrayList<Action<? super Session_1_0>>(); + private final Connection_1_0 _connection; private UUID _id = UUID.randomUUID(); + private AtomicBoolean _closed = new AtomicBoolean(); - public Session_1_0(VirtualHost vhost, final Connection_1_0 connection) + public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint) { _vhost = vhost; + _endpoint = endpoint; _transaction = new AutoCommitTransaction(vhost.getMessageStore()); _connection = connection; @@ -333,64 +344,41 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu LifetimePolicy lifetimePolicy = properties == null ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); + Map<String,Object> attributes = new HashMap<String,Object>(); + attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName())); + attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName); + attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false); - final AMQQueue tempQueue = queue = _vhost.createQueue( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()), - queueName, - false, // durable - null, // owner - false, // autodelete - false, // exclusive - false, - properties); - - - - if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose) + if(lifetimePolicy instanceof DeleteOnNoLinks) { - final Action<Connection_1_0> deleteQueueTask = - new Action<Connection_1_0>() - { - public void performAction(Connection_1_0 session) - { - if (_vhost.getQueue(queueName) == tempQueue) - { - try - { - _vhost.removeQueue(tempQueue); - } - catch (QpidSecurityException e) - { - //TODO - _logger.error("Error removing queue from vhost", e); - } - } - } - }; - - _connection.addConnectionCloseTask(deleteQueueTask); - - queue.addQueueDeleteTask(new Action<AMQQueue>() - { - public void performAction(AMQQueue queue) - { - _connection.removeConnectionCloseTask(deleteQueueTask); - } - - - }); + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_NO_LINKS); + } + else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) + { + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.IN_USE); } - else if(lifetimePolicy instanceof DeleteOnNoLinks) + else if(lifetimePolicy instanceof DeleteOnClose) { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } else if(lifetimePolicy instanceof DeleteOnNoMessages) { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.IN_USE); } - else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages) + else { - + attributes.put(org.apache.qpid.server.model.Queue.LIFETIME_POLICY, + org.apache.qpid.server.model.LifetimePolicy.DELETE_ON_CONNECTION_CLOSE); } + + + // TODO convert AMQP 1-0 node properties to queue attributes + + final AMQQueue tempQueue = queue = _vhost.createQueue(this, attributes ); } catch (QpidSecurityException e) { @@ -462,11 +450,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } - public void forceEnd() - { - } - - @Override public UUID getId() { @@ -474,9 +457,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } @Override - public AMQConnectionModel getConnectionModel() + public Connection_1_0 getConnectionModel() { - return _connection.getModel(); + return _connection; } @Override @@ -489,14 +472,35 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu @Override public void close() { - // TODO - required for AMQSessionModel / management initiated closing + performCloseTasks(); + _endpoint.end(); + } + + protected void performCloseTasks() + { + + if(_closed.compareAndSet(false, true)) + { + List<Action<? super Session_1_0>> taskList = new ArrayList<Action<? super Session_1_0>>(_taskList); + _taskList.clear(); + for(Action<? super Session_1_0> task : taskList) + { + task.performAction(this); + } + } } @Override public void close(AMQConstant cause, String message) { - // TODO - required for AMQSessionModel + performCloseTasks(); + final End end = new End(); + final Error theError = new Error(); + theError.setDescription(message); + theError.setCondition(ConnectionError.CONNECTION_FORCED); + end.setError(theError); + _endpoint.end(end); } @Override @@ -586,8 +590,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu @Override public int getChannelId() { - // TODO - return 0; + return _endpoint.getSendingChannel(); } @Override @@ -609,13 +612,12 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu connectionId, getClientID(), remoteAddress, - _vhost.getName(), // TODO - virtual host - 0) // TODO - channel) - + "] "; + _vhost.getName(), + _endpoint.getSendingChannel()) + "] "; } @Override - public int compareTo(AMQSessionModel o) + public int compareTo(Session_1_0 o) { return getId().compareTo(o.getId()); } @@ -625,4 +627,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu return _connection; } + @Override + public void addDeleteTask(final Action<? super Session_1_0> task) + { + if(!_closed.get()) + { + _taskList.add(task); + } + } + + @Override + public void removeDeleteTask(final Action<? super Session_1_0> task) + { + _taskList.remove(task); + } } |