summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-10-protocol/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src')
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java57
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java11
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java17
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java127
4 files changed, 90 insertions, 122 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index ff7ce0a79d..9c012eb782 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -25,6 +25,7 @@ import java.security.Principal;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
@@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
@@ -56,7 +58,8 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
+ LogSubject, AuthorizationHolder
{
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -72,6 +75,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Transport _transport;
+
+ private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
+ new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+
private volatile boolean _stopped;
public ServerConnection(final long connectionId, Broker broker)
@@ -197,7 +204,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
_onOpenTask = task;
}
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+ public void closeSession(ServerSession session, AMQConstant cause, String message)
{
ExecutionException ex = new ExecutionException();
ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
@@ -211,7 +218,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
ex.setErrorCode(code);
ex.setDescription(message);
- ((ServerSession)session).invoke(ex);
+ session.invoke(ex);
session.close(cause, message);
}
@@ -315,6 +322,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void close(AMQConstant cause, String message)
{
closeSubscriptions();
+ performDeleteTasks();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
@@ -327,6 +335,14 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
close(replyCode, message);
}
+ protected void performDeleteTasks()
+ {
+ for(Action<? super ServerConnection> task : _taskList)
+ {
+ task.performAction(this);
+ }
+ }
+
public synchronized void block()
{
if(!_blocking)
@@ -367,12 +383,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
super.removeSession(ssn);
}
- public List<AMQSessionModel> getSessionModels()
+ public List<ServerSession> getSessionModels()
{
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+ List<ServerSession> sessions = new ArrayList<ServerSession>();
for (Session ssn : getChannels())
{
- sessions.add((AMQSessionModel) ssn);
+ sessions.add((ServerSession) ssn);
}
return sessions;
}
@@ -475,14 +491,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
return String.valueOf(getRemoteAddress());
}
- public String getUserName()
- {
- return _authorizedPrincipal.getName();
- }
-
@Override
public void closed()
{
+ performDeleteTasks();
closeSubscriptions();
super.closed();
}
@@ -522,6 +534,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
@Override
+ public String getRemoteContainerName()
+ {
+ return getConnectionDelegate().getClientId();
+ }
+
+ @Override
public String getClientVersion()
{
return getConnectionDelegate().getClientVersion();
@@ -533,11 +551,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
return getConnectionDelegate().getClientProduct();
}
- public String getPrincipalAsString()
- {
- return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
- }
-
public long getSessionCountLimit()
{
return getChannelMax();
@@ -565,4 +578,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
super.doHeartBeat();
}
+
+ @Override
+ public void addDeleteTask(final Action<? super ServerConnection> task)
+ {
+ _taskList.add(task);
+ }
+
+ @Override
+ public void removeDeleteTask(final Action<? super ServerConnection> task)
+ {
+ _taskList.remove(task);
+ }
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index c85a415ce5..dc26249c61 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -310,14 +311,18 @@ public class ServerConnectionDelegate extends ServerDelegate
private boolean isSessionNameUnique(final byte[] name, final Connection conn)
{
final ServerConnection sconn = (ServerConnection) conn;
- final String userId = sconn.getUserName();
+ final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
+ final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
final Iterator<AMQConnectionModel> connections =
((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
while(connections.hasNext())
{
- final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
- if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
+ final AMQConnectionModel amqConnectionModel = connections.next();
+ final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
+ ? ""
+ : amqConnectionModel.getAuthorizedPrincipal().getName();
+ if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name))
{
return false;
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 0e6b4d3b08..29f9fc549e 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -78,6 +78,7 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
@@ -88,7 +89,9 @@ import static org.apache.qpid.util.Serial.gt;
public class ServerSession extends Session
implements AuthorizationHolder,
- AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
+ AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
+ Deletable<ServerSession>
+
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
@@ -132,7 +135,7 @@ public class ServerSession extends Session
private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
- private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
+ private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
private final TransactionTimeoutHelper _transactionTimeoutHelper;
@@ -374,7 +377,7 @@ public class ServerSession extends Session
}
_messageDispositionListenerMap.clear();
- for (Action<ServerSession> task : _taskList)
+ for (Action<? super ServerSession> task : _taskList)
{
task.performAction(this);
}
@@ -610,12 +613,12 @@ public class ServerSession extends Session
return getConnection().getAuthorizedSubject();
}
- public void addSessionCloseTask(Action<ServerSession> task)
+ public void addDeleteTask(Action<? super ServerSession> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Action<ServerSession> task)
+ public void removeDeleteTask(Action<? super ServerSession> task)
{
_taskList.remove(task);
}
@@ -652,7 +655,7 @@ public class ServerSession extends Session
return _id;
}
- public AMQConnectionModel getConnectionModel()
+ public ServerConnection getConnectionModel()
{
return getConnection();
}
@@ -922,7 +925,7 @@ public class ServerSession extends Session
}
@Override
- public int compareTo(AMQSessionModel o)
+ public int compareTo(ServerSession o)
{
return getId().compareTo(o.getId());
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 1bd50533ed..b0a60beaf5 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -25,6 +25,8 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.Exchange;
@@ -204,47 +206,12 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
- else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+ else if(!queue.verifySessionAccess((ServerSession)session))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
else
{
- if(queue.isExclusive())
- {
- ServerSession s = (ServerSession) session;
- queue.setExclusiveOwningSession(s);
-
- ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- if(queue.getExclusiveOwningSession() == session)
- {
- queue.setExclusiveOwningSession(null);
- }
- }
- });
-
- if(queue.getAuthorizationHolder() == null)
- {
- queue.setAuthorizationHolder(s);
- ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- if(queue.getAuthorizationHolder() == session)
- {
- queue.setAuthorizationHolder(null);
- }
- }
- });
- }
- }
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -302,6 +269,10 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
+ }
}
}
}
@@ -1197,7 +1168,7 @@ public class ServerSessionDelegate extends SessionDelegate
exception(session, method, errorCode, description);
}
- else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ else if (!queue.verifySessionAccess((ServerSession)session))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1214,7 +1185,6 @@ public class ServerSessionDelegate extends SessionDelegate
try
{
- String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null;
final String alternateExchangeName = method.getAlternateExchange();
@@ -1227,66 +1197,36 @@ public class ServerSessionDelegate extends SessionDelegate
final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName());
- final boolean deleteOnNoConsumer = !exclusive && autoDelete;
+ arguments.put(Queue.ID, id);
+ arguments.put(Queue.NAME, queueName);
- queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner,
- autoDelete, exclusive, deleteOnNoConsumer,
- arguments);
-
- if (autoDelete && exclusive)
+ LifetimePolicy lifetime;
+ if(autoDelete)
{
- final AMQQueue q = queue;
- final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- try
- {
- virtualHost.removeQueue(q);
- }
- catch (QpidSecurityException e)
- {
- exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
- }
- }
- };
- final ServerSession s = (ServerSession) session;
- s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- s.removeSessionCloseTask(deleteQueueTask);
- }
- });
+ lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
+ : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
}
- if (exclusive)
+ else
{
- final AMQQueue q = queue;
- final Action<ServerSession> removeExclusive = new Action<ServerSession>()
- {
- public void performAction(ServerSession session)
- {
- q.setAuthorizationHolder(null);
- q.setExclusiveOwningSession(null);
- }
- };
- final ServerSession s = (ServerSession) session;
- q.setExclusiveOwningSession(s);
- s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new Action<AMQQueue>()
- {
- public void performAction(AMQQueue queue)
- {
- s.removeSessionCloseTask(removeExclusive);
- }
- });
+ lifetime = LifetimePolicy.PERMANENT;
}
+
+ arguments.put(Queue.LIFETIME_POLICY, lifetime);
+
+ ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
+
+
+ arguments.put(Queue.DURABLE, method.getDurable());
+
+ arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
+
+ queue = virtualHost.createQueue((ServerSession)session, arguments);
+
}
catch(QueueExistsException qe)
{
queue = qe.getExistingQueue();
- if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+ if (!queue.verifySessionAccess((ServerSession)session))
{
String description = "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
@@ -1347,11 +1287,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
- {
- exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
- }
- else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+ if(!queue.verifySessionAccess((ServerSession)session))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
@@ -1424,7 +1360,7 @@ public class ServerSessionDelegate extends SessionDelegate
result.setQueue(queue.getName());
result.setDurable(queue.isDurable());
result.setExclusive(queue.isExclusive());
- result.setAutoDelete(queue.isAutoDelete());
+ result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
Map<String, Object> arguments = new LinkedHashMap<String, Object>();
Collection<String> availableAttrs = queue.getAvailableAttributes();
@@ -1500,7 +1436,6 @@ public class ServerSessionDelegate extends SessionDelegate
public void closed(Session session)
{
setThreadSubject(session);
-
ServerSession serverSession = (ServerSession)session;
serverSession.stopSubscriptions();